Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/29 06:34:45 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #17537: [FLINK-19722][table-planner]Pushdown Watermark to SourceProvider (new Source)

godfreyhe commented on a change in pull request #17537:
URL: https://github.com/apache/flink/pull/17537#discussion_r738902990



##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -18,100 +18,43 @@
 
 package org.apache.flink.table.planner.factories;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.WatermarkSpec;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.RuntimeConverter;
-import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.OutputFormatProvider;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
-import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
-import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
-import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
-import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
-import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
-import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
-import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
-import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.functions.AsyncTableFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingOutputFormat;
-import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingSinkFunction;
-import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AsyncTestValueLookupFunction;
-import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.KeyedUpsertingSinkFunction;
-import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.RetractingSinkFunction;
-import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction;
-import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
-import org.apache.flink.table.planner.utils.FilterUtils;
+import org.apache.flink.table.planner.factories.dynamictablesink.TestValuesTableSink;

Review comment:
       The `dynamictablesink` and `sinkfunction` packages can be unified into `sink`; likewise, `dynamictablesource`, `sourcefunction`, `lookupfunction`, and `source` can be unified into `source`.
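
       A minimal sketch of what the unified imports could look like; the package moves follow the suggestion above, but the exact class placement is an assumption:

           import org.apache.flink.table.planner.factories.sink.TestValuesTableSink;
           import org.apache.flink.table.planner.factories.source.TestValuesSource;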

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -237,94 +188,93 @@ private static RowKind parseRowKind(String rowKindShortString) {
 
     public static final String IDENTIFIER = "values";
 
-    private static final ConfigOption<String> DATA_ID =
+    public static final ConfigOption<String> DATA_ID =
             ConfigOptions.key("data-id").stringType().noDefaultValue();
 
-    private static final ConfigOption<Boolean> BOUNDED =
+    public static final ConfigOption<Boolean> BOUNDED =
             ConfigOptions.key("bounded").booleanType().defaultValue(false);
 
-    private static final ConfigOption<String> CHANGELOG_MODE =
+    public static final ConfigOption<String> CHANGELOG_MODE =
             ConfigOptions.key("changelog-mode")
                     .stringType()
                     .defaultValue("I"); // all available "I,UA,UB,D"
 
-    private static final ConfigOption<String> RUNTIME_SOURCE =
+    public static final ConfigOption<String> RUNTIME_SOURCE =
             ConfigOptions.key("runtime-source")
                     .stringType()
-                    .defaultValue("SourceFunction"); // another is "InputFormat"
+                    .defaultValue("SourceFunction"); // others are "InputFormat" and "Source"
 
-    private static final ConfigOption<Boolean> FAILING_SOURCE =
+    public static final ConfigOption<Boolean> FAILING_SOURCE =
             ConfigOptions.key("failing-source").booleanType().defaultValue(false);
 
-    private static final ConfigOption<String> RUNTIME_SINK =
-            ConfigOptions.key("runtime-sink")
-                    .stringType()
-                    .defaultValue("SinkFunction"); // another is "OutputFormat"
+    public static final ConfigOption<String> RUNTIME_SINK =
+            ConfigOptions.key("runtime-sink").stringType().defaultValue("SinkFunction");
+    // others are "OutputFormat" and "SinkWithCollectingWatermark"
 
-    private static final ConfigOption<String> TABLE_SOURCE_CLASS =
+    public static final ConfigOption<String> TABLE_SOURCE_CLASS =
             ConfigOptions.key("table-source-class")
                     .stringType()
                     .defaultValue("DEFAULT"); // class path which implements DynamicTableSource
 
-    private static final ConfigOption<String> TABLE_SINK_CLASS =
+    public static final ConfigOption<String> TABLE_SINK_CLASS =
             ConfigOptions.key("table-sink-class")
                     .stringType()
                     .defaultValue("DEFAULT"); // class path which implements DynamicTableSink
 
-    private static final ConfigOption<String> LOOKUP_FUNCTION_CLASS =
+    public static final ConfigOption<String> LOOKUP_FUNCTION_CLASS =
             ConfigOptions.key("lookup-function-class").stringType().noDefaultValue();
 
-    private static final ConfigOption<Boolean> ASYNC_ENABLED =
+    public static final ConfigOption<Boolean> ASYNC_ENABLED =
             ConfigOptions.key("async").booleanType().defaultValue(false);
 
-    private static final ConfigOption<Boolean> DISABLE_LOOKUP =
-            ConfigOptions.key("disable-lookup").booleanType().defaultValue(false);
+    public static final ConfigOption<Boolean> ENABLE_LOOKUP =
+            ConfigOptions.key("enable-lookup").booleanType().defaultValue(true);

Review comment:
       Should the default value be `false`? For most tests, this factory is used as a scan table source.
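
       A minimal sketch of the suggested change, reusing the option definition from the diff with only the default flipped:

           public static final ConfigOption<Boolean> ENABLE_LOOKUP =
                   ConfigOptions.key("enable-lookup").booleanType().defaultValue(false);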

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;

Review comment:
       nit: mark it as final
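
       A sketch of the suggested fix; the constructor signature is assumed for illustration only:

           // assigned exactly once in the constructor, so the field can be final
           private final int elementNums;

           public TestValuesSource(Iterable<RowData> elements, int elementNums) {
               this.elements = elements;
               this.elementNums = elementNums;
           }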

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;
+
+    private byte[] elementsSerialized;
+
+    private TypeSerializer<RowData> serializer;

Review comment:
       nit: ditto
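
       The same sketch applies to this field:

           private final TypeSerializer<RowData> serializer;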

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
##########
@@ -19,56 +19,17 @@
 package org.apache.flink.table.planner.factories;
 
 import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.functions.AsyncTableFunction;
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.types.RowUtils;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.flink.table.planner.factories.TestValuesTableFactory.RESOURCE_COUNTER;
-import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Runtime function implementations for {@link TestValuesTableFactory}. */
-final class TestValuesRuntimeFunctions {
+public final class TestValuesRuntimeFunctions {

Review comment:
       This class is a utility class now; we can give it a new name and update the Javadoc.
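
       A sketch of what the rename could look like; the name TestValuesRuntimeHelper is hypothetical:

           /** Utility class holding runtime function implementations for {@link TestValuesTableFactory}. */
           public final class TestValuesRuntimeHelper {

               private TestValuesRuntimeHelper() {
                   // static members only, no instances
               }
           }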

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
##########
@@ -536,7 +536,7 @@ private void writeChangelogToUpsertKafkaWithMetadata(String userTable) throws Ex
                                 + "  'connector' = 'values',"
                                 + "  'data-id' = '%s',"
                                 + "  'changelog-mode' = 'UA,D',"
-                                + "  'disable-lookup' = 'true'"
+                                + "  'enable-lookup' = 'false'"

Review comment:
       We can remove this line if the default value of `enable-lookup` is `false`.
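
       The DDL fragment from the diff would then shrink to (sketch):

           "  'connector' = 'values',"
                   + "  'data-id' = '%s',"
                   + "  'changelog-mode' = 'UA,D'"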




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org