Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/24 09:25:01 UTC

[flink] branch release-1.9 updated: [FLINK-13341][table][connectors] StreamTableSink#consumeDataStream returns DataStreamSink when using blink planner.

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new c7ddabe  [FLINK-13341][table][connectors] StreamTableSink#consumeDataStream returns DataStreamSink when using blink planner.
c7ddabe is described below

commit c7ddabe66d17263336a7ad9176c1ef566c247167
Author: chenqi <ch...@youzan.com>
AuthorDate: Sat Jul 20 23:22:43 2019 +0800

    [FLINK-13341][table][connectors] StreamTableSink#consumeDataStream returns DataStreamSink when using blink planner.
    
    This closes #9186
---
 .../cassandra/CassandraAppendTableSink.java        | 31 +++++++++++++++-------
 .../ElasticsearchUpsertTableSinkBase.java          | 10 +++++--
 .../connectors/kafka/KafkaTableSinkBase.java       | 10 +++++--
 .../api/java/io/jdbc/JDBCAppendTableSink.java      | 12 ++++++---
 4 files changed, 47 insertions(+), 16 deletions(-)
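
Context for connector authors on release-1.9: all four files below apply the same pattern. StreamTableSink#consumeDataStream(DataStream) now returns the DataStreamSink produced by DataStream#addSink(...), which the Blink planner needs in order to keep configuring the sink transformation, while the deprecated emitDataStream(DataStream) remains as a thin delegate for the legacy planner. Below is a minimal sketch of a custom sink following the same pattern; ExampleAppendTableSink, its constructor arguments, and the sink name are illustrative only, not part of this commit.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class ExampleAppendTableSink implements AppendStreamTableSink<Row> {

	// Hypothetical sink function supplied by the connector; any SinkFunction works here.
	private final SinkFunction<Row> sinkFunction;
	private final String[] fieldNames;
	private final TypeInformation<?>[] fieldTypes;

	public ExampleAppendTableSink(
			SinkFunction<Row> sinkFunction,
			String[] fieldNames,
			TypeInformation<?>[] fieldTypes) {
		this.sinkFunction = sinkFunction;
		this.fieldNames = fieldNames;
		this.fieldTypes = fieldTypes;
	}

	@Override
	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
		// Return the DataStreamSink so the planner can keep working with the
		// sink transformation (e.g. to set parallelism or resources on it).
		return dataStream.addSink(sinkFunction).name("example-append-sink");
	}

	@Override
	public void emitDataStream(DataStream<Row> dataStream) {
		// Deprecated entry point kept for the legacy planner; delegate to the
		// new method so the sink is wired up in only one place.
		consumeDataStream(dataStream);
	}

	@Override
	public TypeInformation<Row> getOutputType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	@Override
	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
		return new ExampleAppendTableSink(sinkFunction, fieldNames, fieldTypes);
	}
}

Delegating emitDataStream to consumeDataStream, as each connector below does, keeps the old planner's code path working without duplicating the sink construction.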

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
index 443a780..edf8c88 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.cassandra;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;
@@ -76,15 +78,26 @@ public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
 	}
 
 	@Override
-	public void emitDataStream(DataStream<Row> dataStream) {
-		try {
-			CassandraSink.addSink(dataStream)
-				.setClusterBuilder(this.builder)
-				.setQuery(this.cql)
-				.build()
-				.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
+	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+		if (!(dataStream.getType() instanceof RowTypeInfo)) {
+			throw new TableException("No support for the type of the given DataStream: " + dataStream.getType());
 		}
+
+		CassandraRowSink sink = new CassandraRowSink(
+			dataStream.getType().getArity(),
+			cql,
+			builder,
+			CassandraSinkBaseConfig.newBuilder().build(),
+			new NoOpCassandraFailureHandler());
+
+		return dataStream
+				.addSink(sink)
+				.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		consumeDataStream(dataStream);
 	}
 }
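
The Cassandra connector above needs a larger change than the other three files: its previous implementation went through CassandraSink.addSink(dataStream)....build(), which throws a checked exception and returns a CassandraSink wrapper rather than a DataStreamSink. The new code therefore checks that the stream's type is a RowTypeInfo, constructs the CassandraRowSink directly (with a default CassandraSinkBaseConfig and a NoOpCassandraFailureHandler), and attaches it via DataStream#addSink so the resulting DataStreamSink can be returned.
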
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
index 045d3a6..427bb6e 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
@@ -168,7 +169,7 @@ public abstract class ElasticsearchUpsertTableSinkBase implements UpsertStreamTa
 	}
 
 	@Override
-	public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
 		final ElasticsearchUpsertSinkFunction upsertFunction =
 			new ElasticsearchUpsertSinkFunction(
 				index,
@@ -184,11 +185,16 @@ public abstract class ElasticsearchUpsertTableSinkBase implements UpsertStreamTa
 			failureHandler,
 			sinkOptions,
 			upsertFunction);
-		dataStream.addSink(sinkFunction)
+		return dataStream.addSink(sinkFunction)
 			.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
 	}
 
 	@Override
+	public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+		consumeDataStream(dataStream);
+	}
+
+	@Override
 	public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
 		return Types.TUPLE(Types.BOOLEAN, getRecordType());
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
index ce46eb6..ecadace 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
@@ -89,13 +90,18 @@ public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
 		Optional<FlinkKafkaPartitioner<Row>> partitioner);
 
 	@Override
-	public void emitDataStream(DataStream<Row> dataStream) {
+	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
 		final SinkFunction<Row> kafkaProducer = createKafkaProducer(
 			topic,
 			properties,
 			serializationSchema,
 			partitioner);
-		dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+		return dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		consumeDataStream(dataStream);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
index 5df66f6..c53bbc1 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.BatchTableSink;
 import org.apache.flink.table.sinks.TableSink;
@@ -60,10 +61,15 @@ public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTab
 	}
 
 	@Override
+	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+		return dataStream
+			.addSink(new JDBCSinkFunction(outputFormat))
+			.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+	}
+
+	@Override
 	public void emitDataStream(DataStream<Row> dataStream) {
-		dataStream
-				.addSink(new JDBCSinkFunction(outputFormat))
-				.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+		consumeDataStream(dataStream);
 	}
 
 	@Override