You are viewing a plain text version of this content. The canonical link to the original message was lost in the plain-text conversion; the commit itself can be found in the repository referenced below (commit 6886303b2f482f2f31c7b98221691a650c1e67d3).
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/28 07:51:21 UTC
[flink-table-store] branch master updated: [FLINK-30223] Refactor Lock to provide Lock.Factory
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 6886303b [FLINK-30223] Refactor Lock to provide Lock.Factory
6886303b is described below
commit 6886303b2f482f2f31c7b98221691a650c1e67d3
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Nov 28 15:51:17 2022 +0800
[FLINK-30223] Refactor Lock to provide Lock.Factory
This closes #405.
---
.../store/connector/sink/FlinkSinkBuilder.java | 14 ++---
.../table/store/connector/sink/StoreSink.java | 16 ++----
.../table/store/connector/sink/TableStoreSink.java | 6 ++-
.../table/store/connector/FileStoreITCase.java | 16 +++---
.../flink/table/store/file/operation/Lock.java | 61 +++++++++++++++++++---
.../apache/flink/table/store/hive/HiveCatalog.java | 2 +-
6 files changed, 76 insertions(+), 39 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 2e77a669..858eb847 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -23,10 +23,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
-import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
@@ -38,18 +37,16 @@ import java.util.Map;
/** Sink builder to build a flink sink from input. */
public class FlinkSinkBuilder {
- private final ObjectIdentifier tableIdentifier;
private final FileStoreTable table;
private final Configuration conf;
private DataStream<RowData> input;
- @Nullable private CatalogLock.Factory lockFactory;
+ private Lock.Factory lockFactory = Lock.emptyFactory();
@Nullable private Map<String, String> overwritePartition;
@Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
- public FlinkSinkBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
- this.tableIdentifier = tableIdentifier;
+ public FlinkSinkBuilder(FileStoreTable table) {
this.table = table;
this.conf = Configuration.fromMap(table.schema().options());
}
@@ -59,7 +56,7 @@ public class FlinkSinkBuilder {
return this;
}
- public FlinkSinkBuilder withLockFactory(CatalogLock.Factory lockFactory) {
+ public FlinkSinkBuilder withLockFactory(Lock.Factory lockFactory) {
this.lockFactory = lockFactory;
return this;
}
@@ -100,11 +97,10 @@ public class FlinkSinkBuilder {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
StoreSink sink =
new StoreSink(
- tableIdentifier,
table,
+ lockFactory,
conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
getCompactPartSpec(),
- lockFactory,
overwritePartition,
logSinkFunction);
return sink.sinkFrom(new DataStream<>(env, partitioned));
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 9ef68a19..b556011b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -30,11 +30,9 @@ import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
-import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -56,29 +54,24 @@ public class StoreSink implements Serializable {
private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
- private final ObjectIdentifier tableIdentifier;
private final FileStoreTable table;
-
+ private final Lock.Factory lockFactory;
private final boolean compactionTask;
@Nullable private final Map<String, String> compactPartitionSpec;
- @Nullable private final CatalogLock.Factory lockFactory;
@Nullable private final Map<String, String> overwritePartition;
@Nullable private final LogSinkFunction logSinkFunction;
public StoreSink(
- ObjectIdentifier tableIdentifier,
FileStoreTable table,
+ Lock.Factory lockFactory,
boolean compactionTask,
@Nullable Map<String, String> compactPartitionSpec,
- @Nullable CatalogLock.Factory lockFactory,
@Nullable Map<String, String> overwritePartition,
@Nullable LogSinkFunction logSinkFunction) {
- this.tableIdentifier = tableIdentifier;
this.table = table;
-
+ this.lockFactory = lockFactory;
this.compactionTask = compactionTask;
this.compactPartitionSpec = compactPartitionSpec;
- this.lockFactory = lockFactory;
this.overwritePartition = overwritePartition;
this.logSinkFunction = logSinkFunction;
}
@@ -103,12 +96,11 @@ public class StoreSink implements Serializable {
}
private StoreCommitter createCommitter(String user, boolean createEmptyCommit) {
- Lock lock = Lock.fromCatalog(lockFactory, tableIdentifier.toObjectPath());
return new StoreCommitter(
table.newCommit(user)
.withOverwritePartition(overwritePartition)
.withCreateEmptyCommit(createEmptyCommit)
- .withLock(lock));
+ .withLock(lockFactory.create()));
}
public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 484fa82f..dfb9986b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -122,12 +123,13 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
: (logSinkProvider == null ? null : logSinkProvider.createSink());
return new TableStoreDataStreamSinkProvider(
(dataStream) ->
- new FlinkSinkBuilder(tableIdentifier, table)
+ new FlinkSinkBuilder(table)
.withInput(
new DataStream<>(
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
- .withLockFactory(lockFactory)
+ .withLockFactory(
+ Lock.factory(lockFactory, tableIdentifier.toObjectPath()))
.withLogSinkFunction(logSinkFunction)
.withOverwritePartition(overwrite ? staticPartitions : null)
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 180fef10..5b749d74 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -132,7 +132,7 @@ public class FileStoreITCase extends AbstractTestBase {
FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});
// write
- new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+ new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
env.execute();
// read
@@ -152,7 +152,7 @@ public class FileStoreITCase extends AbstractTestBase {
FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2});
// write
- new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+ new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
env.execute();
// read
@@ -171,7 +171,7 @@ public class FileStoreITCase extends AbstractTestBase {
FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});
// write
- new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+ new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
env.execute();
// overwrite p2
@@ -182,7 +182,7 @@ public class FileStoreITCase extends AbstractTestBase {
InternalTypeInfo.of(TABLE_TYPE));
Map<String, String> overwrite = new HashMap<>();
overwrite.put("p", "p2");
- new FlinkSinkBuilder(IDENTIFIER, table)
+ new FlinkSinkBuilder(table)
.withInput(partialData)
.withOverwritePartition(overwrite)
.build();
@@ -201,7 +201,7 @@ public class FileStoreITCase extends AbstractTestBase {
Collections.singletonList(
wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))),
InternalTypeInfo.of(TABLE_TYPE));
- new FlinkSinkBuilder(IDENTIFIER, table)
+ new FlinkSinkBuilder(table)
.withInput(partialData)
.withOverwritePartition(new HashMap<>())
.build();
@@ -218,7 +218,7 @@ public class FileStoreITCase extends AbstractTestBase {
FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]);
// write
- new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+ new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
env.execute();
// read
@@ -247,7 +247,7 @@ public class FileStoreITCase extends AbstractTestBase {
private void testProjection(FileStoreTable table) throws Exception {
// write
- new FlinkSinkBuilder(IDENTIFIER, table).withInput(buildTestSource(env, isBatch)).build();
+ new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build();
env.execute();
// read
@@ -334,7 +334,7 @@ public class FileStoreITCase extends AbstractTestBase {
}
DataStreamSource<RowData> source =
env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE));
- new FlinkSinkBuilder(IDENTIFIER, table).withInput(source).build();
+ new FlinkSinkBuilder(table).withInput(source).build();
env.execute();
assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
index 9f28769c..ca3ab6cc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.catalog.CatalogLock;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.concurrent.Callable;
/** An interface that allows file store to use global lock to some transaction-related things. */
@@ -31,19 +32,65 @@ public interface Lock extends AutoCloseable {
/** Run with lock. */
<T> T runWithLock(Callable<T> callable) throws Exception;
- @Nullable
- static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
- if (lockFactory == null) {
- return null;
+ /** A factory to create {@link Lock}. */
+ interface Factory extends Serializable {
+ Lock create();
+ }
+
+ static Factory factory(@Nullable CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+ return lockFactory == null
+ ? new EmptyFactory()
+ : new CatalogLockFactory(lockFactory, tablePath);
+ }
+
+ static Factory emptyFactory() {
+ return new EmptyFactory();
+ }
+
+ /** A {@link Factory} creating lock from catalog. */
+ class CatalogLockFactory implements Factory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final CatalogLock.Factory lockFactory;
+ private final ObjectPath tablePath;
+
+ public CatalogLockFactory(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+ this.lockFactory = lockFactory;
+ this.tablePath = tablePath;
+ }
+
+ @Override
+ public Lock create() {
+ return fromCatalog(lockFactory.create(), tablePath);
}
+ }
+
+ /** A {@link Factory} creating empty lock. */
+ class EmptyFactory implements Factory {
+
+ private static final long serialVersionUID = 1L;
- return fromCatalog(lockFactory.create(), tablePath);
+ @Override
+ public Lock create() {
+ return new EmptyLock();
+ }
+ }
+
+ /** An empty lock. */
+ class EmptyLock implements Lock {
+ @Override
+ public <T> T runWithLock(Callable<T> callable) throws Exception {
+ return callable.call();
+ }
+
+ @Override
+ public void close() {}
}
- @Nullable
static Lock fromCatalog(CatalogLock lock, ObjectPath tablePath) {
if (lock == null) {
- return null;
+ return new EmptyLock();
}
return new CatalogLockImpl(lock, tablePath);
}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 59e66f00..7c8f1b23 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -376,7 +376,7 @@ public class HiveCatalog extends AbstractCatalog {
private Lock lock(ObjectPath tablePath) {
if (!lockEnabled()) {
- return null;
+ return new Lock.EmptyLock();
}
HiveCatalogLock lock =