You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/18 09:04:51 UTC

[GitHub] [flink] TsReaper commented on a change in pull request #12199: [FLINK-17774] [table] supports all kinds of changes for select result

TsReaper commented on a change in pull request #12199:
URL: https://github.com/apache/flink/pull/12199#discussion_r426454791



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/BatchSelectTableSink.java
##########
@@ -43,71 +43,68 @@
 import java.util.concurrent.ExecutionException;
 
 /**
- * A {@link SelectTableSink} for batch select job.
+ * A {@link StreamTableSink} for batch select job to collect the result to local.
  *
  * <p><strong>NOTES:</strong> This is a temporary solution,
  * once FLINK-14807 is finished, the implementation should be changed.
  */
-public class BatchSelectTableSink implements StreamTableSink<Row>, SelectTableSink {
-	private final TableSchema tableSchema;
+public class BatchSelectTableSink extends SelectTableSinkBase<RowData> implements StreamTableSink<RowData> {
 	private final String accumulatorName;
-	private final TypeSerializer<Row> typeSerializer;
-	private JobClient jobClient;
+	private final TypeSerializer<RowData> typeSerializer;
 
 	@SuppressWarnings("unchecked")
 	public BatchSelectTableSink(TableSchema tableSchema) {
-		this.tableSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
-				SelectTableSinkSchemaConverter.changeDefaultConversionClass(tableSchema));
+		super(tableSchema);
 		this.accumulatorName = new AbstractID().toString();
-		this.typeSerializer = (TypeSerializer<Row>) TypeInfoDataTypeConverter
-				.fromDataTypeToTypeInfo(this.tableSchema.toRowDataType())
-				.createSerializer(new ExecutionConfig());
+		this.typeSerializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
 	}
 
 	@Override
-	public DataType getConsumedDataType() {
-		return tableSchema.toRowDataType();
+	public TypeInformation<RowData> getOutputType() {
+		return rowDataTypeInfo;
 	}
 
 	@Override
-	public TableSchema getTableSchema() {
-		return tableSchema;
-	}
-
-	@Override
-	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+	public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
 		return dataStream.writeUsingOutputFormat(
 				new Utils.CollectHelper<>(accumulatorName, typeSerializer))
 				.name("Batch select table sink")
 				.setParallelism(1);
 	}
 
 	@Override
-	public void setJobClient(JobClient jobClient) {
-		this.jobClient = Preconditions.checkNotNull(jobClient, "jobClient should not be null");
-	}
+	public SelectResultProvider getSelectResultProvider() {
+		return new SelectResultProvider() {

Review comment:
       Move into an inner class. The implementation is too long for an anonymous class.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/StreamSelectTableSink.java
##########
@@ -66,30 +66,44 @@ public StreamSelectTableSink(TableSchema tableSchema) {
 	}
 
 	@Override
-	public DataType getConsumedDataType() {
-		return tableSchema.toRowDataType();
+	public TypeInformation<RowData> getRecordType() {
+		return rowDataTypeInfo;
 	}
 
 	@Override
-	public TableSchema getTableSchema() {
-		return tableSchema;
-	}
-
-	@Override
-	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) {
 		return dataStream
 				.addSink(new CollectSink<>(iterator.getBindAddress(), iterator.getPort(), typeSerializer))
 				.name("Streaming select table sink")
 				.setParallelism(1);
 	}
 
 	@Override
-	public void setJobClient(JobClient jobClient) {
-	}
+	public SelectResultProvider getSelectResultProvider() {
+		return new SelectResultProvider() {

Review comment:
       Also move this to an inner class.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/StreamSelectTableSink.java
##########
@@ -19,44 +19,44 @@
 package org.apache.flink.table.planner.sinks;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.experimental.CollectSink;
 import org.apache.flink.streaming.experimental.SocketStreamIterator;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.SelectTableSink;
-import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.api.internal.SelectResultProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.types.Row;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Iterator;
 
 /**
- * A {@link SelectTableSink} for streaming select job.
+ * A {@link RetractStreamTableSink} for streaming select job to collect the result to local.
  *
  * <p><strong>NOTES:</strong> This is a temporary solution,
  * once FLINK-14807 is finished, the implementation should be changed.
- * Currently, only insert changes (AppendStreamTableSink) is supported.
- * Once FLINK-16998 is finished, all kinds of changes will be supported.
  */
-public class StreamSelectTableSink implements AppendStreamTableSink<Row>, SelectTableSink {
-	private final TableSchema tableSchema;
-	private final TypeSerializer<Row> typeSerializer;
-	private final SocketStreamIterator<Row> iterator;
+public class StreamSelectTableSink
+		extends SelectTableSinkBase<Tuple2<Boolean, RowData>>
+		implements RetractStreamTableSink<RowData> {
+	private final TypeSerializer<Tuple2<Boolean, RowData>> typeSerializer;
+	private final SocketStreamIterator<Tuple2<Boolean, RowData>> iterator;
 
 	@SuppressWarnings("unchecked")
 	public StreamSelectTableSink(TableSchema tableSchema) {
-		this.tableSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
-				SelectTableSinkSchemaConverter.changeDefaultConversionClass(tableSchema));
-		this.typeSerializer = (TypeSerializer<Row>) TypeInfoDataTypeConverter
-				.fromDataTypeToTypeInfo(this.tableSchema.toRowDataType())
-				.createSerializer(new ExecutionConfig());
+		super(tableSchema);
+		TypeInformation<Tuple2<Boolean, RowData>> tupleTypeInfo = new TupleTypeInfo<>(Types.BOOLEAN, rowDataTypeInfo);

Review comment:
       Add comments to state that we're not using this boolean value and we define this only because of `RetractStreamTableSink`.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java
##########
@@ -74,33 +79,39 @@ public TableSchema getTableSchema() {
 				.setParallelism(1);
 	}
 
-	@Override
-	public void setJobClient(JobClient jobClient) {
-		this.jobClient = Preconditions.checkNotNull(jobClient, "jobClient should not be null");
-	}
+	public SelectResultProvider getSelectResultProvider() {
+		return new SelectResultProvider() {

Review comment:
       ditto.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java
##########
@@ -58,29 +61,61 @@ public StreamSelectTableSink(TableSchema tableSchema) {
 	}
 
 	@Override
-	public DataType getConsumedDataType() {
-		return tableSchema.toRowDataType();
+	public TableSchema getTableSchema() {
+		return tableSchema;
 	}
 
 	@Override
-	public TableSchema getTableSchema() {
-		return tableSchema;
+	public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public TypeInformation<Row> getRecordType() {
+		return tableSchema.toRowType();
+	}
+
+	@Override
+	public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+		return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
 	}
 
 	@Override
-	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
 		return dataStream
 				.addSink(new CollectSink<>(iterator.getBindAddress(), iterator.getPort(), typeSerializer))
 				.name("Streaming select table sink")
 				.setParallelism(1);
 	}
 
-	@Override
-	public void setJobClient(JobClient jobClient) {
-	}
+	public SelectResultProvider getSelectResultProvider() {
+		return new SelectResultProvider() {

Review comment:
       ditto.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org