You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/28 07:51:21 UTC

[flink-table-store] branch master updated: [FLINK-30223] Refactor Lock to provide Lock.Factory

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 6886303b [FLINK-30223] Refactor Lock to provide Lock.Factory
6886303b is described below

commit 6886303b2f482f2f31c7b98221691a650c1e67d3
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Nov 28 15:51:17 2022 +0800

    [FLINK-30223] Refactor Lock to provide Lock.Factory
    
    This closes #405.
---
 .../store/connector/sink/FlinkSinkBuilder.java     | 14 ++---
 .../table/store/connector/sink/StoreSink.java      | 16 ++----
 .../table/store/connector/sink/TableStoreSink.java |  6 ++-
 .../table/store/connector/FileStoreITCase.java     | 16 +++---
 .../flink/table/store/file/operation/Lock.java     | 61 +++++++++++++++++++---
 .../apache/flink/table/store/hive/HiveCatalog.java |  2 +-
 6 files changed, 76 insertions(+), 39 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 2e77a669..858eb847 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -23,10 +23,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
-import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
@@ -38,18 +37,16 @@ import java.util.Map;
 /** Sink builder to build a flink sink from input. */
 public class FlinkSinkBuilder {
 
-    private final ObjectIdentifier tableIdentifier;
     private final FileStoreTable table;
     private final Configuration conf;
 
     private DataStream<RowData> input;
-    @Nullable private CatalogLock.Factory lockFactory;
+    private Lock.Factory lockFactory = Lock.emptyFactory();
     @Nullable private Map<String, String> overwritePartition;
     @Nullable private LogSinkFunction logSinkFunction;
     @Nullable private Integer parallelism;
 
-    public FlinkSinkBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
-        this.tableIdentifier = tableIdentifier;
+    public FlinkSinkBuilder(FileStoreTable table) {
         this.table = table;
         this.conf = Configuration.fromMap(table.schema().options());
     }
@@ -59,7 +56,7 @@ public class FlinkSinkBuilder {
         return this;
     }
 
-    public FlinkSinkBuilder withLockFactory(CatalogLock.Factory lockFactory) {
+    public FlinkSinkBuilder withLockFactory(Lock.Factory lockFactory) {
         this.lockFactory = lockFactory;
         return this;
     }
@@ -100,11 +97,10 @@ public class FlinkSinkBuilder {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         StoreSink sink =
                 new StoreSink(
-                        tableIdentifier,
                         table,
+                        lockFactory,
                         conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
                         getCompactPartSpec(),
-                        lockFactory,
                         overwritePartition,
                         logSinkFunction);
         return sink.sinkFrom(new DataStream<>(env, partitioned));
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 9ef68a19..b556011b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -30,11 +30,9 @@ import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
-import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -56,29 +54,24 @@ public class StoreSink implements Serializable {
 
     private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
 
-    private final ObjectIdentifier tableIdentifier;
     private final FileStoreTable table;
-
+    private final Lock.Factory lockFactory;
     private final boolean compactionTask;
     @Nullable private final Map<String, String> compactPartitionSpec;
-    @Nullable private final CatalogLock.Factory lockFactory;
     @Nullable private final Map<String, String> overwritePartition;
     @Nullable private final LogSinkFunction logSinkFunction;
 
     public StoreSink(
-            ObjectIdentifier tableIdentifier,
             FileStoreTable table,
+            Lock.Factory lockFactory,
             boolean compactionTask,
             @Nullable Map<String, String> compactPartitionSpec,
-            @Nullable CatalogLock.Factory lockFactory,
             @Nullable Map<String, String> overwritePartition,
             @Nullable LogSinkFunction logSinkFunction) {
-        this.tableIdentifier = tableIdentifier;
         this.table = table;
-
+        this.lockFactory = lockFactory;
         this.compactionTask = compactionTask;
         this.compactPartitionSpec = compactPartitionSpec;
-        this.lockFactory = lockFactory;
         this.overwritePartition = overwritePartition;
         this.logSinkFunction = logSinkFunction;
     }
@@ -103,12 +96,11 @@ public class StoreSink implements Serializable {
     }
 
     private StoreCommitter createCommitter(String user, boolean createEmptyCommit) {
-        Lock lock = Lock.fromCatalog(lockFactory, tableIdentifier.toObjectPath());
         return new StoreCommitter(
                 table.newCommit(user)
                         .withOverwritePartition(overwritePartition)
                         .withCreateEmptyCommit(createEmptyCommit)
-                        .withLock(lock));
+                        .withLock(lockFactory.create()));
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 484fa82f..dfb9986b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -122,12 +123,13 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
                         : (logSinkProvider == null ? null : logSinkProvider.createSink());
         return new TableStoreDataStreamSinkProvider(
                 (dataStream) ->
-                        new FlinkSinkBuilder(tableIdentifier, table)
+                        new FlinkSinkBuilder(table)
                                 .withInput(
                                         new DataStream<>(
                                                 dataStream.getExecutionEnvironment(),
                                                 dataStream.getTransformation()))
-                                .withLockFactory(lockFactory)
+                                .withLockFactory(
+                                        Lock.factory(lockFactory, tableIdentifier.toObjectPath()))
                                 .withLogSinkFunction(logSinkFunction)
                                 .withOverwritePartition(overwrite ? staticPartitions : null)
                                 .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 180fef10..5b749d74 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -132,7 +132,7 @@ public class FileStoreITCase extends AbstractTestBase {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});
 
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // read
@@ -152,7 +152,7 @@ public class FileStoreITCase extends AbstractTestBase {
         FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2});
 
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // read
@@ -171,7 +171,7 @@ public class FileStoreITCase extends AbstractTestBase {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});
 
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // overwrite p2
@@ -182,7 +182,7 @@ public class FileStoreITCase extends AbstractTestBase {
                         InternalTypeInfo.of(TABLE_TYPE));
         Map<String, String> overwrite = new HashMap<>();
         overwrite.put("p", "p2");
-        new FlinkSinkBuilder(IDENTIFIER, table)
+        new FlinkSinkBuilder(table)
                 .withInput(partialData)
                 .withOverwritePartition(overwrite)
                 .build();
@@ -201,7 +201,7 @@ public class FileStoreITCase extends AbstractTestBase {
                         Collections.singletonList(
                                 wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))),
                         InternalTypeInfo.of(TABLE_TYPE));
-        new FlinkSinkBuilder(IDENTIFIER, table)
+        new FlinkSinkBuilder(table)
                 .withInput(partialData)
                 .withOverwritePartition(new HashMap<>())
                 .build();
@@ -218,7 +218,7 @@ public class FileStoreITCase extends AbstractTestBase {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]);
 
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // read
@@ -247,7 +247,7 @@ public class FileStoreITCase extends AbstractTestBase {
 
     private void testProjection(FileStoreTable table) throws Exception {
         // write
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+        new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
         env.execute();
 
         // read
@@ -334,7 +334,7 @@ public class FileStoreITCase extends AbstractTestBase {
         }
         DataStreamSource<RowData> source =
                 env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE));
-        new FlinkSinkBuilder(IDENTIFIER, table).withInput(source).build();
+        new FlinkSinkBuilder(table).withInput(source).build();
         env.execute();
         assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
index 9f28769c..ca3ab6cc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.catalog.CatalogLock;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.concurrent.Callable;
 
 /** An interface that allows file store to use global lock to some transaction-related things. */
@@ -31,19 +32,65 @@ public interface Lock extends AutoCloseable {
     /** Run with lock. */
     <T> T runWithLock(Callable<T> callable) throws Exception;
 
-    @Nullable
-    static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
-        if (lockFactory == null) {
-            return null;
+    /** A factory to create {@link Lock}. */
+    interface Factory extends Serializable {
+        Lock create();
+    }
+
+    static Factory factory(@Nullable CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+        return lockFactory == null
+                ? new EmptyFactory()
+                : new CatalogLockFactory(lockFactory, tablePath);
+    }
+
+    static Factory emptyFactory() {
+        return new EmptyFactory();
+    }
+
+    /** A {@link Factory} creating lock from catalog. */
+    class CatalogLockFactory implements Factory {
+
+        private static final long serialVersionUID = 1L;
+
+        private final CatalogLock.Factory lockFactory;
+        private final ObjectPath tablePath;
+
+        public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+            this.lockFactory = lockFactory;
+            this.tablePath = tablePath;
+        }
+
+        @Override
+        public Lock create() {
+            return fromCatalog(lockFactory.create(), tablePath);
         }
+    }
+
+    /** A {@link Factory} creating empty lock. */
+    class EmptyFactory implements Factory {
+
+        private static final long serialVersionUID = 1L;
 
-        return fromCatalog(lockFactory.create(), tablePath);
+        @Override
+        public Lock create() {
+            return new EmptyLock();
+        }
+    }
+
+    /** An empty lock. */
+    class EmptyLock implements Lock {
+        @Override
+        public <T> T runWithLock(Callable<T> callable) throws Exception {
+            return callable.call();
+        }
+
+        @Override
+        public void close() {}
     }
 
-    @Nullable
     static Lock fromCatalog(CatalogLock lock, ObjectPath tablePath) {
         if (lock == null) {
-            return null;
+            return new EmptyLock();
         }
         return new CatalogLockImpl(lock, tablePath);
     }
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 59e66f00..7c8f1b23 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -376,7 +376,7 @@ public class HiveCatalog extends AbstractCatalog {
 
     private Lock lock(ObjectPath tablePath) {
         if (!lockEnabled()) {
-            return null;
+            return new Lock.EmptyLock();
         }
 
         HiveCatalogLock lock =