Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/11 07:14:00 UTC

[GitHub] [flink-table-store] LadyForest opened a new pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

LadyForest opened a new pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41


   This is a draft version


[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826527466



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -184,9 +286,42 @@ private static Path tablePath(Map<String, String> options, ObjectIdentifier iden
                         identifier.getObjectName()));
     }
 
-    private static boolean enableChangeTracking(Map<String, String> options) {
+    @VisibleForTesting
+    static boolean enableChangeTracking(Map<String, String> options) {
         return Boolean.parseBoolean(
                 options.getOrDefault(
                         CHANGE_TRACKING.key(), CHANGE_TRACKING.defaultValue().toString()));
     }
+
+    private static DynamicTableFactory.Context createLogStoreContext(
+            DynamicTableFactory.Context context) {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                context.getObjectIdentifier(),
+                context.getCatalogTable()
+                        .copy(filterLogStoreOptions(context.getCatalogTable().getOptions())),
+                filterLogStoreOptions(context.getEnrichmentOptions()),
+                context.getConfiguration(),
+                context.getClassLoader(),
+                context.isTemporary());
+    }
+
+    private static LogStoreTableFactory createLogStoreTableFactory() {
+        return discoverLogStoreFactory(
+                Thread.currentThread().getContextClassLoader(),
+                TableStoreFactoryOptions.LOG_SYSTEM.defaultValue());
+    }
+
+    private TableStore buildTableStore(Context context) {
+        ResolvedCatalogTable catalogTable = context.getCatalogTable();
+        ResolvedSchema schema = catalogTable.getResolvedSchema();
+        return new TableStore(
+                        Configuration.fromMap(filterFileStoreOptions(catalogTable.getOptions())))
+                .withTableIdentifier(context.getObjectIdentifier())
+                .withPrimaryKeys(context.getPrimaryKeyIndexes())
+                .withPartitions(
+                        catalogTable.getPartitionKeys().stream()
+                                .mapToInt(schema.getColumnNames()::indexOf)

Review comment:
       Keep the schema consistent: the physical type is used below, so the physical type should be used here as well.
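
   A minimal sketch of the suggested fix (illustrative, not the final PR code); `toPhysicalRowDataType` is the `ResolvedSchema` API, the other names come from the diff above:
   ```java
   // Resolve partition key indexes against the physical row type, matching
   // how the physical type is used for the primary key below.
   RowType physicalRowType =
           (RowType) schema.toPhysicalRowDataType().getLogicalType();
   int[] partitionIndexes =
           catalogTable.getPartitionKeys().stream()
                   .mapToInt(physicalRowType.getFieldNames()::indexOf)
                   .toArray();
   ```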




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824468587



##########
File path: flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
##########
@@ -92,7 +92,6 @@ protected void doStart() {
     @Before
     public void setup() {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);

Review comment:
       Why remove this line?




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r825698967



##########
File path: flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
##########
@@ -92,7 +92,6 @@ protected void doStart() {
     @Before
     public void setup() {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);

Review comment:
       You can just re-initialize it; it is not good to leave a null for the subclass.
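
   A sketch of that suggestion, mirroring what `TableStoreTestBase#setup` later in this thread ends up doing: the base class keeps `tEnv` initialized and the subclass simply re-creates it.
   ```java
   @Override
   @Before
   public void setup() {
       super.setup(); // the base class has already created env and tEnv
       env.setRuntimeMode(executionMode);
       if (executionMode == RuntimeExecutionMode.STREAMING) {
           env.enableCheckpointing(100);
       }
       tEnv = StreamTableEnvironment.create(env); // re-initialize with the new mode
   }
   ```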




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826525653



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -129,6 +155,85 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
         throw new UnsupportedOperationException("Not implement yet");
     }
 
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        boolean streaming =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+        LogSourceProvider logSourceProvider = null;
+        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+            logSourceProvider =
+                    createLogStoreTableFactory()
+                            .createSourceProvider(
+                                    createLogStoreContext(context),
+                                    new LogStoreTableFactory.SourceContext() {
+                                        @Override
+                                        public <T> TypeInformation<T> createTypeInformation(
+                                                DataType producedDataType) {
+                                            return createTypeInformation(
+                                                    TypeConversions.fromDataToLogicalType(
+                                                            producedDataType));
+                                        }
+
+                                        @Override
+                                        public <T> TypeInformation<T> createTypeInformation(
+                                                LogicalType producedLogicalType) {
+                                            return InternalTypeInfo.of(producedLogicalType);
+                                        }
+
+                                        @Override
+                                        public DynamicTableSource.DataStructureConverter
+                                                createDataStructureConverter(
+                                                        DataType producedDataType) {
+                                            return ScanRuntimeProviderContext.INSTANCE
+                                                    .createDataStructureConverter(producedDataType);
+                                        }
+                                    });
+        }
+        return new StoreTableSource(buildTableStore(context), streaming, logSourceProvider);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        LogSinkProvider logSinkProvider = null;
+        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+            filterLogStoreOptions(context.getCatalogTable().getOptions());
+            logSinkProvider =
+                    createLogStoreTableFactory()
+                            .createSinkProvider(
+                                    createLogStoreContext(context),
+                                    new LogStoreTableFactory.SinkContext() {

Review comment:
       Just wrap `Context` too.
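
   A sketch of the suggested wrapping, as `TableStoreSink#getSinkRuntimeProvider` later in this thread does, where `context` is the runtime sink context; it assumes `SinkContext` mirrors the `SourceContext` methods shown above, delegating to the planner-provided `Context` instead of re-implementing the conversions.
   ```java
   new LogStoreTableFactory.SinkContext() {
       @Override
       public boolean isBounded() {
           return context.isBounded(); // delegate instead of recomputing
       }

       @Override
       public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
           return context.createTypeInformation(producedDataType);
       }

       @Override
       public DynamicTableSink.DataStructureConverter createDataStructureConverter(
               DataType producedDataType) {
           return context.createDataStructureConverter(producedDataType);
       }
   };
   ```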




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r825539312



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -65,8 +69,8 @@
 
     @Override
     public void onCreateTable(Context context, boolean ignoreIfExists) {
-        Map<String, String> options = context.getCatalogTable().getOptions();

Review comment:
       `onCreate` and the other methods have their own duties; they don't even need the concept of enriched options, which I think is redundant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
LadyForest commented on pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#issuecomment-1064860952


   > You can use the `TableStore` class directly.
   
   Yup. I just rebased FLINK-26530 a moment ago and haven't started the migration yet.


[GitHub] [flink-table-store] LadyForest commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824469259



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -65,8 +69,8 @@
 
     @Override
     public void onCreateTable(Context context, boolean ignoreIfExists) {
-        Map<String, String> options = context.getCatalogTable().getOptions();

Review comment:
       From the developer's perspective, we all know that `#onCreate` and `#onDrop` get called only after `#enrichOptions` is executed, while from the API layer it's hard to tell they have an order (short of reading Flink's codebase). That's why I changed the name.
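
   A hypothetical sketch of the ordering being discussed; only the factory method names come from the PR, the orchestration and the `contextWithEnrichedOptions` helper are illustrative:
   ```java
   // 1. The planner first asks the factory to enrich the options ...
   Map<String, String> enriched = factory.enrichOptions(context);
   // 2. ... and only then invokes the lifecycle hooks, so onCreateTable can
   //    rely on enriched options such as the table path being present.
   factory.onCreateTable(contextWithEnrichedOptions(context, enriched), ignoreIfExists);
   ```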




[GitHub] [flink-table-store] LadyForest commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r825675632



##########
File path: flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
##########
@@ -92,7 +92,6 @@ protected void doStart() {
     @Before
     public void setup() {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);

Review comment:
       > But it contains both `StreamExecutionEnvironment` and `StreamTableEnvironment`; it has initialized the `StreamExecutionEnvironment`, so why not the `StreamTableEnvironment`?
   
   Because I want to create `tEnv` conditionally according to the JUnit parameter, like
   ```java
   env.setRuntimeMode(executionMode);
   if (executionMode == STREAMING) {
       env.enableCheckpointing(100);
   }
   tEnv = StreamTableEnvironment.create(env);
   ```




[GitHub] [flink-table-store] LadyForest commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826723223



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -84,14 +109,15 @@ public void onCreateTable(Context context, boolean ignoreIfExists) {
                                 context.getObjectIdentifier().asSerializableString()));
             }
             path.getFileSystem().mkdirs(path);
+            options.put(TABLE_PATH.key(), path.getPath());

Review comment:
       > I agree we can keep the table path internal, but I think we should remove `TABLE_PATH`; it is weird. We can pass the `tableIdentifier` to `FileStoreOptions`.
   
   Sure. This will be resolved in https://github.com/apache/flink-table-store/pull/45




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826899642



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Table source to create {@link FileStoreSource} in batch mode or when change-tracking is
+ * disabled. For streaming mode with change-tracking enabled and FULL scan mode, it creates a
+ * {@link org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource}
+ * and the Kafka log source created by {@link LogSourceProvider}.
+ */
+public class TableStoreSource
+        implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    @Nullable private List<ResolvedExpression> partitionFilters;
+    @Nullable private List<ResolvedExpression> fieldFilters;
+    @Nullable private int[][] projectFields;
+
+    public TableStoreSource(
+            TableStore tableStore,
+            boolean streaming,
+            @Nullable DynamicTableFactory.Context logStoreContext,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logStoreContext = logStoreContext;
+        this.logStoreTableFactory = logStoreTableFactory;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return streaming
+                ? tableStore.valueCountMode()
+                        ? ChangelogMode.all()
+                        // TODO: optimize upsert when consistency mode is transactional and
+                        // log.changelog-mode is all
+                        : ChangelogMode.upsert()
+                : ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        LogSourceProvider logSourceProvider = null;
+        if (logStoreTableFactory != null) {
+            logSourceProvider =
+                    logStoreTableFactory.createSourceProvider(
+                            logStoreContext,
+                            new LogStoreTableFactory.SourceContext() {
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        DataType producedDataType) {
+                                    return scanContext.createTypeInformation(producedDataType);
+                                }
+
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        LogicalType producedLogicalType) {
+                                    return scanContext.createTypeInformation(producedLogicalType);
+                                }
+
+                                @Override
+                                public DataStructureConverter createDataStructureConverter(
+                                        DataType producedDataType) {
+                                    return scanContext.createDataStructureConverter(
+                                            producedDataType);
+                                }
+                            });
+        }
+        TableStore.SourceBuilder builder =
+                tableStore
+                        .sourceBuilder()
+                        .withContinuousMode(streaming)
+                        .withHybridMode(streaming && logSourceProvider != null)
+                        .withLogSourceProvider(logSourceProvider)
+                        .withProjection(projectFields)
+                        .withPartitionPredicate(PredicateConverter.convert(partitionFilters))
+                        .withFieldPredicate(PredicateConverter.convert(fieldFilters));
+        return SourceProvider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        TableStoreSource copied =
+                new TableStoreSource(tableStore, streaming, logStoreContext, logStoreTableFactory);
+        copied.partitionFilters = partitionFilters;
+        copied.fieldFilters = fieldFilters;
+        return copied;

Review comment:
       `projectFields` is missing here. Maybe we need unit tests for this.
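
   The fix, sketched against the fields visible in the diff:
   ```java
   @Override
   public DynamicTableSource copy() {
       TableStoreSource copied =
               new TableStoreSource(tableStore, streaming, logStoreContext, logStoreTableFactory);
       copied.partitionFilters = partitionFilters;
       copied.fieldFilters = fieldFilters;
       copied.projectFields = projectFields; // previously missing
       return copied;
   }
   ```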




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826525388



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -129,6 +155,85 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
         throw new UnsupportedOperationException("Not implement yet");
     }
 
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        boolean streaming =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+        LogSourceProvider logSourceProvider = null;
+        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+            logSourceProvider =
+                    createLogStoreTableFactory()
+                            .createSourceProvider(
+                                    createLogStoreContext(context),
+                                    new LogStoreTableFactory.SourceContext() {
+                                        @Override
+                                        public <T> TypeInformation<T> createTypeInformation(
+                                                DataType producedDataType) {
+                                            return createTypeInformation(
+                                                    TypeConversions.fromDataToLogicalType(
+                                                            producedDataType));
+                                        }
+
+                                        @Override
+                                        public <T> TypeInformation<T> createTypeInformation(
+                                                LogicalType producedLogicalType) {
+                                            return InternalTypeInfo.of(producedLogicalType);
+                                        }
+
+                                        @Override
+                                        public DynamicTableSource.DataStructureConverter
+                                                createDataStructureConverter(
+                                                        DataType producedDataType) {
+                                            return ScanRuntimeProviderContext.INSTANCE
+                                                    .createDataStructureConverter(producedDataType);
+                                        }
+                                    });
+        }
+        return new StoreTableSource(buildTableStore(context), streaming, logSourceProvider);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        LogSinkProvider logSinkProvider = null;
+        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+            filterLogStoreOptions(context.getCatalogTable().getOptions());

Review comment:
       ?




[GitHub] [flink-table-store] JingsongLi merged pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41


   


[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r825538412



##########
File path: flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
##########
@@ -92,7 +92,6 @@ protected void doStart() {
     @Before
     public void setup() {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);

Review comment:
       But it contains both `StreamExecutionEnvironment` and `StreamTableEnvironment`; it has initialized the `StreamExecutionEnvironment`, so why not the `StreamTableEnvironment`?




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826890536



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class TableStoreSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    private final LogOptions.LogChangelogMode logChangelogMode;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    private Map<String, String> staticPartitions = new HashMap<>();

Review comment:
       `staticPartitions` should default to null; otherwise it means overwrite-all... Add a TODO in the test class to add cases.
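
   A sketch of the suggested change; `applyStaticPartition` is the `SupportsPartitioning` callback (not shown in the excerpt):
   ```java
   // null until an explicit PARTITION (...) clause is applied, so that
   // overwrite == true with staticPartitions == null unambiguously means
   // "INSERT OVERWRITE" of the whole table.
   @Nullable private Map<String, String> staticPartitions;

   @Override
   public void applyStaticPartition(Map<String, String> partition) {
       this.staticPartitions = new HashMap<>(partition);
   }
   ```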




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826540271



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -84,14 +109,15 @@ public void onCreateTable(Context context, boolean ignoreIfExists) {
                                 context.getObjectIdentifier().asSerializableString()));
             }
             path.getFileSystem().mkdirs(path);
+            options.put(TABLE_PATH.key(), path.getPath());

Review comment:
       I agree we can keep the table path internal, but I think we should remove `TABLE_PATH`; it is weird. We can pass the `tableIdentifier` to `FileStoreOptions` instead.
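
   A hypothetical sketch of that alternative; the directory layout below is illustrative, only `FILE_PATH` and the identifier accessors come from the PR:
   ```java
   // Derive the table directory from the root path plus the identifier,
   // instead of persisting a separate TABLE_PATH option.
   static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
       String root = options.get(FILE_PATH.key());
       return new Path(
               root,
               String.format(
                       "%s.catalog/%s.db/%s",
                       identifier.getCatalogName(),
                       identifier.getDatabaseName(),
                       identifier.getObjectName()));
   }
   ```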




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824464510



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/TableStoreUtils.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.utils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
+
+/** Utils for {@link org.apache.flink.table.store.connector.TableStoreFactory}. */
+public class TableStoreUtils {

Review comment:
       I think it is OK to keep these methods in `TableStoreFactory`; it is not that big.




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824467515



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
##########
@@ -129,6 +129,17 @@ public String toString() {
         public InlineElement getDescription() {
             return text(description);
         }
+
+        public static LogStartupMode fromValue(String value) {

Review comment:
       An enum already has a `valueOf` method.
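
   A minimal sketch; `valueOf` already throws `IllegalArgumentException` on unknown names, so a custom `fromValue` is only worthwhile for lenient parsing:
   ```java
   // Case-insensitive lookup via the built-in enum method.
   LogStartupMode mode = LogStartupMode.valueOf(value.toUpperCase(Locale.ROOT));
   ```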




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826888566



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class TableStoreSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    private final LogOptions.LogChangelogMode logChangelogMode;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    private Map<String, String> staticPartitions = new HashMap<>();
+    private boolean overwrite;
+    @Nullable private CatalogLock.Factory lockFactory;
+
+    public TableStoreSink(
+            TableStore tableStore,
+            boolean streaming,
+            LogOptions.LogChangelogMode logChangelogMode,
+            @Nullable DynamicTableFactory.Context logStoreContext,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logChangelogMode = logChangelogMode;
+        this.logStoreContext = logStoreContext;
+        this.logStoreTableFactory = logStoreTableFactory;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        if (!tableStore.valueCountMode()
+                && logChangelogMode == LogOptions.LogChangelogMode.UPSERT) {
+            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+            for (RowKind kind : requestedMode.getContainedKinds()) {
+                if (kind != RowKind.UPDATE_BEFORE) {
+                    builder.addContainedKind(kind);
+                }
+            }
+            return builder.build();
+        }
+        return requestedMode;
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        LogSinkProvider logSinkProvider = null;
+        if (logStoreTableFactory != null) {
+            logSinkProvider =
+                    logStoreTableFactory.createSinkProvider(
+                            logStoreContext,
+                            new LogStoreTableFactory.SinkContext() {
+                                @Override
+                                public boolean isBounded() {
+                                    return !streaming;

Review comment:
       Use `context.isBounded()`; the `streaming` field is useless here.
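
   The simplification, sketched against the `context` parameter of `getSinkRuntimeProvider` above:
   ```java
   @Override
   public boolean isBounded() {
       return context.isBounded(); // the planner already knows the boundedness
   }
   ```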




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824465815



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+
+/** End-to-end test base for table store. */
+public abstract class TableStoreTestBase extends KafkaTableTestBase {
+
+    public static final String CURRENT_CATALOG = "catalog";
+    public static final String CURRENT_DATABASE = "database";
+
+    protected final RuntimeExecutionMode executionMode;
+    protected final ObjectIdentifier tableIdentifier;
+    protected final boolean enableChangeTracking;
+    protected final ExpectedResult expectedResult;
+
+    protected String rootPath;
+
+    public TableStoreTestBase(
+            RuntimeExecutionMode executionMode,
+            String tableName,
+            boolean enableChangeTracking,
+            ExpectedResult expectedResult) {
+        this.executionMode = executionMode;
+        this.tableIdentifier = ObjectIdentifier.of(CURRENT_CATALOG, CURRENT_DATABASE, tableName);
+        this.enableChangeTracking = enableChangeTracking;
+        this.expectedResult = expectedResult;
+    }
+
+    protected abstract void prepareEnv();
+
+    @Override
+    @Before
+    public void setup() {
+        super.setup();
+        env.setRuntimeMode(executionMode);
+        if (executionMode == RuntimeExecutionMode.STREAMING) {
+            env.enableCheckpointing(100);
+        }
+        tEnv = StreamTableEnvironment.create(env);
+        ((TableEnvironmentImpl) tEnv)
+                .getCatalogManager()
+                .registerCatalog(
+                        CURRENT_CATALOG,
+                        new GenericInMemoryCatalog(CURRENT_CATALOG, CURRENT_DATABASE));
+        tEnv.useCatalog(CURRENT_CATALOG);
+        try {
+            rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        prepareSessionContext();
+        prepareEnv();
+    }
+
+    protected void prepareSessionContext() {
+        Configuration configuration = tEnv.getConfig().getConfiguration();
+        configuration.setString(TABLE_STORE_PREFIX + FILE_PATH.key(), rootPath);
+        configuration.setString(
+                TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+        configuration.setBoolean(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(), enableChangeTracking);
+    }
+
+    protected static ResolvedCatalogTable createResolvedTable(
+            Map<String, String> options, RowType rowType, List<String> partitionKeys, int[] pk) {
+        List<String> fieldNames = rowType.getFieldNames();
+        List<DataType> fieldDataTypes =
+                rowType.getChildren().stream()
+                        .map(TypeConversions::fromLogicalToDataType)
+                        .collect(Collectors.toList());
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(),
+                        "a comment",
+                        partitionKeys,
+                        options);
+        List<Column> resolvedColumns =
+                IntStream.range(0, fieldNames.size())
+                        .mapToObj(i -> Column.physical(fieldNames.get(i), fieldDataTypes.get(i)))
+                        .collect(Collectors.toList());
+        UniqueConstraint constraint = null;
+        if (pk.length > 0) {
+            List<String> pkNames =
+                    Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
+            constraint = UniqueConstraint.primaryKey("pk", pkNames);
+        }
+        return new ResolvedCatalogTable(
+                origin, new ResolvedSchema(resolvedColumns, Collections.emptyList(), constraint));
+    }
+
+    protected void deleteTablePath() {
+        try {
+            FileUtils.deleteDirectory(

Review comment:
       I think you want `FileUtils.deleteDirectoryQuietly`.
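
   A sketch of the suggestion, assuming Flink's `org.apache.flink.util.FileUtils` (commons-io offers `deleteQuietly` instead); the directory argument here is illustrative:
   ```java
   // Swallows IOExceptions instead of propagating them from test cleanup.
   org.apache.flink.util.FileUtils.deleteDirectoryQuietly(new java.io.File(rootPath));
   ```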




[GitHub] [flink-table-store] LadyForest commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826533232



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -129,6 +155,85 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
         throw new UnsupportedOperationException("Not implement yet");
     }
 
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        boolean streaming =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+        LogSourceProvider logSourceProvider = null;
+        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+            logSourceProvider =
+                    createLogStoreTableFactory()
+                            .createSourceProvider(
+                                    createLogStoreContext(context),
+                                    new LogStoreTableFactory.SourceContext() {
+                                        @Override
+                                        public <T> TypeInformation<T> createTypeInformation(
+                                                DataType producedDataType) {
+                                            return createTypeInformation(
+                                                    TypeConversions.fromDataToLogicalType(
+                                                            producedDataType));
+                                        }
+
+                                        @Override
+                                        public <T> TypeInformation<T> createTypeInformation(
+                                                LogicalType producedLogicalType) {
+                                            return InternalTypeInfo.of(producedLogicalType);
+                                        }
+
+                                        @Override
+                                        public DynamicTableSource.DataStructureConverter
+                                                createDataStructureConverter(
+                                                        DataType producedDataType) {
+                                            return ScanRuntimeProviderContext.INSTANCE
+                                                    .createDataStructureConverter(producedDataType);
+                                        }
+                                    });
+        }
+        return new StoreTableSource(buildTableStore(context), streaming, logSourceProvider);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        LogSinkProvider logSinkProvider = null;
+        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+            filterLogStoreOptions(context.getCatalogTable().getOptions());

Review comment:
       > ?
   
   This has been resolved.




[GitHub] [flink-table-store] LadyForest commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826534273



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -84,14 +109,15 @@ public void onCreateTable(Context context, boolean ignoreIfExists) {
                                 context.getObjectIdentifier().asSerializableString()));
             }
             path.getFileSystem().mkdirs(path);
+            options.put(TABLE_PATH.key(), path.getPath());

Review comment:
       > 1. I think we can enrich the table path in `enrichOptions`. 2. Maybe we can support table path configuration from the user. 3. `path.getPath` is wrong; it lacks the scheme, so we should use `path.toString`.
   
   I think we can enrich `table.path` in `enrichOptions`, but still keep this option internal.
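
   For illustration, a minimal sketch of what that could look like, assuming the
   existing `tablePath` helper and the internal `TABLE_PATH` option (a sketch of
   the direction, not the PR's final code):

       @Override
       public Map<String, String> enrichOptions(Context context) {
           Map<String, String> enriched =
                   new HashMap<>(context.getCatalogTable().getOptions());
           Path path = tablePath(enriched, context.getObjectIdentifier());
           // toString() preserves the scheme (e.g. "file:/..."); getPath() drops it
           enriched.put(TABLE_PATH.key(), path.toString());
           return enriched;
       }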




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824471537



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+
+/** End-to-end test base for table store. */
+public abstract class TableStoreTestBase extends KafkaTableTestBase {

Review comment:
       `TableStoreTestBase` looks good~




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826523559



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -84,14 +109,15 @@ public void onCreateTable(Context context, boolean ignoreIfExists) {
                                 context.getObjectIdentifier().asSerializableString()));
             }
             path.getFileSystem().mkdirs(path);
+            options.put(TABLE_PATH.key(), path.getPath());

Review comment:
       1. I think we can enrich the table path in `enrichOptions`.
   2. Maybe we can support table path configuration from the user.
   3. `path.getPath` is wrong; it lacks the scheme, so we should use `path.toString`.
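
   To illustrate point 3 with assumed values (not taken from the PR), using
   Flink's `org.apache.flink.core.fs.Path`:

       Path path = new Path("file:/tmp/warehouse/db/t");
       path.getPath();   // "/tmp/warehouse/db/t"  -- scheme lost
       path.toString();  // "file:/tmp/warehouse/db/t"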




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826901319



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
+ * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource} and kafka
+ * log source created by {@link LogSourceProvider}.
+ */
+public class TableStoreSource
+        implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    @Nullable private List<ResolvedExpression> partitionFilters;
+    @Nullable private List<ResolvedExpression> fieldFilters;
+    @Nullable private int[][] projectFields;
+
+    public TableStoreSource(
+            TableStore tableStore,
+            boolean streaming,
+            @Nullable DynamicTableFactory.Context logStoreContext,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logStoreContext = logStoreContext;
+        this.logStoreTableFactory = logStoreTableFactory;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return streaming
+                ? tableStore.valueCountMode()
+                        ? ChangelogMode.all()
+                        // TODO: optimize upsert when consistency mode is transactional and
+                        // log.changelog-mode is all
+                        : ChangelogMode.upsert()
+                : ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        LogSourceProvider logSourceProvider = null;
+        if (logStoreTableFactory != null) {
+            logSourceProvider =
+                    logStoreTableFactory.createSourceProvider(
+                            logStoreContext,
+                            new LogStoreTableFactory.SourceContext() {
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        DataType producedDataType) {
+                                    return scanContext.createTypeInformation(producedDataType);
+                                }
+
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        LogicalType producedLogicalType) {
+                                    return scanContext.createTypeInformation(producedLogicalType);
+                                }
+
+                                @Override
+                                public DataStructureConverter createDataStructureConverter(
+                                        DataType producedDataType) {
+                                    return scanContext.createDataStructureConverter(
+                                            producedDataType);
+                                }
+                            });
+        }
+        TableStore.SourceBuilder builder =
+                tableStore
+                        .sourceBuilder()
+                        .withContinuousMode(streaming)
+                        .withHybridMode(streaming && logSourceProvider != null)
+                        .withLogSourceProvider(logSourceProvider)
+                        .withProjection(projectFields)
+                        .withPartitionPredicate(PredicateConverter.convert(partitionFilters))
+                        .withFieldPredicate(PredicateConverter.convert(fieldFilters));
+        return SourceProvider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        TableStoreSource copied =
+                new TableStoreSource(tableStore, streaming, logStoreContext, logStoreTableFactory);
+        copied.partitionFilters = partitionFilters;
+        copied.fieldFilters = fieldFilters;
+        return copied;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TableStoreSource";
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        if (tableStore.partitioned()) {
+            classifyFilters(filters);
+        } else {
+            fieldFilters = filters;
+        }
+        return Result.of(
+                new ArrayList<>(filters),
+                fieldFilters == null ? Collections.emptyList() : fieldFilters);
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.projectFields = projectedFields;
+    }
+
+    private void classifyFilters(List<ResolvedExpression> filters) {
+        List<String> fieldNames = tableStore.fieldNames();
+        List<String> partitionKeys = tableStore.partitionKeys();
+        PartitionIndexVisitor visitor =
+                new PartitionIndexVisitor(
+                        fieldNames.stream().mapToInt(partitionKeys::indexOf).toArray());
+        filters.forEach(
+                filter -> {
+                    try {
+                        if (partitionFilters == null) {
+                            partitionFilters = new ArrayList<>();

Review comment:
       Just init with `new ArrayList<>()`?
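
   I.e., a sketch of the suggested simplification, so `classifyFilters` can add
   to the lists without null checks (field declarations only):

       private List<ResolvedExpression> partitionFilters = new ArrayList<>();
       private List<ResolvedExpression> fieldFilters = new ArrayList<>();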




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826891563



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class TableStoreSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    private final LogOptions.LogChangelogMode logChangelogMode;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    private Map<String, String> staticPartitions = new HashMap<>();

Review comment:
       `staticPartitions` could just be initialized to `null`.




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824463678



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -65,8 +69,8 @@
 
     @Override
     public void onCreateTable(Context context, boolean ignoreIfExists) {
-        Map<String, String> options = context.getCatalogTable().getOptions();

Review comment:
       It is OK to keep `options`.
   All options have already been enriched everywhere except in `enrichOptions`, so we don't need to emphasize it.




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r825969369



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreTableSink.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogSinkProvider;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class StoreTableSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    @Nullable private final LogSinkProvider logSinkProvider;
+
+    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
+    private boolean overwrite;
+    @Nullable private CatalogLock.Factory lockFactory;
+
+    public StoreTableSink(TableStore tableStore, @Nullable LogSinkProvider logSinkProvider) {
+        this.tableStore = tableStore;
+        this.logSinkProvider = logSinkProvider;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        return ChangelogMode.all();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        return (DataStreamSinkProvider)
+                (providerContext, dataStream) ->
+                        tableStore
+                                .sinkBuilder()
+                                .withInput(
+                                        new DataStream<>(
+                                                dataStream.getExecutionEnvironment(),
+                                                dataStream.getTransformation()))
+                                .withLockFactory(lockFactory)
+                                .withLogSinkProvider(logSinkProvider)
+                                .withOverwritePartition(staticPartitions)
+                                .build();
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        StoreTableSink copied = new StoreTableSink(tableStore, logSinkProvider);
+        copied.staticPartitions = new LinkedHashMap<>(staticPartitions);
+        copied.overwrite = overwrite;
+        return copied;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "StoreTableSink";

Review comment:
       "TableStore"

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreTableSink.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogSinkProvider;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class StoreTableSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    @Nullable private final LogSinkProvider logSinkProvider;
+
+    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
+    private boolean overwrite;
+    @Nullable private CatalogLock.Factory lockFactory;
+
+    public StoreTableSink(TableStore tableStore, @Nullable LogSinkProvider logSinkProvider) {
+        this.tableStore = tableStore;
+        this.logSinkProvider = logSinkProvider;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        return ChangelogMode.all();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        return (DataStreamSinkProvider)
+                (providerContext, dataStream) ->
+                        tableStore
+                                .sinkBuilder()
+                                .withInput(
+                                        new DataStream<>(
+                                                dataStream.getExecutionEnvironment(),
+                                                dataStream.getTransformation()))
+                                .withLockFactory(lockFactory)
+                                .withLogSinkProvider(logSinkProvider)
+                                .withOverwritePartition(staticPartitions)
+                                .build();
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        StoreTableSink copied = new StoreTableSink(tableStore, logSinkProvider);
+        copied.staticPartitions = new LinkedHashMap<>(staticPartitions);
+        copied.overwrite = overwrite;
+        return copied;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "StoreTableSink";
+    }
+
+    @Override
+    public void applyStaticPartition(Map<String, String> partition) {

Review comment:
       Just `new HashMap<>(partition)` is OK.
   - The file store doesn't need a linked map.
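
   A sketch of the suggested change (assuming the field becomes a plain `Map`):

       @Override
       public void applyStaticPartition(Map<String, String> partition) {
           // a plain HashMap is enough; the file store does not rely on
           // partition-key order here
           this.staticPartitions = new HashMap<>(partition);
       }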

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreTableSink.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogSinkProvider;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class StoreTableSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    @Nullable private final LogSinkProvider logSinkProvider;
+
+    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
+    private boolean overwrite;
+    @Nullable private CatalogLock.Factory lockFactory;
+
+    public StoreTableSink(TableStore tableStore, @Nullable LogSinkProvider logSinkProvider) {
+        this.tableStore = tableStore;
+        this.logSinkProvider = logSinkProvider;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        return ChangelogMode.all();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        return (DataStreamSinkProvider)
+                (providerContext, dataStream) ->
+                        tableStore
+                                .sinkBuilder()
+                                .withInput(
+                                        new DataStream<>(
+                                                dataStream.getExecutionEnvironment(),
+                                                dataStream.getTransformation()))
+                                .withLockFactory(lockFactory)
+                                .withLogSinkProvider(logSinkProvider)
+                                .withOverwritePartition(staticPartitions)
+                                .build();
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        StoreTableSink copied = new StoreTableSink(tableStore, logSinkProvider);
+        copied.staticPartitions = new LinkedHashMap<>(staticPartitions);
+        copied.overwrite = overwrite;
+        return copied;

Review comment:
       Are we missing the catalog lock here?
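
   I.e., `copy()` presumably also needs to carry the lock factory over; a
   sketch, not the final code:

       @Override
       public DynamicTableSink copy() {
           StoreTableSink copied = new StoreTableSink(tableStore, logSinkProvider);
           copied.staticPartitions = new LinkedHashMap<>(staticPartitions);
           copied.overwrite = overwrite;
           copied.lockFactory = lockFactory; // previously missing
           return copied;
       }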

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreTableSink.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogSinkProvider;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class StoreTableSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    @Nullable private final LogSinkProvider logSinkProvider;
+
+    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
+    private boolean overwrite;
+    @Nullable private CatalogLock.Factory lockFactory;
+
+    public StoreTableSink(TableStore tableStore, @Nullable LogSinkProvider logSinkProvider) {
+        this.tableStore = tableStore;
+        this.logSinkProvider = logSinkProvider;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        return ChangelogMode.all();

Review comment:
       We can take a look at `log.changelog-mode` and the primary key:
   - has pk: all or upsert
   - no pk: all
   
   Note here:
   - all means `return requestedMode`.
   - for upsert, you can take a look at `HBaseDynamicTableSink`.
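
   For illustration, a minimal sketch of that logic, modeled on
   `HBaseDynamicTableSink` (the `hasPrimaryKey` flag is an assumed stand-in for
   however the sink exposes its primary key):

       @Override
       public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
           if (hasPrimaryKey && logChangelogMode == LogOptions.LogChangelogMode.UPSERT) {
               // upsert: accept the requested mode minus UPDATE_BEFORE
               ChangelogMode.Builder builder = ChangelogMode.newBuilder();
               for (RowKind kind : requestedMode.getContainedKinds()) {
                   if (kind != RowKind.UPDATE_BEFORE) {
                       builder.addContainedKind(kind);
                   }
               }
               return builder.build();
           }
           return requestedMode; // "all": pass the requested mode through
       }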

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StoreTableSource.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
+ * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource} and kafka
+ * log source created by {@link LogSourceProvider}.
+ */
+public class StoreTableSource
+        implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    @Nullable private final LogSourceProvider logSourceProvider;
+
+    @Nullable private List<ResolvedExpression> partitionFilters;
+    @Nullable private List<ResolvedExpression> fieldFilters;
+    @Nullable private int[][] projectFields;
+
+    public StoreTableSource(
+            TableStore tableStore,
+            boolean streaming,
+            @Nullable LogSourceProvider logSourceProvider) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logSourceProvider = logSourceProvider;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return streaming
+                ? tableStore.valueCountMode() ? ChangelogMode.all() : ChangelogMode.upsert()

Review comment:
       `ChangelogMode.upsert()` can be optimized; we can add a TODO here.

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StoreTableSource.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
+ * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource} and kafka
+ * log source created by {@link LogSourceProvider}.
+ */
+public class StoreTableSource
+        implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    @Nullable private final LogSourceProvider logSourceProvider;
+
+    @Nullable private List<ResolvedExpression> partitionFilters;
+    @Nullable private List<ResolvedExpression> fieldFilters;
+    @Nullable private int[][] projectFields;
+
+    public StoreTableSource(
+            TableStore tableStore,
+            boolean streaming,
+            @Nullable LogSourceProvider logSourceProvider) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logSourceProvider = logSourceProvider;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return streaming
+                ? tableStore.valueCountMode() ? ChangelogMode.all() : ChangelogMode.upsert()
+                : ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        TableStore.SourceBuilder builder =
+                tableStore
+                        .sourceBuilder()
+                        .withContinuousMode(streaming)
+                        .withHybridMode(streaming && logSourceProvider != null)
+                        .withLogSourceProvider(logSourceProvider)
+                        .withProjection(projectFields)
+                        .withPartitionPredicate(
+                                partitionFilters != null
+                                        ? partitionFilters.stream()
+                                                .map(
+                                                        filter ->
+                                                                filter.accept(
+                                                                        PredicateConverter
+                                                                                .CONVERTER))
+                                                .reduce(And::new)
+                                                .orElse(null)
+                                        : null)
+                        .withFieldPredicate(
+                                fieldFilters != null
+                                        ? fieldFilters.stream()
+                                                .map(
+                                                        filter ->
+                                                                filter.accept(
+                                                                        PredicateConverter
+                                                                                .CONVERTER))
+                                                .reduce(And::new)
+                                                .orElse(null)
+                                        : null);
+        return SourceProvider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        StoreTableSource copied = new StoreTableSource(tableStore, streaming, logSourceProvider);
+        copied.partitionFilters = partitionFilters;
+        copied.fieldFilters = fieldFilters;
+        return copied;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "StoreTableSource";

Review comment:
       "TableStore" too

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StoreTableSource.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
+ * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource} and kafka
+ * log source created by {@link LogSourceProvider}.
+ */
+public class StoreTableSource
+        implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    @Nullable private final LogSourceProvider logSourceProvider;
+
+    @Nullable private List<ResolvedExpression> partitionFilters;
+    @Nullable private List<ResolvedExpression> fieldFilters;
+    @Nullable private int[][] projectFields;
+
+    public StoreTableSource(
+            TableStore tableStore,
+            boolean streaming,
+            @Nullable LogSourceProvider logSourceProvider) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logSourceProvider = logSourceProvider;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return streaming
+                ? tableStore.valueCountMode() ? ChangelogMode.all() : ChangelogMode.upsert()
+                : ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        TableStore.SourceBuilder builder =
+                tableStore
+                        .sourceBuilder()
+                        .withContinuousMode(streaming)
+                        .withHybridMode(streaming && logSourceProvider != null)
+                        .withLogSourceProvider(logSourceProvider)
+                        .withProjection(projectFields)
+                        .withPartitionPredicate(
+                                partitionFilters != null
+                                        ? partitionFilters.stream()
+                                                .map(
+                                                        filter ->

Review comment:
       Add a static method `convert(List<Expression>)` to `PredicateConverter`.
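
   A sketch of such a helper, AND-reducing the converted filters (the exact
   signature is an assumption; the surrounding code works with
   `ResolvedExpression`):

       @Nullable
       public static Predicate convert(@Nullable List<ResolvedExpression> filters) {
           return filters == null
                   ? null
                   : filters.stream()
                           .map(f -> f.accept(PredicateConverter.CONVERTER))
                           .reduce(And::new)
                           .orElse(null);
       }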

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StoreTableSource.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
+ * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource} and kafka
+ * log source created by {@link LogSourceProvider}.
+ */
+public class StoreTableSource
+        implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    @Nullable private final LogSourceProvider logSourceProvider;
+
+    @Nullable private List<ResolvedExpression> partitionFilters;
+    @Nullable private List<ResolvedExpression> fieldFilters;
+    @Nullable private int[][] projectFields;
+
+    public StoreTableSource(
+            TableStore tableStore,
+            boolean streaming,
+            @Nullable LogSourceProvider logSourceProvider) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logSourceProvider = logSourceProvider;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return streaming
+                ? tableStore.valueCountMode() ? ChangelogMode.all() : ChangelogMode.upsert()
+                : ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        TableStore.SourceBuilder builder =
+                tableStore
+                        .sourceBuilder()
+                        .withContinuousMode(streaming)
+                        .withHybridMode(streaming && logSourceProvider != null)
+                        .withLogSourceProvider(logSourceProvider)
+                        .withProjection(projectFields)
+                        .withPartitionPredicate(
+                                partitionFilters != null
+                                        ? partitionFilters.stream()
+                                                .map(
+                                                        filter ->
+                                                                filter.accept(
+                                                                        PredicateConverter
+                                                                                .CONVERTER))
+                                                .reduce(And::new)
+                                                .orElse(null)
+                                        : null)
+                        .withFieldPredicate(
+                                fieldFilters != null
+                                        ? fieldFilters.stream()
+                                                .map(
+                                                        filter ->
+                                                                filter.accept(
+                                                                        PredicateConverter
+                                                                                .CONVERTER))
+                                                .reduce(And::new)
+                                                .orElse(null)
+                                        : null);
+        return SourceProvider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        StoreTableSource copied = new StoreTableSource(tableStore, streaming, logSourceProvider);
+        copied.partitionFilters = partitionFilters;
+        copied.fieldFilters = fieldFilters;
+        return copied;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "StoreTableSource";
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        if (tableStore.partitioned()) {
+            classifyFilters(filters);
+        } else {
+            fieldFilters = filters;
+        }
+        return Result.of(

Review comment:
       The filter pushdown is best-effort, so just return:
   `return Result.of(new ArrayList<>(filters), new ArrayList<>(filters));`
   Accept all, and retain all. The planner will apply the filters again.
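
   I.e., roughly (sketch):

       @Override
       public Result applyFilters(List<ResolvedExpression> filters) {
           if (tableStore.partitioned()) {
               classifyFilters(filters);
           } else {
               fieldFilters = filters;
           }
           // best effort: accept all filters and hand them all back;
           // the planner will apply them again on top
           return Result.of(new ArrayList<>(filters), new ArrayList<>(filters));
       }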




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826526318



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -184,9 +286,42 @@ private static Path tablePath(Map<String, String> options, ObjectIdentifier iden
                         identifier.getObjectName()));
     }
 
-    private static boolean enableChangeTracking(Map<String, String> options) {
+    @VisibleForTesting
+    static boolean enableChangeTracking(Map<String, String> options) {
         return Boolean.parseBoolean(
                 options.getOrDefault(
                         CHANGE_TRACKING.key(), CHANGE_TRACKING.defaultValue().toString()));
     }
+
+    private static DynamicTableFactory.Context createLogStoreContext(
+            DynamicTableFactory.Context context) {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                context.getObjectIdentifier(),
+                context.getCatalogTable()
+                        .copy(filterLogStoreOptions(context.getCatalogTable().getOptions())),
+                filterLogStoreOptions(context.getEnrichmentOptions()),
+                context.getConfiguration(),
+                context.getClassLoader(),
+                context.isTemporary());
+    }
+
+    private static LogStoreTableFactory createLogStoreTableFactory() {

Review comment:
       Take a look at the previous `createLogStoreTableFactory`.
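
   Presumably this refers to an earlier variant that took the class loader from
   the factory context rather than the thread context class loader; a sketch
   under that assumption:

       private static LogStoreTableFactory createLogStoreTableFactory(
               DynamicTableFactory.Context context) {
           return discoverLogStoreFactory(
                   context.getClassLoader(),
                   TableStoreFactoryOptions.LOG_SYSTEM.defaultValue());
       }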




[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826892306



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class TableStoreSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    private final LogOptions.LogChangelogMode logChangelogMode;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    private Map<String, String> staticPartitions = new HashMap<>();
+    private boolean overwrite;
+    @Nullable private CatalogLock.Factory lockFactory;
+
+    public TableStoreSink(
+            TableStore tableStore,
+            boolean streaming,
+            LogOptions.LogChangelogMode logChangelogMode,
+            @Nullable DynamicTableFactory.Context logStoreContext,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logChangelogMode = logChangelogMode;
+        this.logStoreContext = logStoreContext;
+        this.logStoreTableFactory = logStoreTableFactory;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        if (!tableStore.valueCountMode()
+                && logChangelogMode == LogOptions.LogChangelogMode.UPSERT) {
+            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+            for (RowKind kind : requestedMode.getContainedKinds()) {
+                if (kind != RowKind.UPDATE_BEFORE) {
+                    builder.addContainedKind(kind);
+                }
+            }
+            return builder.build();
+        }
+        return requestedMode;
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        LogSinkProvider logSinkProvider = null;
+        if (logStoreTableFactory != null) {
+            logSinkProvider =
+                    logStoreTableFactory.createSinkProvider(
+                            logStoreContext,
+                            new LogStoreTableFactory.SinkContext() {
+                                @Override
+                                public boolean isBounded() {
+                                    return !streaming;
+                                }
+
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        DataType consumedDataType) {
+                                    return context.createTypeInformation(consumedDataType);
+                                }
+
+                                @Override
+                                public <T> TypeInformation<T> createTypeInformation(
+                                        LogicalType consumedLogicalType) {
+                                    return context.createTypeInformation(consumedLogicalType);
+                                }
+
+                                @Override
+                                public DynamicTableSink.DataStructureConverter
+                                        createDataStructureConverter(DataType consumedDataType) {
+                                    return context.createDataStructureConverter(consumedDataType);
+                                }
+                            });
+        }
+        final LogSinkProvider finalLogSinkProvider = logSinkProvider;
+        return (DataStreamSinkProvider)
+                (providerContext, dataStream) ->
+                        tableStore
+                                .sinkBuilder()
+                                .withInput(
+                                        new DataStream<>(
+                                                dataStream.getExecutionEnvironment(),
+                                                dataStream.getTransformation()))
+                                .withLockFactory(lockFactory)
+                                .withLogSinkProvider(finalLogSinkProvider)
+                                .withOverwritePartition(staticPartitions)

Review comment:
       An empty `staticPartitions` map means "overwrite all partitions"...
   This should be `overwrite ? staticPartitions : null`.
   Add a TODO in the test class to cover these cases.
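   A sketch of the suggested fix in `getSinkRuntimeProvider`, keeping the builder chain from the hunk above (the trailing `build()` call is an assumption, since the hunk ends at the commented line):

```java
return (DataStreamSinkProvider)
        (providerContext, dataStream) ->
                tableStore
                        .sinkBuilder()
                        .withInput(
                                new DataStream<>(
                                        dataStream.getExecutionEnvironment(),
                                        dataStream.getTransformation()))
                        .withLockFactory(lockFactory)
                        .withLogSinkProvider(finalLogSinkProvider)
                        // null disables overwrite; an empty staticPartitions
                        // map would mean "overwrite all partitions".
                        .withOverwritePartition(overwrite ? staticPartitions : null)
                        .build();
```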





[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826894107



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class TableStoreSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    private final LogOptions.LogChangelogMode logChangelogMode;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    private Map<String, String> staticPartitions = new HashMap<>();
+    private boolean overwrite;

Review comment:
       `overwrite = false;`
   I know the default value is false, but it's better to write it out explicitly.
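   That is, in the field declarations:

```java
private Map<String, String> staticPartitions = new HashMap<>();
private boolean overwrite = false; // default written out explicitly
```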





[GitHub] [flink-table-store] LadyForest commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824471671



##########
File path: flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
##########
@@ -92,7 +92,6 @@ protected void doStart() {
     @Before
     public void setup() {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);

Review comment:
       `KafkaTableTestBase` is a dedicated test base for preparing the Kafka test container; it has nothing to do with a table environment, so I initialized `tEnv` in the subclass.
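   A sketch of how that looks in a subclass (the class name `StoreKafkaTableITCase` is illustrative; JUnit invokes the base class `@Before` first, so `env` is already initialized here):

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.Before;

public class StoreKafkaTableITCase extends KafkaTableTestBase {

    private StreamTableEnvironment tEnv;

    @Before
    public void setupTableEnvironment() {
        // env is created by KafkaTableTestBase#setup(), which runs first.
        tEnv = StreamTableEnvironment.create(env);
    }
}
```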





[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826524928



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -129,6 +155,85 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
         throw new UnsupportedOperationException("Not implement yet");
     }
 
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        boolean streaming =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+        LogSourceProvider logSourceProvider = null;
+        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+            logSourceProvider =
+                    createLogStoreTableFactory()
+                            .createSourceProvider(
+                                    createLogStoreContext(context),
+                                    new LogStoreTableFactory.SourceContext() {
+                                        @Override
+                                        public <T> TypeInformation<T> createTypeInformation(
+                                                DataType producedDataType) {
+                                            return createTypeInformation(

Review comment:
       We can just wrap the `Context context` and delegate to it.
   Use `context.createTypeInformation(...)`.
   Ditto for the other callback methods.
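   The sink side of this PR already follows that pattern; a sketch of the analogous delegation for the source, assuming the wrapped `context` exposes the same `createTypeInformation` methods:

```java
new LogStoreTableFactory.SourceContext() {
    @Override
    public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
        // Delegate to the wrapped context instead of recursing into
        // this method, which is the bug pointed out above.
        return context.createTypeInformation(producedDataType);
    }
    // ... delegate the remaining callbacks the same way
}
```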





[GitHub] [flink-table-store] LadyForest commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826532696



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -84,14 +109,15 @@ public void onCreateTable(Context context, boolean ignoreIfExists) {
                                 context.getObjectIdentifier().asSerializableString()));
             }
             path.getFileSystem().mkdirs(path);
+            options.put(TABLE_PATH.key(), path.getPath());

Review comment:
       I have some questions here.
   
   1. If users specify `table.path`, does that make `file.path` redundant, so the two options can be simplified into one?
   
   2. Letting users specify `table.path` means we lose control of the relative table path/table name; we can no longer reconstruct it simply from `tableIdentifier`. If we give users this choice, should we also consider letting them name their topics freely?





[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r826888566



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class TableStoreSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+    private final TableStore tableStore;
+    private final boolean streaming;
+    private final LogOptions.LogChangelogMode logChangelogMode;
+    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+    private Map<String, String> staticPartitions = new HashMap<>();
+    private boolean overwrite;
+    @Nullable private CatalogLock.Factory lockFactory;
+
+    public TableStoreSink(
+            TableStore tableStore,
+            boolean streaming,
+            LogOptions.LogChangelogMode logChangelogMode,
+            @Nullable DynamicTableFactory.Context logStoreContext,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        this.tableStore = tableStore;
+        this.streaming = streaming;
+        this.logChangelogMode = logChangelogMode;
+        this.logStoreContext = logStoreContext;
+        this.logStoreTableFactory = logStoreTableFactory;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        if (!tableStore.valueCountMode()
+                && logChangelogMode == LogOptions.LogChangelogMode.UPSERT) {
+            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+            for (RowKind kind : requestedMode.getContainedKinds()) {
+                if (kind != RowKind.UPDATE_BEFORE) {
+                    builder.addContainedKind(kind);
+                }
+            }
+            return builder.build();
+        }
+        return requestedMode;
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        LogSinkProvider logSinkProvider = null;
+        if (logStoreTableFactory != null) {
+            logSinkProvider =
+                    logStoreTableFactory.createSinkProvider(
+                            logStoreContext,
+                            new LogStoreTableFactory.SinkContext() {
+                                @Override
+                                public boolean isBounded() {
+                                    return !streaming;

Review comment:
       Use `context.isBounded()` here instead of `!streaming`.
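   That is:

```java
@Override
public boolean isBounded() {
    // Let the planner-provided sink context decide, instead of
    // re-deriving boundedness from the streaming flag.
    return context.isBounded();
}
```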





[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #41: [FLINK-26535] Introduce StoreTableSource And StoreTableSink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #41:
URL: https://github.com/apache/flink-table-store/pull/41#discussion_r824463019



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StoreTableContext.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.utils.TableStoreUtils;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.connector.utils.TableStoreUtils.filterFileStoreOptions;
+import static org.apache.flink.table.store.connector.utils.TableStoreUtils.tablePath;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_PATH;
+
+/**
+ * Table context to help create {@link
+ * org.apache.flink.table.store.connector.source.StoreTableSource} and {@link
+ * org.apache.flink.table.store.connector.sink.StoreTableSink}.
+ */
+public class StoreTableContext {
+
+    private final DynamicTableFactory.Context context;
+    private final boolean batchMode;
+    private final boolean enableChangeTracking;
+    private final ObjectIdentifier tableIdentifier;
+    private final int numBucket;
+
+    private RowType rowType;
+    private List<String> partitionKeys;
+    private RowType partitionType;
+    private int[] partitionIndex;
+    private int[] primaryKeyIndex;
+    private FileStore fileStore;
+
+    public StoreTableContext(DynamicTableFactory.Context context) {
+        this.context = context;
+        batchMode =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.BATCH;
+        enableChangeTracking =
+                TableStoreUtils.enableChangeTracking(context.getCatalogTable().getOptions());
+        if (batchMode && enableChangeTracking) {
+            throw new TableException("Change tracking is not supported under batch mode.");
+        }
+        tableIdentifier = context.getObjectIdentifier();
+        numBucket =
+                Integer.parseInt(
+                        context.getCatalogTable()
+                                .getOptions()
+                                .getOrDefault(BUCKET.key(), BUCKET.defaultValue().toString()));
+        initialize();
+    }
+
+    public DynamicTableFactory.Context getContext() {
+        return context;
+    }
+
+    public boolean batchMode() {
+        return batchMode;
+    }
+
+    public boolean enableChangeTracking() {
+        return enableChangeTracking;
+    }
+
+    public ObjectIdentifier tableIdentifier() {
+        return tableIdentifier;
+    }
+
+    public FileStore fileStore() {
+        return fileStore;
+    }
+
+    public int[] partitionIndex() {
+        return partitionIndex;
+    }
+
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    public int[] primaryKeyIndex() {
+        return primaryKeyIndex;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public RowType getPartitionType() {
+        return partitionType;
+    }
+
+    public ChangelogMode getChangelogMode() {
+        return batchMode || primaryKeyIndex.length > 0
+                ? ChangelogMode.insertOnly()
+                : ChangelogMode.all();
+    }
+
+    public int numBucket() {
+        return numBucket;
+    }
+
+    // ~ Tools ------------------------------------------------------------------
+
+    private void initialize() {
+        rowType =
+                (RowType)
+                        context.getCatalogTable()
+                                .getResolvedSchema()
+                                .toPhysicalRowDataType()
+                                .getLogicalType();
+
+        partitionKeys = context.getCatalogTable().getPartitionKeys();
+        RowType.RowField[] fields = new RowType.RowField[partitionKeys.size()];
+        partitionIndex = new int[partitionKeys.size()];
+        List<String> fieldNames = rowType.getFieldNames();
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            int position = fieldNames.indexOf(partitionKeys.get(i));
+            partitionIndex[i] = position;
+            fields[i] = rowType.getFields().get(position);
+        }
+        partitionType = new RowType(Arrays.asList(fields));
+
+        primaryKeyIndex = context.getCatalogTable().getResolvedSchema().getPrimaryKeyIndexes();
+        RowType keyType = primaryKeyIndex.length > 0 ? getType(rowType, primaryKeyIndex) : rowType;
+        RowType valueType =
+                primaryKeyIndex.length > 0
+                        ? rowType
+                        : new RowType(
+                                Collections.singletonList(
+                                        new RowType.RowField("COUNT", new BigIntType(false))));
+
+        Accumulator accumulator =
+                primaryKeyIndex.length > 0
+                        ? new DeduplicateAccumulator()
+                        : new ValueCountAccumulator();
+        Map<String, String> options = context.getCatalogTable().getOptions();
+        options.put(TABLE_PATH.key(), tablePath(options, context.getObjectIdentifier()).getPath());

Review comment:
       `TABLE_PATH` should be enriched in `TableFactory.enrich`



