You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/24 09:25:01 UTC
[flink] branch release-1.9 updated: [FLINK-13341][table][connectors] StreamTableSink#consumeDataStream returns DataStreamSink when using blink planner.
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new c7ddabe [FLINK-13341][table][connectors] StreamTableSink#consumeDataStream returns DataStreamSink when using blink planner.
c7ddabe is described below
commit c7ddabe66d17263336a7ad9176c1ef566c247167
Author: chenqi <ch...@youzan.com>
AuthorDate: Sat Jul 20 23:22:43 2019 +0800
[FLINK-13341][table][connectors] StreamTableSink#consumeDataStream returns DataStreamSink when using blink planner.
This closes #9186
---
.../cassandra/CassandraAppendTableSink.java | 31 +++++++++++++++-------
.../ElasticsearchUpsertTableSinkBase.java | 10 +++++--
.../connectors/kafka/KafkaTableSinkBase.java | 10 +++++--
.../api/java/io/jdbc/JDBCAppendTableSink.java | 12 ++++++---
4 files changed, 47 insertions(+), 16 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
index 443a780..edf8c88 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.cassandra;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.types.Row;
@@ -76,15 +78,26 @@ public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
}
@Override
- public void emitDataStream(DataStream<Row> dataStream) {
- try {
- CassandraSink.addSink(dataStream)
- .setClusterBuilder(this.builder)
- .setQuery(this.cql)
- .build()
- .name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
- } catch (Exception e) {
- throw new RuntimeException(e);
+ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+ if (!(dataStream.getType() instanceof RowTypeInfo)) {
+ throw new TableException("No support for the type of the given DataStream: " + dataStream.getType());
}
+
+ CassandraRowSink sink = new CassandraRowSink(
+ dataStream.getType().getArity(),
+ cql,
+ builder,
+ CassandraSinkBaseConfig.newBuilder().build(),
+ new NoOpCassandraFailureHandler());
+
+ return dataStream
+ .addSink(sink)
+ .name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+
+ }
+
+ @Override
+ public void emitDataStream(DataStream<Row> dataStream) {
+ consumeDataStream(dataStream);
}
}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
index 045d3a6..427bb6e 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
@@ -168,7 +169,7 @@ public abstract class ElasticsearchUpsertTableSinkBase implements UpsertStreamTa
}
@Override
- public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
final ElasticsearchUpsertSinkFunction upsertFunction =
new ElasticsearchUpsertSinkFunction(
index,
@@ -184,11 +185,16 @@ public abstract class ElasticsearchUpsertTableSinkBase implements UpsertStreamTa
failureHandler,
sinkOptions,
upsertFunction);
- dataStream.addSink(sinkFunction)
+ return dataStream.addSink(sinkFunction)
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}
@Override
+ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+ consumeDataStream(dataStream);
+ }
+
+ @Override
public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
return Types.TUPLE(Types.BOOLEAN, getRecordType());
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
index ce46eb6..ecadace 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
@@ -89,13 +90,18 @@ public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
Optional<FlinkKafkaPartitioner<Row>> partitioner);
@Override
- public void emitDataStream(DataStream<Row> dataStream) {
+ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
final SinkFunction<Row> kafkaProducer = createKafkaProducer(
topic,
properties,
serializationSchema,
partitioner);
- dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+ return dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+ }
+
+ @Override
+ public void emitDataStream(DataStream<Row> dataStream) {
+ consumeDataStream(dataStream);
}
@Override
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
index 5df66f6..c53bbc1 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sinks.TableSink;
@@ -60,10 +61,15 @@ public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTab
}
@Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+ return dataStream
+ .addSink(new JDBCSinkFunction(outputFormat))
+ .name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+ }
+
+ @Override
public void emitDataStream(DataStream<Row> dataStream) {
- dataStream
- .addSink(new JDBCSinkFunction(outputFormat))
- .name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+ consumeDataStream(dataStream);
}
@Override