Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/01 01:15:41 UTC
[flink-table-store] branch master updated: [FLINK-28299] Get rid of Sink v2
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e5a37fd0 [FLINK-28299] Get rid of Sink v2
e5a37fd0 is described below
commit e5a37fd07aa7996b037f88e48b1a19ba5b6ab89d
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jul 1 09:15:36 2022 +0800
[FLINK-28299] Get rid of Sink v2
This closes #183
---
.../CatalogLock.java} | 29 +-
.../table/store/connector/sink/Committable.java | 8 +-
.../connector/sink/CommittableSerializer.java | 14 +-
.../store/connector/sink/CommittableTypeInfo.java | 95 ++++++
.../GlobalCommitter.java => Committer.java} | 27 +-
...mmitterOperator.java => CommitterOperator.java} | 78 +++--
.../store/connector/sink/FlinkSinkBuilder.java | 19 +-
.../connector/sink/PrepareCommitOperator.java | 63 ++++
...oreGlobalCommitter.java => StoreCommitter.java} | 14 +-
.../store/connector/sink/StoreCompactOperator.java | 66 ++++
.../store/connector/sink/StoreLocalCommitter.java | 69 -----
.../table/store/connector/sink/StoreSink.java | 175 +++--------
.../store/connector/sink/StoreSinkCompactor.java | 69 -----
.../store/connector/sink/StoreSinkWriter.java | 152 ----------
.../store/connector/sink/StoreWriteOperator.java | 233 ++++++++++++++
.../table/store/connector/sink/TableStoreSink.java | 44 +--
.../sink/global/GlobalCommitterOperator.java | 76 -----
.../sink/global/GlobalCommittingSink.java | 46 ---
.../global/GlobalCommittingSinkTranslator.java | 78 -----
.../sink/global/LocalCommitterOperator.java | 205 -------------
.../store/connector/source/TableStoreSource.java | 25 +-
.../connector/sink/CommittableSerializerTest.java | 21 +-
.../store/connector/sink/LogStoreSinkITCase.java | 9 +-
.../table/store/connector/sink/StoreSinkTest.java | 335 ---------------------
.../table/store/connector/sink/TestFileStore.java | 278 -----------------
.../store/connector/sink/TestFileStoreTable.java | 110 -------
.../sink/global/GlobalCommitterOperatorTest.java | 278 -----------------
.../sink/global/LocalCommitterOperatorTest.java | 255 ----------------
.../flink/table/store/log/LogInitContext.java | 87 ------
.../flink/table/store/log/LogSinkProvider.java | 28 +-
.../table/store/log/LogStoreTableFactory.java | 12 +-
.../flink/table/store/log/LogWriteCallback.java | 2 +-
.../table/store/table/FileStoreTableFactory.java | 8 +
.../sink/LogSinkFunction.java} | 21 +-
.../store/kafka/KafkaLogSerializationSchema.java | 10 +-
.../table/store/kafka/KafkaLogSinkProvider.java | 32 +-
.../table/store/kafka/KafkaLogStoreFactory.java | 20 +-
.../flink/table/store/kafka/KafkaSinkFunction.java | 80 +++++
.../flink/table/store/kafka/KafkaLogITCase.java | 28 +-
.../store/kafka/KafkaLogSerializationTest.java | 4 +-
.../flink/table/store/kafka/KafkaLogTestUtils.java | 8 +-
.../table/store/kafka/TestOffsetsLogSink.java | 105 -------
42 files changed, 738 insertions(+), 2578 deletions(-)
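The patch below removes the Flink Sink V2 interfaces (StatefulSink / TwoPhaseCommittingSink) and rebuilds the table store sink from plain stream operators: a PrepareCommitOperator subclass writes records and emits Committables, and a single-parallelism CommitterOperator aggregates them into ManifestCommittables and commits them. A minimal sketch of the resulting wiring, mirroring StoreSink#sinkTo further down (the input stream and the builder wiring are assumed, details simplified):

    // Sketch only; mirrors StoreSink#sinkTo in the diff below, builder details omitted.
    CommittableTypeInfo typeInfo = new CommittableTypeInfo();
    SingleOutputStreamOperator<Committable> written =
            input.transform("Writer", typeInfo, createWriteOperator()) // StoreWriteOperator or StoreCompactOperator
                    .setParallelism(input.getParallelism());
    written.transform(
                    "Global Committer",
                    typeInfo,
                    new CommitterOperator(this::createCommitter, ManifestCommittableSerializer::new))
            .setParallelism(1)
            .setMaxParallelism(1)
            .addSink(new DiscardingSink<>())
            .setParallelism(1);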
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
similarity index 54%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
index f4266d82..08b5b520 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
@@ -16,18 +16,25 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.connector;
+package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.connector.sink.Committable;
+import org.apache.flink.annotation.Internal;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.concurrent.Callable;
/**
- * The base interface for file store sink writers.
- *
- * @param <WriterStateT> The type of the writer's state.
+ * An interface that allows source and sink to use a global lock for transaction-related operations.
*/
-public interface StatefulPrecommittingSinkWriter<WriterStateT>
- extends StatefulSink.StatefulSinkWriter<RowData, WriterStateT>,
- TwoPhaseCommittingSink.PrecommittingSinkWriter<RowData, Committable> {}
+@Internal
+public interface CatalogLock extends Closeable {
+
+ /** Run with catalog lock. The caller should tell catalog the database and table name. */
+ <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;
+
+ /** Factory to create {@link CatalogLock}. */
+ interface Factory extends Serializable {
+ CatalogLock create();
+ }
+}
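For context, a short sketch of how a caller could use the relocated CatalogLock. The lock factory, the database/table names, and commitToMetastore() are hypothetical placeholders, not part of this patch:

    // Sketch only: guard a metastore-visible commit with the global catalog lock.
    // lockFactory, "my_db"/"my_table" and commitToMetastore() are hypothetical.
    static long commitWithLock(CatalogLock.Factory lockFactory) throws Exception {
        try (CatalogLock lock = lockFactory.create()) { // CatalogLock extends Closeable
            return lock.runWithLock("my_db", "my_table", () -> commitToMetastore());
        }
    }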
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
index cea1b359..158287a7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.connector.sink;
-/** Committable produced by {@link StoreSinkWriter}. */
+/** Committable produced by {@link PrepareCommitOperator}. */
public class Committable {
private final Kind kind;
@@ -41,9 +41,7 @@ public class Committable {
enum Kind {
FILE((byte) 0),
- LOG((byte) 1),
-
- LOG_OFFSET((byte) 2);
+ LOG_OFFSET((byte) 1);
private final byte value;
@@ -60,8 +58,6 @@ public class Committable {
case 0:
return FILE;
case 1:
- return LOG;
- case 2:
return LOG_OFFSET;
default:
throw new UnsupportedOperationException(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
index 9f6dc1d1..b5e2ed77 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -29,13 +29,8 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
private final FileCommittableSerializer fileCommittableSerializer;
- private final SimpleVersionedSerializer<Object> logCommittableSerializer;
-
- public CommittableSerializer(
- FileCommittableSerializer fileCommittableSerializer,
- SimpleVersionedSerializer<Object> logCommittableSerializer) {
+ public CommittableSerializer(FileCommittableSerializer fileCommittableSerializer) {
this.fileCommittableSerializer = fileCommittableSerializer;
- this.logCommittableSerializer = logCommittableSerializer;
}
@Override
@@ -54,10 +49,6 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
fileCommittableSerializer.serialize(
(FileCommittable) committable.wrappedCommittable());
break;
- case LOG:
- version = logCommittableSerializer.getVersion();
- wrapped = logCommittableSerializer.serialize(committable.wrappedCommittable());
- break;
case LOG_OFFSET:
version = 1;
wrapped = ((LogOffsetCommittable) committable.wrappedCommittable()).toBytes();
@@ -86,9 +77,6 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
case FILE:
wrappedCommittable = fileCommittableSerializer.deserialize(version, wrapped);
break;
- case LOG:
- wrappedCommittable = logCommittableSerializer.deserialize(version, wrapped);
- break;
case LOG_OFFSET:
wrappedCommittable = LogOffsetCommittable.fromBytes(wrapped);
break;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableTypeInfo.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableTypeInfo.java
new file mode 100644
index 00000000..dcc2cd67
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableTypeInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+
+/** Type information of {@link Committable}. */
+public class CommittableTypeInfo extends TypeInformation<Committable> {
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ public Class<Committable> getTypeClass() {
+ return Committable.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<Committable> createSerializer(ExecutionConfig config) {
+ // no copy, so that data from writer is directly going into committer while chaining
+ return new SimpleVersionedSerializerTypeSerializerProxy<Committable>(
+ () -> new CommittableSerializer(new FileCommittableSerializer())) {
+ @Override
+ public Committable copy(Committable from) {
+ return from;
+ }
+
+ @Override
+ public Committable copy(Committable from, Committable reuse) {
+ return from;
+ }
+ };
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof CommittableTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof CommittableTypeInfo;
+ }
+
+ @Override
+ public String toString() {
+ return "CommittableTypeInfo";
+ }
+}
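The serializer returned above deliberately skips copying, so a Committable emitted by the writer is handed to the chained committer as the same instance. A small illustration of that contract (the FileCommittable value is assumed):

    // Sketch only: the no-copy behavior of CommittableTypeInfo's serializer proxy.
    CommittableTypeInfo typeInfo = new CommittableTypeInfo();
    TypeSerializer<Committable> serializer = typeInfo.createSerializer(new ExecutionConfig());
    Committable committable = new Committable(Committable.Kind.FILE, fileCommittable); // fileCommittable assumed
    // copy() returns the original instance, so no deep copy happens between chained operators
    assert serializer.copy(committable) == committable;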
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committer.java
similarity index 56%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committer.java
index 90eee7be..b7d97d55 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committer.java
@@ -17,29 +17,30 @@
*
*/
-package org.apache.flink.table.store.connector.sink.global;
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import java.io.IOException;
import java.util.List;
/**
- * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
- * which we call global committable (see {@link #combine}).
- *
- * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
+ * The {@code Committer} is responsible for creating and committing an aggregated committable,
+ * which we call the global committable (see {@link #combine}).
*
- * @param <CommT> The type of information needed to commit data staged by the sink
- * @param <GlobalCommT> The type of the aggregated committable
+ * <p>The {@code Committer} runs with parallelism equal to 1.
*/
-public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {
+public interface Committer extends AutoCloseable {
/** Find out which global committables need to be retried when recovering from the failure. */
- List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables)
- throws IOException;
+ List<ManifestCommittable> filterRecoveredCommittables(
+ List<ManifestCommittable> globalCommittables) throws IOException;
/** Compute an aggregated committable from a list of committables. */
- GlobalCommT combine(long checkpointId, List<CommT> committables) throws IOException;
+ ManifestCommittable combine(long checkpointId, List<Committable> committables)
+ throws IOException;
- /** Commits the given {@link GlobalCommT}. */
- void commit(List<GlobalCommT> globalCommittables) throws IOException, InterruptedException;
+ /** Commits the given {@link ManifestCommittable}. */
+ void commit(List<ManifestCommittable> globalCommittables)
+ throws IOException, InterruptedException;
}
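The CommitterOperator in the next file drives this interface in two situations; a condensed sketch of both (checkpointId, the collected committables, and the restored state are assumed):

    // Sketch only: how CommitterOperator (next file) uses the Committer interface.
    // Per checkpoint: aggregate the buffered Committables and commit the result.
    ManifestCommittable aggregated = committer.combine(checkpointId, collectedCommittables);
    committer.commit(Collections.singletonList(aggregated));
    // On recovery: only re-commit what the file store has not already seen.
    committer.commit(committer.filterRecoveredCommittables(restoredCommittables));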
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
similarity index 68%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index fad3e8df..fa7f60a7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -15,23 +15,21 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.connector.sink.global;
+package org.apache.flink.table.store.connector.sink;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.util.function.SerializableSupplier;
import java.util.ArrayDeque;
@@ -41,16 +39,16 @@ import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
-/** An operator that processes committables of a {@link Sink}. */
-public abstract class AbstractCommitterOperator<IN, CommT>
- extends AbstractStreamOperator<CommittableMessage<IN>>
- implements OneInputStreamOperator<CommittableMessage<IN>, CommittableMessage<IN>>,
- BoundedOneInput {
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Committer operator to commit {@link Committable}. */
+public class CommitterOperator extends AbstractStreamOperator<Committable>
+ implements OneInputStreamOperator<Committable, Committable>, BoundedOneInput {
private static final long serialVersionUID = 1L;
/** Record all the inputs until commit. */
- private final Deque<IN> inputs = new ArrayDeque<>();
+ private final Deque<Committable> inputs = new ArrayDeque<>();
/** The operator's state descriptor. */
private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
@@ -58,43 +56,64 @@ public abstract class AbstractCommitterOperator<IN, CommT>
"streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
/** Group the committable by the checkpoint id. */
- private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+ private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;
/** The committable's serializer. */
- private final SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer;
+ private final SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
+ committableSerializer;
/** The operator's state. */
- private ListState<CommT> streamingCommitterState;
+ private ListState<ManifestCommittable> streamingCommitterState;
+
+ private final SerializableSupplier<Committer> committerFactory;
- public AbstractCommitterOperator(
- SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
+ /**
+ * Aggregate committables to global committables and commit the global committables to the
+ * external system.
+ */
+ private Committer committer;
+
+ public CommitterOperator(
+ SerializableSupplier<Committer> committerFactory,
+ SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
+ committableSerializer) {
this.committableSerializer = committableSerializer;
this.committablesPerCheckpoint = new TreeMap<>();
+ this.committerFactory = checkNotNull(committerFactory);
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
+ committer = committerFactory.get();
streamingCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
committableSerializer.get());
- List<CommT> restored = new ArrayList<>();
+ List<ManifestCommittable> restored = new ArrayList<>();
streamingCommitterState.get().forEach(restored::add);
streamingCommitterState.clear();
commit(true, restored);
}
- public abstract void commit(boolean isRecover, List<CommT> committables) throws Exception;
+ public void commit(boolean isRecover, List<ManifestCommittable> committables) throws Exception {
+ if (isRecover) {
+ committables = committer.filterRecoveredCommittables(committables);
+ }
+ committer.commit(committables);
+ }
- public abstract List<CommT> toCommittables(long checkpoint, List<IN> inputs) throws Exception;
+ public ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs)
+ throws Exception {
+ return committer.combine(checkpoint, inputs);
+ }
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
- List<IN> poll = pollInputs();
+ List<Committable> poll = pollInputs();
if (poll.size() > 0) {
committablesPerCheckpoint.put(
context.getCheckpointId(), toCommittables(context.getCheckpointId(), poll));
@@ -102,10 +121,8 @@ public abstract class AbstractCommitterOperator<IN, CommT>
streamingCommitterState.update(committables(committablesPerCheckpoint));
}
- private List<CommT> committables(NavigableMap<Long, List<CommT>> map) {
- List<CommT> committables = new ArrayList<>();
- map.values().forEach(committables::addAll);
- return committables;
+ private List<ManifestCommittable> committables(NavigableMap<Long, ManifestCommittable> map) {
+ return new ArrayList<>(map.values());
}
@Override
@@ -118,7 +135,7 @@ public abstract class AbstractCommitterOperator<IN, CommT>
// 5. this.notifyCheckpointComplete(5)
// So we should submit all the data in the endInput in order to avoid disordered commits.
long checkpointId = Long.MAX_VALUE;
- List<IN> poll = pollInputs();
+ List<Committable> poll = pollInputs();
if (!poll.isEmpty()) {
committablesPerCheckpoint.put(checkpointId, toCommittables(checkpointId, poll));
}
@@ -132,19 +149,16 @@ public abstract class AbstractCommitterOperator<IN, CommT>
}
private void commitUpToCheckpoint(long checkpointId) throws Exception {
- NavigableMap<Long, List<CommT>> headMap =
+ NavigableMap<Long, ManifestCommittable> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
commit(false, committables(headMap));
headMap.clear();
}
@Override
- public void processElement(StreamRecord<CommittableMessage<IN>> element) {
+ public void processElement(StreamRecord<Committable> element) {
output.collect(element);
- CommittableMessage<IN> message = element.getValue();
- if (message instanceof CommittableWithLineage) {
- this.inputs.add(((CommittableWithLineage<IN>) message).getCommittable());
- }
+ this.inputs.add(element.getValue());
}
@Override
@@ -154,8 +168,8 @@ public abstract class AbstractCommitterOperator<IN, CommT>
super.close();
}
- private List<IN> pollInputs() {
- List<IN> poll = new ArrayList<>(this.inputs);
+ private List<Committable> pollInputs() {
+ List<Committable> poll = new ArrayList<>(this.inputs);
this.inputs.clear();
return poll;
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 8215d475..91141ece 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -22,15 +22,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
-import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
-import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
import javax.annotation.Nullable;
@@ -46,7 +44,7 @@ public class FlinkSinkBuilder {
private DataStream<RowData> input;
@Nullable private CatalogLock.Factory lockFactory;
@Nullable private Map<String, String> overwritePartition;
- @Nullable private LogSinkProvider logSinkProvider;
+ @Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
public FlinkSinkBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
@@ -70,8 +68,8 @@ public class FlinkSinkBuilder {
return this;
}
- public FlinkSinkBuilder withLogSinkProvider(LogSinkProvider logSinkProvider) {
- this.logSinkProvider = logSinkProvider;
+ public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
+ this.logSinkFunction = logSinkFunction;
return this;
}
@@ -101,16 +99,15 @@ public class FlinkSinkBuilder {
partitioned.setParallelism(parallelism);
}
- StoreSink<?, ?> sink =
- new StoreSink<>(
+ StoreSink sink =
+ new StoreSink(
tableIdentifier,
table,
conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED),
getCompactPartSpec(),
lockFactory,
overwritePartition,
- logSinkProvider);
- return GlobalCommittingSinkTranslator.translate(
- new DataStream<>(input.getExecutionEnvironment(), partitioned), sink);
+ logSinkFunction);
+ return sink.sinkTo(new DataStream<>(input.getExecutionEnvironment(), partitioned));
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
new file mode 100644
index 00000000..7d686d9e
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Prepare commit operator to emit {@link Committable}s. */
+public abstract class PrepareCommitOperator extends AbstractStreamOperator<Committable>
+ implements OneInputStreamOperator<RowData, Committable>, BoundedOneInput {
+
+ private boolean endOfInput = false;
+
+ public PrepareCommitOperator() {
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception {}
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ if (!endOfInput) {
+ emitCommittables();
+ }
+ // no records are expected to emit after endOfInput
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ endOfInput = true;
+ emitCommittables();
+ }
+
+ private void emitCommittables() throws IOException {
+ prepareCommit().forEach(committable -> output.collect(new StreamRecord<>(committable)));
+ }
+
+ protected abstract List<Committable> prepareCommit() throws IOException;
+}
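A minimal subclass sketch, just to show the contract: whatever prepareCommit() returns is emitted right before the checkpoint barrier and again at end of input. The real implementations are StoreWriteOperator and StoreCompactOperator later in this patch:

    // Sketch only: the smallest possible PrepareCommitOperator; it stages nothing.
    class NoopPrepareCommitOperator extends PrepareCommitOperator {
        @Override
        protected List<Committable> prepareCommit() {
            return Collections.emptyList(); // nothing staged, so nothing to emit
        }
    }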
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
similarity index 83%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index eaa2c9c9..0d35d4ad 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.table.catalog.CatalogLock;
-import org.apache.flink.table.store.connector.sink.global.GlobalCommitter;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
@@ -29,14 +27,14 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
-/** {@link GlobalCommitter} for dynamic store. */
-public class StoreGlobalCommitter implements GlobalCommitter<Committable, ManifestCommittable> {
+/** {@link Committer} for dynamic store. */
+public class StoreCommitter implements Committer {
private final TableCommit commit;
@Nullable private final CatalogLock lock;
- public StoreGlobalCommitter(TableCommit commit, @Nullable CatalogLock lock) {
+ public StoreCommitter(TableCommit commit, @Nullable CatalogLock lock) {
this.commit = commit;
this.lock = lock;
}
@@ -55,8 +53,7 @@ public class StoreGlobalCommitter implements GlobalCommitter<Committable, Manife
}
@Override
- public ManifestCommittable combine(long checkpointId, List<Committable> committables)
- throws IOException {
+ public ManifestCommittable combine(long checkpointId, List<Committable> committables) {
ManifestCommittable fileCommittable = new ManifestCommittable(String.valueOf(checkpointId));
for (Committable committable : committables) {
switch (committable.kind()) {
@@ -70,9 +67,6 @@ public class StoreGlobalCommitter implements GlobalCommitter<Committable, Manife
(LogOffsetCommittable) committable.wrappedCommittable();
fileCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
- case LOG:
- // log should be committed in local committer
- break;
}
}
return fileCommittable;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
new file mode 100644
index 00000000..8ad6bf07
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCompact;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** A dedicated operator for manual triggered compaction. */
+public class StoreCompactOperator extends PrepareCommitOperator {
+
+ private final FileStoreTable table;
+
+ @Nullable private final Map<String, String> compactPartitionSpec;
+
+ private TableCompact compact;
+
+ public StoreCompactOperator(
+ FileStoreTable table, @Nullable Map<String, String> compactPartitionSpec) {
+ this.table = table;
+ this.compactPartitionSpec = compactPartitionSpec;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ int task = getRuntimeContext().getIndexOfThisSubtask();
+ int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
+ compact = table.newCompact();
+ compact.withPartitions(
+ compactPartitionSpec == null ? Collections.emptyMap() : compactPartitionSpec);
+ compact.withFilter(
+ (partition, bucket) -> task == Math.abs(Objects.hash(partition, bucket) % numTask));
+ }
+
+ @Override
+ protected List<Committable> prepareCommit() throws IOException {
+ return compact.compact().stream()
+ .map(c -> new Committable(Committable.Kind.FILE, c))
+ .collect(Collectors.toList());
+ }
+}
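The withFilter call above spreads (partition, bucket) pairs across subtasks by hashing; a tiny worked sketch of the assignment (the parallelism and the partition/bucket values are assumed):

    // Sketch only: which subtask owns a given (partition, bucket) pair under the filter above.
    int numTask = 4; // assumed parallelism
    int owner = Math.abs(Objects.hash(partition, bucket) % numTask);
    // subtask "owner" compacts the pair; every other subtask's filter returns false and skips it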
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java
deleted file mode 100644
index d3bda6ab..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.api.connector.sink2.Committer;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-import static org.apache.flink.table.store.connector.sink.global.LocalCommitterOperator.convertCommitRequest;
-
-/** Store local {@link Committer} to commit log sink. */
-public class StoreLocalCommitter<LogCommT> implements Committer<Committable> {
-
- @Nullable private final Committer<LogCommT> logCommitter;
-
- public StoreLocalCommitter(@Nullable Committer<LogCommT> logCommitter) {
- this.logCommitter = logCommitter;
- }
-
- @Override
- public void commit(Collection<CommitRequest<Committable>> requests)
- throws IOException, InterruptedException {
- List<CommitRequest<LogCommT>> logRequests = new ArrayList<>();
- for (CommitRequest<Committable> request : requests) {
- if (request.getCommittable().kind() == Committable.Kind.LOG) {
- //noinspection unchecked
- logRequests.add(
- convertCommitRequest(
- request,
- committable -> (LogCommT) committable.wrappedCommittable(),
- committable -> new Committable(Committable.Kind.LOG, committable)));
- }
- }
-
- if (logRequests.size() > 0) {
- Objects.requireNonNull(logCommitter, "logCommitter should not be null.");
- logCommitter.commit(logRequests);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (logCommitter != null) {
- logCommitter.close();
- }
- }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index eb60b409..3a8ed3a3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -18,45 +18,33 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.Lock;
-import org.apache.flink.table.store.log.LogInitContext;
-import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.log.LogWriteCallback;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.TableCompact;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.Serializable;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.Callable;
-import java.util.function.Consumer;
-/** {@link Sink} of dynamic store. */
-public class StoreSink<WriterStateT, LogCommT>
- implements StatefulSink<RowData, WriterStateT>,
- GlobalCommittingSink<RowData, Committable, ManifestCommittable> {
+/** Sink of dynamic store. */
+public class StoreSink implements Serializable {
private static final long serialVersionUID = 1L;
+ private static final String WRITER_NAME = "Writer";
+
+ private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
+
private final ObjectIdentifier tableIdentifier;
private final FileStoreTable table;
@@ -69,7 +57,7 @@ public class StoreSink<WriterStateT, LogCommT>
@Nullable private final Map<String, String> overwritePartition;
- @Nullable private final LogSinkProvider logSinkProvider;
+ @Nullable private final LogSinkFunction logSinkFunction;
public StoreSink(
ObjectIdentifier tableIdentifier,
@@ -78,100 +66,24 @@ public class StoreSink<WriterStateT, LogCommT>
@Nullable Map<String, String> compactPartitionSpec,
@Nullable CatalogLock.Factory lockFactory,
@Nullable Map<String, String> overwritePartition,
- @Nullable LogSinkProvider logSinkProvider) {
+ @Nullable LogSinkFunction logSinkFunction) {
this.tableIdentifier = tableIdentifier;
this.table = table;
this.compactionTask = compactionTask;
this.compactPartitionSpec = compactPartitionSpec;
this.lockFactory = lockFactory;
this.overwritePartition = overwritePartition;
- this.logSinkProvider = logSinkProvider;
- }
-
- @Override
- public StatefulPrecommittingSinkWriter<WriterStateT> createWriter(InitContext initContext)
- throws IOException {
- return restoreWriter(initContext, null);
+ this.logSinkFunction = logSinkFunction;
}
- @Override
- public StatefulPrecommittingSinkWriter<WriterStateT> restoreWriter(
- InitContext initContext, Collection<WriterStateT> states) throws IOException {
+ private OneInputStreamOperator<RowData, Committable> createWriteOperator() {
if (compactionTask) {
- return createCompactWriter(initContext);
- }
- SinkWriter<SinkRecord> logWriter = null;
- LogWriteCallback logCallback = null;
- if (logSinkProvider != null) {
- logCallback = new LogWriteCallback();
- Consumer<?> metadataConsumer = logSinkProvider.createMetadataConsumer(logCallback);
- LogInitContext logInitContext = new LogInitContext(initContext, metadataConsumer);
- Sink<SinkRecord> logSink = logSinkProvider.createSink();
- logWriter =
- states == null
- ? logSink.createWriter(logInitContext)
- : ((StatefulSink<SinkRecord, WriterStateT>) logSink)
- .restoreWriter(logInitContext, states);
- }
- return new StoreSinkWriter<>(
- table.newWrite().withOverwrite(overwritePartition != null), logWriter, logCallback);
- }
-
- private StoreSinkCompactor<WriterStateT> createCompactWriter(InitContext initContext) {
- int task = initContext.getSubtaskId();
- int numTask = initContext.getNumberOfParallelSubtasks();
- TableCompact tableCompact = table.newCompact();
- tableCompact.withPartitions(
- compactPartitionSpec == null ? Collections.emptyMap() : compactPartitionSpec);
- tableCompact.withFilter(
- (partition, bucket) -> task == Math.abs(Objects.hash(partition, bucket) % numTask));
- return new StoreSinkCompactor<>(tableCompact);
- }
-
- @Override
- public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
- return logSinkProvider == null
- ? new NoOutputSerializer<>()
- : ((StatefulSink<SinkRecord, WriterStateT>) logSinkProvider.createSink())
- .getWriterStateSerializer();
- }
-
- @Nullable
- private Committer<LogCommT> logCommitter() {
- if (logSinkProvider != null) {
- Sink<SinkRecord> sink = logSinkProvider.createSink();
- if (sink instanceof TwoPhaseCommittingSink) {
- try {
- return ((TwoPhaseCommittingSink<SinkRecord, LogCommT>) sink).createCommitter();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
+ return new StoreCompactOperator(table, compactPartitionSpec);
}
-
- return null;
+ return new StoreWriteOperator(table, overwritePartition, logSinkFunction);
}
- @Nullable
- private SimpleVersionedSerializer<LogCommT> logCommitSerializer() {
- if (logSinkProvider != null) {
- Sink<SinkRecord> sink = logSinkProvider.createSink();
- if (sink instanceof TwoPhaseCommittingSink) {
- return ((TwoPhaseCommittingSink<SinkRecord, LogCommT>) sink)
- .getCommittableSerializer();
- }
- }
-
- return null;
- }
-
- @Override
- public Committer<Committable> createCommitter() {
- return new StoreLocalCommitter<>(logCommitter());
- }
-
- @Override
- public StoreGlobalCommitter createGlobalCommitter() {
+ private StoreCommitter createCommitter() {
CatalogLock catalogLock;
Lock lock;
if (lockFactory == null) {
@@ -191,40 +103,25 @@ public class StoreSink<WriterStateT, LogCommT>
};
}
- return new StoreGlobalCommitter(
+ return new StoreCommitter(
table.newCommit().withOverwritePartition(overwritePartition).withLock(lock),
catalogLock);
}
- @SuppressWarnings("unchecked")
- @Override
- public SimpleVersionedSerializer<Committable> getCommittableSerializer() {
- return new CommittableSerializer(
- fileCommitSerializer(), (SimpleVersionedSerializer<Object>) logCommitSerializer());
- }
-
- @Override
- public ManifestCommittableSerializer getGlobalCommittableSerializer() {
- return new ManifestCommittableSerializer();
- }
-
- private FileCommittableSerializer fileCommitSerializer() {
- return new FileCommittableSerializer();
- }
-
- private static class NoOutputSerializer<T> implements SimpleVersionedSerializer<T> {
- private NoOutputSerializer() {}
-
- public int getVersion() {
- return 1;
- }
-
- public byte[] serialize(T obj) {
- throw new IllegalStateException("Should not serialize anything");
- }
-
- public T deserialize(int version, byte[] serialized) {
- throw new IllegalStateException("Should not deserialize anything");
- }
+ public DataStreamSink<?> sinkTo(DataStream<RowData> input) {
+ CommittableTypeInfo typeInfo = new CommittableTypeInfo();
+ SingleOutputStreamOperator<Committable> written =
+ input.transform(WRITER_NAME, typeInfo, createWriteOperator())
+ .setParallelism(input.getParallelism());
+
+ SingleOutputStreamOperator<?> committed =
+ written.transform(
+ GLOBAL_COMMITTER_NAME,
+ typeInfo,
+ new CommitterOperator(
+ this::createCommitter, ManifestCommittableSerializer::new))
+ .setParallelism(1)
+ .setMaxParallelism(1);
+ return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
deleted file mode 100644
index 4a79a8c6..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.table.sink.TableCompact;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** A dedicated {@link SinkWriter} for manual triggered compaction. */
-public class StoreSinkCompactor<WriterStateT>
- implements StatefulPrecommittingSinkWriter<WriterStateT> {
-
- private final TableCompact tableCompact;
-
- public StoreSinkCompactor(TableCompact tableCompact) {
- this.tableCompact = tableCompact;
- }
-
- @Override
- public void flush(boolean endOfInput) {
- // nothing to flush
- }
-
- @Override
- public void write(RowData element, Context context) throws IOException, InterruptedException {
- // nothing to write
- }
-
- @Override
- public void close() throws Exception {
- // nothing to close
- }
-
- @Override
- public List<WriterStateT> snapshotState(long checkpointId) {
- // nothing to snapshot
- return Collections.emptyList();
- }
-
- @Override
- public Collection<Committable> prepareCommit() {
- return tableCompact.compact().stream()
- .map(c -> new Committable(Committable.Kind.FILE, c))
- .collect(Collectors.toList());
- }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
deleted file mode 100644
index c3c6a7db..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.log.LogWriteCallback;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
-import org.apache.flink.table.store.table.sink.FileCommittable;
-import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.SinkRecordConverter;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/** A {@link SinkWriter} for dynamic store. */
-public class StoreSinkWriter<WriterStateT>
- implements StatefulPrecommittingSinkWriter<WriterStateT> {
-
- private final TableWrite write;
-
- @Nullable private final SinkWriter<SinkRecord> logWriter;
-
- @Nullable private final LogWriteCallback logCallback;
-
- private final ExecutorService compactExecutor;
-
- public StoreSinkWriter(
- TableWrite write,
- @Nullable SinkWriter<SinkRecord> logWriter,
- @Nullable LogWriteCallback logCallback) {
- this.write = write;
- this.logWriter = logWriter;
- this.logCallback = logCallback;
- this.compactExecutor =
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory("compaction-thread"));
- }
-
- @Override
- public void write(RowData rowData, Context context) throws IOException, InterruptedException {
- SinkRecord record;
- try {
- record = write.write(rowData);
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- // write to log store, need to preserve original pk (which includes partition fields)
- if (logWriter != null) {
- SinkRecordConverter converter = write.recordConverter();
- logWriter.write(converter.convertToLogSinkRecord(record), context);
- }
- }
-
- @Override
- public void flush(boolean endOfInput) throws IOException, InterruptedException {
- if (logWriter != null) {
- logWriter.flush(endOfInput);
- }
- }
-
- @Override
- public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
- if (logWriter != null && logWriter instanceof StatefulSinkWriter) {
- return ((StatefulSinkWriter<?, WriterStateT>) logWriter).snapshotState(checkpointId);
- }
- return Collections.emptyList();
- }
-
- @Override
- public List<Committable> prepareCommit() throws IOException, InterruptedException {
- List<Committable> committables = new ArrayList<>();
- try {
- for (FileCommittable committable : write.prepareCommit()) {
- committables.add(new Committable(Committable.Kind.FILE, committable));
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- if (logWriter != null) {
- if (logWriter instanceof PrecommittingSinkWriter) {
- Collection<?> logCommittables =
- ((PrecommittingSinkWriter<?, ?>) logWriter).prepareCommit();
- for (Object logCommittable : logCommittables) {
- committables.add(new Committable(Committable.Kind.LOG, logCommittable));
- }
- }
-
- Objects.requireNonNull(logCallback, "logCallback should not be null.");
- logCallback
- .offsets()
- .forEach(
- (k, v) ->
- committables.add(
- new Committable(
- Committable.Kind.LOG_OFFSET,
- new LogOffsetCommittable(k, v))));
- }
- return committables;
- }
-
- @Override
- public void close() throws Exception {
- this.compactExecutor.shutdownNow();
- write.close();
-
- if (logWriter != null) {
- logWriter.close();
- }
- }
-
- @SuppressWarnings("unchecked")
- @VisibleForTesting
- Map<BinaryRowData, Map<Integer, RecordWriter<?>>> writers() {
- return ((AbstractTableWrite) write).writers();
- }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
new file mode 100644
index 00000000..6679ca21
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.log.LogWriteCallback;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.TableWrite;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** A {@link PrepareCommitOperator} to write records. */
+public class StoreWriteOperator extends PrepareCommitOperator {
+
+ private final FileStoreTable table;
+
+ @Nullable private final Map<String, String> overwritePartition;
+
+ @Nullable private final LogSinkFunction logSinkFunction;
+
+ private transient SimpleContext sinkContext;
+
+ /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
+ private long currentWatermark = Long.MIN_VALUE;
+
+ private TableWrite write;
+
+ @Nullable private LogWriteCallback logCallback;
+
+ public StoreWriteOperator(
+ FileStoreTable table,
+ @Nullable Map<String, String> overwritePartition,
+ @Nullable LogSinkFunction logSinkFunction) {
+ this.table = table;
+ this.overwritePartition = overwritePartition;
+ this.logSinkFunction = logSinkFunction;
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<Committable>> output) {
+ super.setup(containingTask, config, output);
+ if (logSinkFunction != null) {
+ FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext());
+ }
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+ if (logSinkFunction != null) {
+ StreamingFunctionUtils.restoreFunctionState(context, logSinkFunction);
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.write = table.newWrite().withOverwrite(overwritePartition != null);
+ this.sinkContext = new SimpleContext(getProcessingTimeService());
+ if (logSinkFunction != null) {
+ FunctionUtils.openFunction(logSinkFunction, new Configuration());
+ logCallback = new LogWriteCallback();
+ logSinkFunction.setWriteCallback(logCallback);
+ }
+ }
+
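+ // Record the latest watermark for SinkFunction.Context#currentWatermark and forward it to the
+ // log sink function so the log store observes the same event-time progress.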
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ this.currentWatermark = mark.getTimestamp();
+ if (logSinkFunction != null) {
+ logSinkFunction.writeWatermark(
+ new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
+ }
+ }
+
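+ // The element timestamp (if present) is exposed to the log sink function through the reusable
+ // sink context.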
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception {
+ sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
+
+ SinkRecord record;
+ try {
+ record = write.write(element.getValue());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ if (logSinkFunction != null) {
+ // Write to the log store; the original primary key (which includes partition fields) must be preserved.
+ SinkRecord logRecord = write.recordConverter().convertToLogSinkRecord(record);
+ logSinkFunction.invoke(logRecord, sinkContext);
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ if (logSinkFunction != null) {
+ StreamingFunctionUtils.snapshotFunctionState(
+ context, getOperatorStateBackend(), logSinkFunction);
+ }
+ }
+
+ @Override
+ public void finish() throws Exception {
+ super.finish();
+ if (logSinkFunction != null) {
+ logSinkFunction.finish();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (write != null) {
+ write.close();
+ }
+
+ if (logSinkFunction != null) {
+ FunctionUtils.closeFunction(logSinkFunction);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+
+ if (logSinkFunction instanceof CheckpointListener) {
+ ((CheckpointListener) logSinkFunction).notifyCheckpointComplete(checkpointId);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ super.notifyCheckpointAborted(checkpointId);
+
+ if (logSinkFunction instanceof CheckpointListener) {
+ ((CheckpointListener) logSinkFunction).notifyCheckpointAborted(checkpointId);
+ }
+ }
+
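+ /**
+ * Collects the committables for the upcoming commit: one {@link Committable.Kind#FILE} entry
+ * for each {@link FileCommittable} returned by the table write, plus one
+ * {@link Committable.Kind#LOG_OFFSET} entry for each offset reported by the log write callback.
+ */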
+ @Override
+ protected List<Committable> prepareCommit() throws IOException {
+ List<Committable> committables = new ArrayList<>();
+ try {
+ for (FileCommittable committable : write.prepareCommit()) {
+ committables.add(new Committable(Committable.Kind.FILE, committable));
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ if (logCallback != null) {
+ logCallback
+ .offsets()
+ .forEach(
+ (k, v) ->
+ committables.add(
+ new Committable(
+ Committable.Kind.LOG_OFFSET,
+ new LogOffsetCommittable(k, v))));
+ }
+
+ return committables;
+ }
+
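+ /** The {@link SinkFunction.Context} handed to the log sink function on every invoke. */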
+ private class SimpleContext implements SinkFunction.Context {
+
+ @Nullable private Long timestamp;
+
+ private final ProcessingTimeService processingTimeService;
+
+ public SimpleContext(ProcessingTimeService processingTimeService) {
+ this.processingTimeService = processingTimeService;
+ }
+
+ @Override
+ public long currentProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
+ }
+
+ @Override
+ public long currentWatermark() {
+ return currentWatermark;
+ }
+
+ @Override
+ public Long timestamp() {
+ return timestamp;
+ }
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 5cf04f8c..b08b81d7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -18,15 +18,12 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.RequireCatalogLock;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
@@ -40,8 +37,7 @@ import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
@@ -50,8 +46,7 @@ import java.util.HashMap;
import java.util.Map;
/** Table sink to create {@link StoreSink}. */
-public class TableStoreSink
- implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
private final ObjectIdentifier tableIdentifier;
private final FileStoreTable table;
@@ -113,41 +108,15 @@ public class TableStoreSink
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
LogSinkProvider logSinkProvider = null;
if (logStoreTableFactory != null) {
- logSinkProvider =
- logStoreTableFactory.createSinkProvider(
- logStoreContext,
- new LogStoreTableFactory.SinkContext() {
- @Override
- public boolean isBounded() {
- return context.isBounded();
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(
- DataType consumedDataType) {
- return context.createTypeInformation(consumedDataType);
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(
- LogicalType consumedLogicalType) {
- return context.createTypeInformation(consumedLogicalType);
- }
-
- @Override
- public DynamicTableSink.DataStructureConverter
- createDataStructureConverter(DataType consumedDataType) {
- return context.createDataStructureConverter(consumedDataType);
- }
- });
+ logSinkProvider = logStoreTableFactory.createSinkProvider(logStoreContext, context);
}
Configuration conf = Configuration.fromMap(table.schema().options());
// Do not sink to the log store in overwrite mode
- final LogSinkProvider finalLogSinkProvider =
+ final LogSinkFunction logSinkFunction =
overwrite || conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED)
? null
- : logSinkProvider;
+ : (logSinkProvider == null ? null : logSinkProvider.createSink());
return (DataStreamSinkProvider)
(providerContext, dataStream) ->
new FlinkSinkBuilder(tableIdentifier, table)
@@ -156,7 +125,7 @@ public class TableStoreSink
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
.withLockFactory(lockFactory)
- .withLogSinkProvider(finalLogSinkProvider)
+ .withLogSinkFunction(logSinkFunction)
.withOverwritePartition(overwrite ? staticPartitions : null)
.withParallelism(
conf.get(TableStoreFactoryOptions.SINK_PARALLELISM))
@@ -196,7 +165,6 @@ public class TableStoreSink
this.overwrite = overwrite;
}
- @Override
public void setLockFactory(@Nullable CatalogLock.Factory lockFactory) {
this.lockFactory = lockFactory;
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
deleted file mode 100644
index a606bf97..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.util.function.SerializableSupplier;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** An {@link AbstractCommitterOperator} to process global committer. */
-public class GlobalCommitterOperator<CommT, GlobalCommT>
- extends AbstractCommitterOperator<CommT, GlobalCommT> {
-
- private static final long serialVersionUID = 1L;
-
- private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory;
-
- /**
- * Aggregate committables to global committables and commit the global committables to the
- * external system.
- */
- private GlobalCommitter<CommT, GlobalCommT> committer;
-
- public GlobalCommitterOperator(
- SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory,
- SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>> committableSerializer) {
- super(committableSerializer);
- this.committerFactory = checkNotNull(committerFactory);
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- committer = committerFactory.get();
- super.initializeState(context);
- }
-
- @Override
- public void commit(boolean isRecover, List<GlobalCommT> committables)
- throws IOException, InterruptedException {
- if (isRecover) {
- committables = committer.filterRecoveredCommittables(committables);
- }
- committer.commit(committables);
- }
-
- @Override
- public List<GlobalCommT> toCommittables(long checkpoint, List<CommT> inputs) throws Exception {
- return Collections.singletonList(committer.combine(checkpoint, inputs));
- }
-
- @Override
- public void close() throws Exception {
- committer.close();
- super.close();
- }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
deleted file mode 100644
index 90d97b9f..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-/**
- * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
- * consists of a {@link SinkWriter} that performs the precommits and a {@link GlobalCommitter} that
- * actually commits the data.
- *
- * @param <InputT> The type of the sink's input
- * @param <CommT> The type of the committables.
- * @param <GlobalCommT> The type of the aggregated committable.
- */
-public interface GlobalCommittingSink<InputT, CommT, GlobalCommT>
- extends TwoPhaseCommittingSink<InputT, CommT> {
-
- /**
- * Creates a {@link GlobalCommitter} that permanently makes the previously written data visible
- * through {@link GlobalCommitter#commit}.
- */
- GlobalCommitter<CommT, GlobalCommT> createGlobalCommitter();
-
- /** Returns the serializer of the global committable type. */
- SimpleVersionedSerializer<GlobalCommT> getGlobalCommittableSerializer();
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
deleted file mode 100644
index c034276f..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-
-/** A translator for the {@link GlobalCommittingSink}. */
-public class GlobalCommittingSinkTranslator {
-
- private static final String WRITER_NAME = "Writer";
-
- private static final String LOCAL_COMMITTER_NAME = "Local Committer";
-
- private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
-
- public static <T, CommT, GlobalCommT> DataStreamSink<?> translate(
- DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink) {
- TypeInformation<CommittableMessage<CommT>> commitType =
- CommittableMessageTypeInfo.of(sink::getCommittableSerializer);
-
- SingleOutputStreamOperator<CommittableMessage<CommT>> written =
- input.transform(WRITER_NAME, commitType, new SinkWriterOperatorFactory<>(sink))
- .setParallelism(input.getParallelism());
-
- SingleOutputStreamOperator<CommittableMessage<CommT>> local =
- written.transform(
- LOCAL_COMMITTER_NAME,
- commitType,
- new LocalCommitterOperator<>(
- () -> {
- try {
- return sink.createCommitter();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- },
- sink::getCommittableSerializer))
- .setParallelism(written.getParallelism());
-
- SingleOutputStreamOperator<?> committed =
- local.global()
- .transform(
- GLOBAL_COMMITTER_NAME,
- commitType,
- new GlobalCommitterOperator<>(
- sink::createGlobalCommitter,
- sink::getGlobalCommittableSerializer))
- .setParallelism(1)
- .setMaxParallelism(1);
- return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
- }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
deleted file mode 100644
index 9e7974ce..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
-import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState;
-import org.apache.flink.util.function.SerializableSupplier;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** A {@link AbstractCommitterOperator} to process local committer. */
-public class LocalCommitterOperator<CommT> extends AbstractCommitterOperator<CommT, CommT> {
-
- private static final long serialVersionUID = 1L;
-
- private final SerializableSupplier<Committer<CommT>> committerFactory;
-
- private Committer<CommT> committer;
-
- public LocalCommitterOperator(
- SerializableSupplier<Committer<CommT>> committerFactory,
- SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
- super(committableSerializer);
- this.committerFactory = checkNotNull(committerFactory);
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- committer = committerFactory.get();
- super.initializeState(context);
- }
-
- @Override
- public void commit(boolean isRecover, List<CommT> committables)
- throws IOException, InterruptedException {
- if (committables.isEmpty()) {
- return;
- }
-
- List<CommitRequestImpl> requests = new ArrayList<>(committables.size());
- for (CommT comm : committables) {
- requests.add(new CommitRequestImpl(comm));
- }
-
- long sleep = 1000;
- while (true) {
- // commit
- requests.forEach(CommitRequestImpl::setSelected);
- committer.commit(new ArrayList<>(requests));
- requests.forEach(CommitRequestImpl::setCommittedIfNoError);
-
- // drain finished
- requests.removeIf(CommitRequestImpl::isFinished);
- if (requests.isEmpty()) {
- return;
- }
-
- //noinspection BusyWait
- Thread.sleep(sleep);
- sleep *= 2;
- }
- }
-
- @Override
- public List<CommT> toCommittables(long checkpoint, List<CommT> inputs) {
- return inputs;
- }
-
- @Override
- public void close() throws Exception {
- committer.close();
- super.close();
- }
-
- /** {@link CommitRequest} implementation. */
- public class CommitRequestImpl implements CommitRequest<CommT> {
-
- private CommT committable;
- private int numRetries;
- private CommitRequestState state;
-
- private CommitRequestImpl(CommT committable) {
- this.committable = committable;
- this.state = CommitRequestState.RECEIVED;
- }
-
- private boolean isFinished() {
- return state.isFinalState();
- }
-
- @Override
- public CommT getCommittable() {
- return this.committable;
- }
-
- @Override
- public int getNumberOfRetries() {
- return this.numRetries;
- }
-
- @Override
- public void signalFailedWithKnownReason(Throwable t) {
- this.state = CommitRequestState.FAILED;
- }
-
- @Override
- public void signalFailedWithUnknownReason(Throwable t) {
- this.state = CommitRequestState.FAILED;
- throw new IllegalStateException("Failed to commit " + this.committable, t);
- }
-
- @Override
- public void retryLater() {
- this.state = CommitRequestState.RETRY;
- ++this.numRetries;
- }
-
- @Override
- public void updateAndRetryLater(CommT committable) {
- this.committable = committable;
- this.retryLater();
- }
-
- @Override
- public void signalAlreadyCommitted() {
- this.state = CommitRequestState.COMMITTED;
- }
-
- void setSelected() {
- state = CommitRequestState.RECEIVED;
- }
-
- void setCommittedIfNoError() {
- if (state == CommitRequestState.RECEIVED) {
- state = CommitRequestState.COMMITTED;
- }
- }
- }
-
- /** Convert a {@link CommitRequest} to another type. */
- public static <CommT, NewT> CommitRequest<NewT> convertCommitRequest(
- CommitRequest<CommT> request, Function<CommT, NewT> to, Function<NewT, CommT> from) {
- return new CommitRequest<NewT>() {
-
- @Override
- public NewT getCommittable() {
- return to.apply(request.getCommittable());
- }
-
- @Override
- public int getNumberOfRetries() {
- return request.getNumberOfRetries();
- }
-
- @Override
- public void signalFailedWithKnownReason(Throwable throwable) {
- request.signalFailedWithKnownReason(throwable);
- }
-
- @Override
- public void signalFailedWithUnknownReason(Throwable throwable) {
- request.signalFailedWithUnknownReason(throwable);
- }
-
- @Override
- public void retryLater() {
- request.retryLater();
- }
-
- @Override
- public void updateAndRetryLater(NewT committable) {
- request.updateAndRetryLater(from.apply(committable));
- }
-
- @Override
- public void signalAlreadyCommitted() {
- request.signalAlreadyCommitted();
- }
- };
- }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index d181e426..6c6ea646 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.connector.source;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -46,7 +45,6 @@ import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
@@ -119,28 +117,7 @@ public class TableStoreSource
if (logStoreTableFactory != null) {
logSourceProvider =
logStoreTableFactory.createSourceProvider(
- logStoreContext,
- new LogStoreTableFactory.SourceContext() {
- @Override
- public <T> TypeInformation<T> createTypeInformation(
- DataType producedDataType) {
- return scanContext.createTypeInformation(producedDataType);
- }
-
- @Override
- public <T> TypeInformation<T> createTypeInformation(
- LogicalType producedLogicalType) {
- return scanContext.createTypeInformation(producedLogicalType);
- }
-
- @Override
- public DataStructureConverter createDataStructureConverter(
- DataType producedDataType) {
- return scanContext.createDataStructureConverter(
- producedDataType);
- }
- },
- projectFields);
+ logStoreContext, scanContext, projectFields);
}
FlinkSourceBuilder sourceBuilder =
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index 98db8f5b..03b5e614 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.table.sink.FileCommittable;
@@ -36,10 +34,7 @@ public class CommittableSerializerTest {
private final FileCommittableSerializer fileSerializer = new FileCommittableSerializer();
- private final CommittableSerializer serializer =
- new CommittableSerializer(
- fileSerializer,
- (SimpleVersionedSerializer) TestSink.StringCommittableSerializer.INSTANCE);
+ private final CommittableSerializer serializer = new CommittableSerializer(fileSerializer);
@Test
public void testFile() throws IOException {
@@ -71,18 +66,4 @@ public class CommittableSerializerTest {
.wrappedCommittable();
assertThat(newCommittable).isEqualTo(committable);
}
-
- @Test
- public void testLog() throws IOException {
- String log = "random_string";
- String newCommittable =
- (String)
- serializer
- .deserialize(
- 1,
- serializer.serialize(
- new Committable(Committable.Kind.LOG, log)))
- .wrappedCommittable();
- assertThat(newCommittable).isEqualTo(log);
- }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 9458e504..201f1692 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -47,8 +47,6 @@ import static org.apache.flink.table.store.connector.FileStoreITCase.buildFileSt
import static org.apache.flink.table.store.connector.FileStoreITCase.buildStreamEnv;
import static org.apache.flink.table.store.connector.FileStoreITCase.buildTestSource;
import static org.apache.flink.table.store.connector.FileStoreITCase.executeAndCollect;
-import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
-import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
import static org.assertj.core.api.Assertions.assertThat;
@@ -112,9 +110,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
hasPk ? new int[] {2} : new int[0]);
KafkaLogStoreFactory factory = discoverKafkaLogFactory();
- KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
- KafkaLogSourceProvider sourceProvider =
- factory.createSourceProvider(context, SOURCE_CONTEXT, null);
+ KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, null);
+ KafkaLogSourceProvider sourceProvider = factory.createSourceProvider(context, null, null);
factory.onCreateTable(context, 3, true);
@@ -122,7 +119,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
// write
new FlinkSinkBuilder(IDENTIFIER, table)
.withInput(buildTestSource(env, isBatch))
- .withLogSinkProvider(sinkProvider)
+ .withLogSinkFunction(sinkProvider.createSink())
.build();
env.execute();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
deleted file mode 100644
index 561dbce0..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.table.catalog.CatalogLock;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.UserCodeClassLoader;
-
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.OptionalLong;
-import java.util.concurrent.Callable;
-
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link StoreSink}. */
-@RunWith(Parameterized.class)
-public class StoreSinkTest {
-
- @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
- private final boolean hasPk;
-
- private final boolean partitioned;
-
- private final ObjectIdentifier identifier =
- ObjectIdentifier.of("my_catalog", "my_database", "my_table");
-
- private final TestLock lock = new TestLock();
-
- private TestFileStore fileStore;
- private TestFileStoreTable table;
-
- public StoreSinkTest(boolean hasPk, boolean partitioned) {
- this.hasPk = hasPk;
- this.partitioned = partitioned;
- }
-
- @Before
- public void before() throws Exception {
- Path path = new Path(tempFolder.newFolder().toURI().toString());
- TableSchema tableSchema =
- new SchemaManager(path)
- .commitNewVersion(
- new UpdateSchema(
- RowType.of(
- new LogicalType[] {
- new IntType(), new IntType(), new IntType()
- },
- new String[] {"a", "b", "c"}),
- partitioned
- ? Collections.singletonList("a")
- : Collections.emptyList(),
- hasPk ? Arrays.asList("a", "b") : Collections.emptyList(),
- new HashMap<>(),
- ""));
-
- RowType partitionType = tableSchema.logicalPartitionType();
- fileStore = new TestFileStore(hasPk, partitionType);
- table = new TestFileStoreTable(path, fileStore, tableSchema);
- }
-
- @Parameterized.Parameters(name = "hasPk-{0}, partitioned-{1}")
- public static List<Boolean[]> data() {
- return Arrays.asList(
- new Boolean[] {true, true},
- new Boolean[] {true, false},
- new Boolean[] {false, false},
- new Boolean[] {false, true});
- }
-
- @Test
- public void testChangelogs() throws Exception {
- Assume.assumeTrue(hasPk && partitioned);
- StoreSink<?, ?> sink = newSink(null);
- writeAndCommit(
- sink,
- GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
- GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 0, 2, 3),
- GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 7, 5),
- GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
- assertThat(fileStore.committedFiles.get(row(1)).get(1))
- .isEqualTo(Collections.singletonList("DELETE-key-0-value-1/0/1"));
- assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("DELETE-key-2-value-0/2/3"));
- assertThat(fileStore.committedFiles.get(row(0)).get(1))
- .isEqualTo(Arrays.asList("INSERT-key-0-value-0/0/1", "INSERT-key-7-value-0/7/5"));
- }
-
- @Test
- public void testNoKeyChangelogs() throws Exception {
- Assume.assumeTrue(!hasPk && partitioned);
- StoreSink<?, ?> sink =
- new StoreSink<>(identifier, table, false, null, () -> lock, new HashMap<>(), null);
- writeAndCommit(
- sink,
- GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
- GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 0, 2, 3),
- GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 4, 5),
- GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
- assertThat(fileStore.committedFiles.get(row(1)).get(0))
- .isEqualTo(Collections.singletonList("INSERT-key-1/0/1-value--1"));
- assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("INSERT-key-0/4/5-value-1"));
- assertThat(fileStore.committedFiles.get(row(0)).get(1))
- .isEqualTo(Arrays.asList("INSERT-key-0/0/1-value-1", "INSERT-key-0/2/3-value--1"));
- }
-
- @Test
- public void testAppend() throws Exception {
- Assume.assumeTrue(hasPk && partitioned);
- StoreSink<?, ?> sink = newSink(null);
- writeAndAssert(sink);
-
- writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
- assertThat(fileStore.committedFiles.get(row(1)).get(0))
- .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
- assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Arrays.asList("INSERT-key-2-value-0/2/3", "INSERT-key-8-value-0/8/9"));
- }
-
- @Test
- public void testOverwrite() throws Exception {
- Assume.assumeTrue(hasPk && partitioned);
- StoreSink<?, ?> sink = newSink(new HashMap<>());
- writeAndAssert(sink);
-
- writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
- assertThat(fileStore.committedFiles.get(row(1)).get(1)).isNull();
- assertThat(fileStore.committedFiles.get(row(1)).get(0))
- .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
- assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("INSERT-key-8-value-0/8/9"));
- }
-
- @Test
- public void testOverwritePartition() throws Exception {
- Assume.assumeTrue(hasPk && partitioned);
- HashMap<String, String> partition = new HashMap<>();
- partition.put("part", "0");
- StoreSink<?, ?> sink = newSink(partition);
- writeAndAssert(sink);
-
- writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
- assertThat(fileStore.committedFiles.get(row(1)).get(1))
- .isEqualTo(Collections.singletonList("INSERT-key-0-value-1/0/1"));
- assertThat(fileStore.committedFiles.get(row(1)).get(0))
- .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
- assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("INSERT-key-8-value-0/8/9"));
- }
-
- private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
- writeAndCommit(
- sink,
- GenericRowData.of(0, 0, 1),
- GenericRowData.of(0, 2, 3),
- GenericRowData.of(0, 7, 5),
- GenericRowData.of(1, 0, 1));
- assertThat(fileStore.committedFiles.get(row(1)).get(1))
- .isEqualTo(Collections.singletonList("INSERT-key-0-value-1/0/1"));
- assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("INSERT-key-2-value-0/2/3"));
- assertThat(fileStore.committedFiles.get(row(0)).get(1))
- .isEqualTo(Arrays.asList("INSERT-key-0-value-0/0/1", "INSERT-key-7-value-0/7/5"));
- }
-
- private void writeAndCommit(StoreSink<?, ?> sink, RowData... rows) throws Exception {
- commit(sink, write(sink, rows));
- }
-
- private List<Committable> write(StoreSink<?, ?> sink, RowData... rows) throws Exception {
- StatefulPrecommittingSinkWriter<?> writer = sink.createWriter(null);
- for (RowData row : rows) {
- writer.write(row, null);
- }
-
- List<Committable> committables = ((StoreSinkWriter) writer).prepareCommit();
- Map<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> writers =
- new HashMap<>(((StoreSinkWriter) writer).writers());
- assertThat(writers.size()).isGreaterThan(0);
-
- writer.close();
- writers.forEach(
- (part, map) ->
- map.forEach(
- (bucket, recordWriter) ->
- assertThat(((TestRecordWriter) recordWriter).closed)
- .isTrue()));
- return committables;
- }
-
- private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) throws Exception {
- StoreGlobalCommitter committer = sink.createGlobalCommitter();
- ManifestCommittable committable = committer.combine(0, fileCommittables);
-
- fileStore.expired = false;
- lock.locked = false;
- committer.commit(Collections.singletonList(committable));
- assertThat(fileStore.expired).isTrue();
- assertThat(lock.locked).isTrue();
-
- assertThat(
- committer
- .filterRecoveredCommittables(Collections.singletonList(committable))
- .size())
- .isEqualTo(0);
-
- lock.closed = false;
- committer.close();
- assertThat(lock.closed).isTrue();
- }
-
- private StoreSink<?, ?> newSink(@Nullable Map<String, String> overwritePartition) {
- return new StoreSink<>(
- identifier, table, false, null, () -> lock, overwritePartition, null);
- }
-
- private class TestLock implements CatalogLock {
-
- private boolean locked = false;
-
- private boolean closed = false;
-
- @Override
- public <T> T runWithLock(String database, String table, Callable<T> callable)
- throws Exception {
- assertThat(database).isEqualTo(identifier.getDatabaseName());
- assertThat(table).isEqualTo(identifier.getObjectName());
- locked = true;
- return callable.call();
- }
-
- @Override
- public void close() {
- closed = true;
- }
- }
-
- private Sink.InitContext initContext() {
- return new Sink.InitContext() {
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return null;
- }
-
- @Override
- public MailboxExecutor getMailboxExecutor() {
- return null;
- }
-
- @Override
- public ProcessingTimeService getProcessingTimeService() {
- return null;
- }
-
- @Override
- public int getSubtaskId() {
- return 0;
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return 1;
- }
-
- @Override
- public SinkWriterMetricGroup metricGroup() {
- return null;
- }
-
- @Override
- public OptionalLong getRestoredCheckpointId() {
- return OptionalLong.empty();
- }
-
- @Override
- public SerializationSchema.InitializationContext
- asSerializationSchemaInitializationContext() {
- return null;
- }
- };
- }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
deleted file mode 100644
index aa8be1f0..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
-import org.apache.flink.table.store.file.operation.FileStoreCommit;
-import org.apache.flink.table.store.file.operation.FileStoreExpire;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.operation.Lock;
-import org.apache.flink.table.store.file.stats.StatsTestUtils;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.types.logical.RowType;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
-import static org.apache.flink.table.store.file.stats.StatsTestUtils.newEmptyTableStats;
-
-/** Test {@link FileStore}. */
-public class TestFileStore implements FileStore<KeyValue> {
-
- public final Set<ManifestCommittable> committed = new HashSet<>();
-
- public final Map<BinaryRowData, Map<Integer, List<String>>> committedFiles = new HashMap<>();
-
- public final boolean hasPk;
- private final RowType partitionType;
-
- public boolean expired = false;
-
- public TestFileStore(boolean hasPk, RowType partitionType) {
- this.hasPk = hasPk;
- this.partitionType = partitionType;
- }
-
- @Override
- public FileStoreWrite<KeyValue> newWrite() {
- return new FileStoreWrite<KeyValue>() {
- @Override
- public RecordWriter<KeyValue> createWriter(
- BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- TestRecordWriter writer = new TestRecordWriter(hasPk);
- writer.records.addAll(
- committedFiles
- .computeIfAbsent(partition, k -> new HashMap<>())
- .computeIfAbsent(bucket, k -> new ArrayList<>()));
- committedFiles.get(partition).remove(bucket);
- return writer;
- }
-
- @Override
- public RecordWriter<KeyValue> createEmptyWriter(
- BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
- return new TestRecordWriter(hasPk);
- }
-
- @Override
- public Callable<CompactResult> createCompactWriter(
- BinaryRowData partition,
- int bucket,
- @Nullable List<DataFileMeta> compactFiles) {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public FileStoreRead<KeyValue> newRead() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public FileStoreCommit newCommit() {
- return new TestCommit();
- }
-
- @Override
- public FileStoreExpire newExpire() {
- return new FileStoreExpire() {
- @Override
- public FileStoreExpire withLock(Lock lock) {
- return this;
- }
-
- @Override
- public void expire() {
- expired = true;
- }
- };
- }
-
- @Override
- public RowType partitionType() {
- return partitionType;
- }
-
- @Override
- public FileStoreScan newScan() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public SnapshotManager snapshotManager() {
- throw new UnsupportedOperationException();
- }
-
- static class TestRecordWriter implements RecordWriter<KeyValue> {
-
- final List<String> records = new ArrayList<>();
- final boolean hasPk;
-
- boolean closed = false;
-
- TestRecordWriter(boolean hasPk) {
- this.hasPk = hasPk;
- }
-
- private String rowToString(RowData row, boolean key) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < row.getArity(); i++) {
- if (i != 0) {
- builder.append("/");
- }
- if (key) {
- builder.append(row.getInt(i));
- } else {
- if (i < row.getArity() - 1) {
- builder.append(row.getInt(i));
- } else {
- builder.append(hasPk ? row.getInt(i) : row.getLong(i));
- }
- }
- }
- return builder.toString();
- }
-
- @Override
- public void write(KeyValue kv) {
- if (!hasPk) {
- assert kv.value().getArity() == 1;
- assert kv.value().getLong(0) >= -1L;
- }
- records.add(
- kv.valueKind().toString()
- + "-key-"
- + rowToString(kv.key(), true)
- + "-value-"
- + rowToString(kv.value(), false));
- }
-
- @Override
- public Increment prepareCommit() {
- List<DataFileMeta> newFiles =
- records.stream()
- .map(
- s ->
- new DataFileMeta(
- s,
- 0,
- 0,
- null,
- null,
- StatsTestUtils.newEmptyTableStats(),
- newEmptyTableStats(3),
- 0,
- 0,
- 0,
- 0))
- .collect(Collectors.toList());
- return new Increment(newFiles, Collections.emptyList(), Collections.emptyList());
- }
-
- @Override
- public void sync() {}
-
- @Override
- public List<DataFileMeta> close() {
- closed = true;
- return Collections.emptyList();
- }
- }
-
- class TestCommit implements FileStoreCommit {
-
- Lock lock;
-
- @Override
- public FileStoreCommit withLock(Lock lock) {
- this.lock = lock;
- return this;
- }
-
- @Override
- public List<ManifestCommittable> filterCommitted(
- List<ManifestCommittable> committableList) {
- return committableList.stream()
- .filter(c -> !committed.contains(c))
- .collect(Collectors.toList());
- }
-
- @Override
- public void commit(ManifestCommittable committable, Map<String, String> properties) {
- try {
- lock.runWithLock(() -> committed.add(committable));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- committable
- .newFiles()
- .forEach(
- (part, bMap) ->
- bMap.forEach(
- (bucket, files) -> {
- List<String> committed =
- committedFiles
- .computeIfAbsent(
- part, k -> new HashMap<>())
- .computeIfAbsent(
- bucket,
- k -> new ArrayList<>());
- files.stream()
- .map(DataFileMeta::fileName)
- .forEach(committed::add);
- }));
- }
-
- @Override
- public void overwrite(
- Map<String, String> partition,
- ManifestCommittable committable,
- Map<String, String> properties) {
- if (partition.isEmpty()) {
- committedFiles.clear();
- } else {
- BinaryRowData partRow = row(Integer.parseInt(partition.get("part")));
- committedFiles.remove(partRow);
- }
- commit(committable, properties);
- }
- }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
deleted file mode 100644
index 52b7febd..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
-import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.SinkRecordConverter;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableCompact;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.types.RowKind;
-
-/** {@link FileStoreTable} for tests. */
-public class TestFileStoreTable implements FileStoreTable {
-
- private final Path path;
- private final TestFileStore store;
- private final TableSchema tableSchema;
-
- public TestFileStoreTable(Path path, TestFileStore store, TableSchema tableSchema) {
- this.path = path;
- this.store = store;
- this.tableSchema = tableSchema;
- }
-
- @Override
- public Path location() {
- return path;
- }
-
- @Override
- public TableSchema schema() {
- return tableSchema;
- }
-
- @Override
- public SnapshotManager snapshotManager() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TableScan newScan() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TableRead newRead() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TableWrite newWrite() {
- return new AbstractTableWrite<KeyValue>(
- store.newWrite(), new SinkRecordConverter(2, tableSchema)) {
- @Override
- protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
- throws Exception {
- boolean isInsert =
- record.row().getRowKind() == RowKind.INSERT
- || record.row().getRowKind() == RowKind.UPDATE_AFTER;
- KeyValue kv = new KeyValue();
- if (store.hasPk) {
- kv.replace(
- record.primaryKey(),
- isInsert ? RowKind.INSERT : RowKind.DELETE,
- record.row());
- } else {
- kv.replace(
- record.row(), RowKind.INSERT, GenericRowData.of(isInsert ? 1L : -1L));
- }
- writer.write(kv);
- }
- };
- }
-
- @Override
- public TableCommit newCommit() {
- return new TableCommit(store.newCommit(), store.newExpire());
- }
-
- @Override
- public TableCompact newCompact() {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
deleted file mode 100644
index 17cbba1f..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.function.SerializableSupplier;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link GlobalCommitterOperator}. */
-public class GlobalCommitterOperatorTest {
-
- @Test
- public void closeCommitter() throws Exception {
- final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
- createTestHarness(globalCommitter);
- testHarness.initializeEmptyState();
- testHarness.open();
- testHarness.close();
- assertThat(globalCommitter.isClosed()).isTrue();
- }
-
- @Test
- public void restoredFromMergedState() throws Exception {
-
- final List<String> input1 = Arrays.asList("host", "drop");
- final OperatorSubtaskState operatorSubtaskState1 =
- buildSubtaskState(createTestHarness(), input1);
-
- final List<String> input2 = Arrays.asList("future", "evil", "how");
- final OperatorSubtaskState operatorSubtaskState2 =
- buildSubtaskState(createTestHarness(), input2);
-
- final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
- createTestHarness(globalCommitter);
-
- final OperatorSubtaskState mergedOperatorSubtaskState =
- OneInputStreamOperatorTestHarness.repackageState(
- operatorSubtaskState1, operatorSubtaskState2);
-
- testHarness.initializeState(
- OneInputStreamOperatorTestHarness.repartitionOperatorState(
- mergedOperatorSubtaskState, 2, 2, 1, 0));
- testHarness.open();
-
- final List<String> expectedOutput = new ArrayList<>();
- expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input1));
- expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
-
- testHarness.snapshot(1L, 1L);
- testHarness.notifyOfCompletedCheckpoint(1L);
- testHarness.close();
-
- assertThat(globalCommitter.getCommittedData())
- .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
- }
-
- @Test
- public void commitMultipleStagesTogether() throws Exception {
-
- final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-
- final List<String> input1 = Arrays.asList("cautious", "nature");
- final List<String> input2 = Arrays.asList("count", "over");
- final List<String> input3 = Arrays.asList("lawyer", "grammar");
-
- final List<String> expectedOutput = new ArrayList<>();
-
- expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input1));
- expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
- expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input3));
-
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
- createTestHarness(globalCommitter);
- testHarness.initializeEmptyState();
- testHarness.open();
-
- testHarness.processElements(committableRecords(input1));
- testHarness.snapshot(1L, 1L);
- testHarness.processElements(committableRecords(input2));
- testHarness.snapshot(2L, 2L);
- testHarness.processElements(committableRecords(input3));
- testHarness.snapshot(3L, 3L);
-
- testHarness.notifyOfCompletedCheckpoint(3L);
-
- testHarness.close();
-
- assertThat(globalCommitter.getCommittedData())
- .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
- }
-
- @Test
- public void filterRecoveredCommittables() throws Exception {
- final List<String> input = Arrays.asList("silent", "elder", "patience");
- final String successCommittedCommittable = DefaultGlobalCommitter.COMBINER.apply(input);
-
- final OperatorSubtaskState operatorSubtaskState =
- buildSubtaskState(createTestHarness(), input);
- final DefaultGlobalCommitter globalCommitter =
- new DefaultGlobalCommitter(successCommittedCommittable);
-
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
- createTestHarness(globalCommitter);
-
- // all data from previous checkpoint are expected to be committed,
- // so we expect no data to be re-committed.
- testHarness.initializeState(operatorSubtaskState);
- testHarness.open();
- testHarness.snapshot(1L, 1L);
- testHarness.notifyOfCompletedCheckpoint(1L);
- assertThat(globalCommitter.getCommittedData()).isEmpty();
- testHarness.close();
- }
-
- @Test
- public void endOfInput() throws Exception {
- final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
- createTestHarness(globalCommitter);
- testHarness.initializeEmptyState();
- testHarness.open();
- List<String> input = Arrays.asList("silent", "elder", "patience");
- testHarness.processElements(committableRecords(input));
- testHarness.endInput();
- testHarness.close();
- assertThat(globalCommitter.getCommittedData()).contains("elder+patience+silent");
- }
-
- private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness()
- throws Exception {
- return createTestHarness(new DefaultGlobalCommitter());
- }
-
- private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness(
- GlobalCommitter<String, String> globalCommitter) throws Exception {
- return new OneInputStreamOperatorTestHarness<>(
- new GlobalCommitterOperator<>(
- () -> globalCommitter, () -> StringCommittableSerializer.INSTANCE),
- CommittableMessageTypeInfo.of(
- (SerializableSupplier<SimpleVersionedSerializer<String>>)
- () -> StringCommittableSerializer.INSTANCE)
- .createSerializer(new ExecutionConfig()));
- }
-
- public static OperatorSubtaskState buildSubtaskState(
- OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness,
- List<String> input)
- throws Exception {
- testHarness.initializeEmptyState();
- testHarness.open();
- testHarness.processElements(
- input.stream()
- .map(GlobalCommitterOperatorTest::toCommittableMessage)
- .map(StreamRecord::new)
- .collect(Collectors.toList()));
- testHarness.prepareSnapshotPreBarrier(1L);
- OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(1L, 1L);
- testHarness.close();
- return operatorSubtaskState;
- }
-
- static List<StreamRecord<CommittableMessage<String>>> committableRecords(
- Collection<String> elements) {
- return elements.stream()
- .map(GlobalCommitterOperatorTest::toCommittableMessage)
- .map(StreamRecord::new)
- .collect(Collectors.toList());
- }
-
- private static CommittableMessage<String> toCommittableMessage(String input) {
- return new CommittableWithLineage<>(input, null, -1);
- }
-
- /** A {@link GlobalCommitter} that always commits global committables successfully. */
- private static class DefaultGlobalCommitter implements GlobalCommitter<String, String> {
-
- private static final Function<List<String>, String> COMBINER =
- strings -> {
- // we sort here because we want to have a deterministic result during the unit
- // test
- Collections.sort(strings);
- return String.join("+", strings);
- };
-
- private final Queue<String> committedData;
-
- private boolean isClosed;
-
- private final String committedSuccessData;
-
- DefaultGlobalCommitter() {
- this("");
- }
-
- DefaultGlobalCommitter(String committedSuccessData) {
- this.committedData = new ConcurrentLinkedQueue<>();
- this.isClosed = false;
- this.committedSuccessData = committedSuccessData;
- }
-
- @Override
- public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
- if (committedSuccessData == null) {
- return globalCommittables;
- }
- return globalCommittables.stream()
- .filter(s -> !s.equals(committedSuccessData))
- .collect(Collectors.toList());
- }
-
- @Override
- public String combine(long checkpointId, List<String> committables) {
- return COMBINER.apply(committables);
- }
-
- @Override
- public void commit(List<String> committables) {
- committedData.addAll(committables);
- }
-
- public List<String> getCommittedData() {
- if (committedData != null) {
- return new ArrayList<>(committedData);
- } else {
- return Collections.emptyList();
- }
- }
-
- @Override
- public void close() {
- isClosed = true;
- }
-
- public boolean isClosed() {
- return isClosed;
- }
- }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
deleted file mode 100644
index 528f5dc5..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.buildSubtaskState;
-import static org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.committableRecords;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertNotNull;
-
-/** Test the {@link LocalCommitterOperator}. */
-public class LocalCommitterOperatorTest {
-
- @Test
- public void supportRetry() throws Exception {
- final List<String> input = Arrays.asList("lazy", "leaf");
- final RetryOnceCommitter committer = new RetryOnceCommitter();
- final OneInputStreamOperatorTestHarness<
- CommittableMessage<String>, CommittableMessage<String>>
- testHarness = createTestHarness(committer);
-
- testHarness.initializeEmptyState();
- testHarness.open();
- testHarness.processElements(committableRecords(input));
- testHarness.prepareSnapshotPreBarrier(1);
- testHarness.snapshot(1L, 1L);
- testHarness.notifyOfCompletedCheckpoint(1L);
- testHarness.snapshot(2L, 2L);
- testHarness.notifyOfCompletedCheckpoint(2L);
-
- testHarness.close();
-
- assertThat(committer.getCommittedData()).contains("lazy", "leaf");
- }
-
- @Test
- public void closeCommitter() throws Exception {
- final DefaultCommitter committer = new DefaultCommitter();
- final OneInputStreamOperatorTestHarness<
- CommittableMessage<String>, CommittableMessage<String>>
- testHarness = createTestHarness(committer);
- testHarness.initializeEmptyState();
- testHarness.open();
- testHarness.close();
- assertThat(committer.isClosed()).isTrue();
- }
-
- @Test
- public void restoredFromMergedState() throws Exception {
- final List<String> input1 = Arrays.asList("today", "whom");
- final OperatorSubtaskState operatorSubtaskState1 =
- buildSubtaskState(createTestHarness(), input1);
-
- final List<String> input2 = Arrays.asList("future", "evil", "how");
- final OperatorSubtaskState operatorSubtaskState2 =
- buildSubtaskState(createTestHarness(), input2);
-
- final DefaultCommitter committer = new DefaultCommitter();
- final OneInputStreamOperatorTestHarness<
- CommittableMessage<String>, CommittableMessage<String>>
- testHarness = createTestHarness(committer);
-
- final OperatorSubtaskState mergedOperatorSubtaskState =
- OneInputStreamOperatorTestHarness.repackageState(
- operatorSubtaskState1, operatorSubtaskState2);
-
- testHarness.initializeState(
- OneInputStreamOperatorTestHarness.repartitionOperatorState(
- mergedOperatorSubtaskState, 2, 2, 1, 0));
- testHarness.open();
-
- final List<String> expectedOutput = new ArrayList<>();
- expectedOutput.addAll(input1);
- expectedOutput.addAll(input2);
-
- testHarness.prepareSnapshotPreBarrier(1L);
- testHarness.snapshot(1L, 1L);
- testHarness.notifyOfCompletedCheckpoint(1);
-
- testHarness.close();
-
- assertThat(committer.getCommittedData())
- .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
- }
-
- @Test
- public void commitMultipleStagesTogether() throws Exception {
-
- final DefaultCommitter committer = new DefaultCommitter();
-
- final List<String> input1 = Arrays.asList("cautious", "nature");
- final List<String> input2 = Arrays.asList("count", "over");
- final List<String> input3 = Arrays.asList("lawyer", "grammar");
-
- final List<String> expectedOutput = new ArrayList<>();
-
- expectedOutput.addAll(input1);
- expectedOutput.addAll(input2);
- expectedOutput.addAll(input3);
-
- final OneInputStreamOperatorTestHarness<
- CommittableMessage<String>, CommittableMessage<String>>
- testHarness = createTestHarness(committer);
- testHarness.initializeEmptyState();
- testHarness.open();
-
- testHarness.processElements(committableRecords(input1));
- testHarness.prepareSnapshotPreBarrier(1L);
- testHarness.snapshot(1L, 1L);
- testHarness.processElements(committableRecords(input2));
- testHarness.prepareSnapshotPreBarrier(2L);
- testHarness.snapshot(2L, 2L);
- testHarness.processElements(committableRecords(input3));
- testHarness.prepareSnapshotPreBarrier(3L);
- testHarness.snapshot(3L, 3L);
-
- testHarness.notifyOfCompletedCheckpoint(1);
- testHarness.notifyOfCompletedCheckpoint(3);
-
- testHarness.close();
-
- assertThat(fromRecords(testHarness.getRecordOutput())).isEqualTo(expectedOutput);
-
- assertThat(committer.getCommittedData()).isEqualTo(expectedOutput);
- }
-
- private static List<String> fromRecords(
- Collection<StreamRecord<CommittableMessage<String>>> elements) {
- return elements.stream()
- .map(StreamRecord::getValue)
- .filter(message -> message instanceof CommittableWithLineage)
- .map(message -> ((CommittableWithLineage<String>) message).getCommittable())
- .collect(Collectors.toList());
- }
-
- private OneInputStreamOperatorTestHarness<
- CommittableMessage<String>, CommittableMessage<String>>
- createTestHarness() throws Exception {
- return createTestHarness(new DefaultCommitter());
- }
-
- private OneInputStreamOperatorTestHarness<
- CommittableMessage<String>, CommittableMessage<String>>
- createTestHarness(Committer<String> committer) throws Exception {
- return new OneInputStreamOperatorTestHarness<>(
- new LocalCommitterOperator<>(
- () -> committer, () -> StringCommittableSerializer.INSTANCE));
- }
-
- /** Base class for testing {@link Committer}. */
- private static class DefaultCommitter implements Committer<String>, Serializable {
-
- @Nullable protected Queue<String> committedData;
-
- private boolean isClosed;
-
- @Nullable private final Supplier<Queue<String>> queueSupplier;
-
- public DefaultCommitter() {
- this.committedData = new ConcurrentLinkedQueue<>();
- this.isClosed = false;
- this.queueSupplier = null;
- }
-
- public List<String> getCommittedData() {
- if (committedData != null) {
- return new ArrayList<>(committedData);
- } else {
- return Collections.emptyList();
- }
- }
-
- @Override
- public void close() {
- isClosed = true;
- }
-
- public boolean isClosed() {
- return isClosed;
- }
-
- @Override
- public void commit(Collection<CommitRequest<String>> requests) {
- if (committedData == null) {
- assertNotNull(queueSupplier);
- committedData = queueSupplier.get();
- }
- committedData.addAll(
- requests.stream()
- .map(CommitRequest::getCommittable)
- .collect(Collectors.toList()));
- }
- }
-
- /** A {@link Committer} that always re-commits the committables data it received. */
- private static class RetryOnceCommitter extends DefaultCommitter implements Committer<String> {
-
- private final Set<String> seen = new LinkedHashSet<>();
-
- @Override
- public void commit(Collection<CommitRequest<String>> requests) {
- requests.forEach(
- c -> {
- if (seen.remove(c.getCommittable())) {
- checkNotNull(committedData);
- committedData.add(c.getCommittable());
- } else {
- seen.add(c.getCommittable());
- c.retryLater();
- }
- });
- }
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java
deleted file mode 100644
index a5983a93..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.log;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.UserCodeClassLoader;
-
-import java.util.Optional;
-import java.util.OptionalLong;
-import java.util.function.Consumer;
-
-/** A {@link InitContext} with {@link InitContext#metadataConsumer()}. */
-public class LogInitContext implements InitContext {
-
- private final InitContext context;
- private final Consumer<?> metaConsumer;
-
- public LogInitContext(InitContext context, Consumer<?> metaConsumer) {
- this.context = context;
- this.metaConsumer = metaConsumer;
- }
-
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return context.getUserCodeClassLoader();
- }
-
- @Override
- public MailboxExecutor getMailboxExecutor() {
- return context.getMailboxExecutor();
- }
-
- @Override
- public ProcessingTimeService getProcessingTimeService() {
- return context.getProcessingTimeService();
- }
-
- @Override
- public int getSubtaskId() {
- return context.getSubtaskId();
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return context.getNumberOfParallelSubtasks();
- }
-
- @Override
- public SinkWriterMetricGroup metricGroup() {
- return context.metricGroup();
- }
-
- @Override
- public OptionalLong getRestoredCheckpointId() {
- return context.getRestoredCheckpointId();
- }
-
- @Override
- public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
- return context.asSerializationSchemaInitializationContext();
- }
-
- @Override
- public Optional<Consumer<?>> metadataConsumer() {
- return Optional.of(metaConsumer);
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
index effa38e8..319324f1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
@@ -18,35 +18,13 @@
package org.apache.flink.table.store.log;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
import java.io.Serializable;
-import java.util.function.Consumer;
/** A {@link Serializable} sink provider for log store. */
public interface LogSinkProvider extends Serializable {
- /** Creates a {@link Sink} instance. */
- Sink<SinkRecord> createSink();
-
- /**
- * Create a metadata consumer for {@link Sink.InitContext#metadataConsumer()} from {@link
- * WriteCallback}.
- */
- Consumer<?> createMetadataConsumer(WriteCallback callback);
-
- /**
- * A callback interface that the user can implement to know the offset of the bucket when the
- * request is complete.
- */
- interface WriteCallback {
-
- /**
- * A callback method the user can implement to provide asynchronous handling of request
- * completion. This method will be called when the record sent to the server has been
- * acknowledged.
- */
- void onCompletion(int bucket, long offset);
- }
+ /** Creates a {@link LogSinkFunction} instance. */
+ LogSinkFunction createSink();
}
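
With Sink V2 gone, a log system only has to hand the connector a SinkFunction-based LogSinkFunction. A minimal sketch of what an implementation of the slimmed-down provider could look like; the PrintLogSinkProvider/PrintLogSinkFunction names are made up for illustration, and the real implementation of this contract is the KafkaLogSinkProvider further down in this diff:

    import org.apache.flink.table.store.log.LogSinkProvider;
    import org.apache.flink.table.store.table.sink.LogSinkFunction;
    import org.apache.flink.table.store.table.sink.SinkRecord;

    /** Illustrative provider that just prints records; not part of the repository. */
    public class PrintLogSinkProvider implements LogSinkProvider {

        private static final long serialVersionUID = 1L;

        @Override
        public LogSinkFunction createSink() {
            return new PrintLogSinkFunction();
        }

        private static class PrintLogSinkFunction implements LogSinkFunction {

            private static final long serialVersionUID = 1L;

            private WriteCallback callback;

            @Override
            public void setWriteCallback(WriteCallback writeCallback) {
                this.callback = writeCallback;
            }

            @Override
            public void invoke(SinkRecord record, Context context) {
                System.out.println(record);
                if (callback != null) {
                    // Report a stub offset for the record's bucket; a real log system
                    // would report the offset it actually assigned.
                    callback.onCompletion(record.bucket(), 0);
                }
            }
        }
    }
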
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 22c4e6c7..7032ea06 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -63,19 +63,15 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
* context information.
*/
LogSourceProvider createSourceProvider(
- Context context, SourceContext sourceContext, @Nullable int[][] projectFields);
+ Context context,
+ DynamicTableSource.Context sourceContext,
+ @Nullable int[][] projectFields);
/**
* Creates a {@link LogSinkProvider} instance from a {@link CatalogTable} and additional context
* information.
*/
- LogSinkProvider createSinkProvider(Context context, SinkContext sinkContext);
-
- /** Context for create runtime source. */
- interface SourceContext extends DynamicTableSource.Context {}
-
- /** Context for create runtime sink. */
- interface SinkContext extends DynamicTableSink.Context {}
+ LogSinkProvider createSinkProvider(Context context, DynamicTableSink.Context sinkContext);
// --------------------------------------------------------------------------------------------
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
index d6c849ea..972d2dc8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.log;
-import org.apache.flink.table.store.log.LogSinkProvider.WriteCallback;
+import org.apache.flink.table.store.table.sink.LogSinkFunction.WriteCallback;
import java.util.HashMap;
import java.util.Map;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index 104ae0bb..e94c38ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -29,9 +29,17 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+
/** Factory to create {@link FileStoreTable}. */
public class FileStoreTableFactory {
+ public static FileStoreTable create(Path path) {
+ Configuration conf = new Configuration();
+ conf.set(PATH, path.toString());
+ return create(conf);
+ }
+
public static FileStoreTable create(Configuration conf) {
return create(conf, UUID.randomUUID().toString());
}
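
The new create(Path) overload builds a Configuration containing only the path option and delegates to create(Configuration). A short usage sketch, assuming the location already holds an existing table:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.table.FileStoreTable;
    import org.apache.flink.table.store.table.FileStoreTableFactory;

    public class CreateByPathExample {
        public static void main(String[] args) {
            // Hypothetical table location; create(Path) just sets the 'path' option
            // on an otherwise empty Configuration and delegates to create(Configuration).
            FileStoreTable table =
                    FileStoreTableFactory.create(new Path("/tmp/store/default.db/t"));
            System.out.println(table.getClass().getName());
        }
    }
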
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
similarity index 67%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
index effa38e8..82d28146 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
@@ -16,25 +16,14 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.log;
+package org.apache.flink.table.store.table.sink;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import java.io.Serializable;
-import java.util.function.Consumer;
+/** Log {@link SinkFunction} with {@link WriteCallback}. */
+public interface LogSinkFunction extends SinkFunction<SinkRecord> {
-/** A {@link Serializable} sink provider for log store. */
-public interface LogSinkProvider extends Serializable {
-
- /** Creates a {@link Sink} instance. */
- Sink<SinkRecord> createSink();
-
- /**
- * Create a metadata consumer for {@link Sink.InitContext#metadataConsumer()} from {@link
- * WriteCallback}.
- */
- Consumer<?> createMetadataConsumer(WriteCallback callback);
+ void setWriteCallback(WriteCallback writeCallback);
/**
* A callback interface that the user can implement to know the offset of the bucket when the
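
LogSinkFunction is now the whole contract between the file store sink and a log system: a SinkFunction over SinkRecord plus a hook to register the offset callback. A hedged sketch of caller-side wiring; the helper and map names are hypothetical, not the actual writer operator code:

    import org.apache.flink.table.store.log.LogSinkProvider;
    import org.apache.flink.table.store.table.sink.LogSinkFunction;

    import java.util.Map;

    /** Illustrative caller-side wiring; not part of this module. */
    class LogSinkFunctions {

        /** Registers a callback that tracks, per bucket, the next offset to read. */
        static LogSinkFunction withOffsetTracking(
                LogSinkProvider provider, Map<Integer, Long> nextOffsets) {
            LogSinkFunction function = provider.createSink();
            function.setWriteCallback(
                    (bucket, offset) -> nextOffsets.merge(bucket, offset + 1, Math::max));
            return function;
        }
    }
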
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
index ccc42576..bf0aa4c5 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -30,7 +31,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
/** A {@link KafkaRecordSerializationSchema} for the table in log store. */
-public class KafkaLogSerializationSchema implements KafkaRecordSerializationSchema<SinkRecord> {
+public class KafkaLogSerializationSchema implements KafkaSerializationSchema<SinkRecord> {
private static final long serialVersionUID = 1L;
@@ -55,9 +56,7 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
}
@Override
- public void open(
- SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
- throws Exception {
+ public void open(SerializationSchema.InitializationContext context) throws Exception {
if (primaryKeySerializer != null) {
primaryKeySerializer.open(context);
}
@@ -65,8 +64,7 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
}
@Override
- public ProducerRecord<byte[], byte[]> serialize(
- SinkRecord element, KafkaSinkContext context, Long timestamp) {
+ public ProducerRecord<byte[], byte[]> serialize(SinkRecord element, @Nullable Long timestamp) {
RowKind kind = element.row().getRowKind();
byte[] primaryKeyBytes = null;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index 1f1b79b8..7365e44d 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -20,22 +20,18 @@ package org.apache.flink.table.store.kafka;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.connector.base.DeliveryGuarantee;
-import org.apache.flink.connector.kafka.sink.KafkaSink;
-import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
import org.apache.flink.table.store.log.LogOptions.LogConsistency;
import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
import javax.annotation.Nullable;
import java.util.Properties;
-import java.util.function.Consumer;
/** A Kafka {@link LogSinkProvider}. */
public class KafkaLogSinkProvider implements LogSinkProvider {
@@ -70,32 +66,24 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
}
@Override
- public KafkaSink<SinkRecord> createSink() {
- KafkaSinkBuilder<SinkRecord> builder = KafkaSink.builder();
+ public LogSinkFunction createSink() {
+ Semantic semantic;
switch (consistency) {
case TRANSACTIONAL:
- builder.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
- .setTransactionalIdPrefix("log-store-" + topic);
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "log-store-" + topic);
+ semantic = Semantic.EXACTLY_ONCE;
break;
case EVENTUAL:
if (primaryKeySerializer == null) {
throw new IllegalArgumentException(
"Can not use EVENTUAL consistency mode for non-pk table.");
}
- builder.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE);
+ semantic = Semantic.AT_LEAST_ONCE;
break;
+ default:
+ throw new IllegalArgumentException("Unsupported: " + consistency);
}
-
- return builder.setBootstrapServers(
- properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString())
- .setKafkaProducerConfig(properties)
- .setRecordSerializer(createSerializationSchema())
- .build();
- }
-
- @Override
- public Consumer<RecordMetadata> createMetadataConsumer(WriteCallback callback) {
- return meta -> callback.onCompletion(meta.partition(), meta.offset());
+ return new KafkaSinkFunction(topic, createSerializationSchema(), properties, semantic);
}
@VisibleForTesting
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 9828daf8..b247f575 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -25,6 +25,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
@@ -44,6 +47,7 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -182,13 +186,13 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
@Override
public KafkaLogSourceProvider createSourceProvider(
DynamicTableFactory.Context context,
- SourceContext sourceContext,
+ DynamicTableSource.Context sourceContext,
@Nullable int[][] projectFields) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
DataType physicalType = schema.toPhysicalRowDataType();
DeserializationSchema<RowData> primaryKeyDeserializer = null;
- int[] primaryKey = schema.getPrimaryKeyIndexes();
+ int[] primaryKey = getPrimaryKeyIndexes(schema);
if (primaryKey.length > 0) {
DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
primaryKeyDeserializer =
@@ -213,12 +217,12 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
@Override
public KafkaLogSinkProvider createSinkProvider(
- DynamicTableFactory.Context context, SinkContext sinkContext) {
+ DynamicTableFactory.Context context, DynamicTableSink.Context sinkContext) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
DataType physicalType = schema.toPhysicalRowDataType();
SerializationSchema<RowData> primaryKeySerializer = null;
- int[] primaryKey = schema.getPrimaryKeyIndexes();
+ int[] primaryKey = getPrimaryKeyIndexes(schema);
if (primaryKey.length > 0) {
DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
primaryKeySerializer =
@@ -237,6 +241,14 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
helper.getOptions().get(CHANGELOG_MODE));
}
+ private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
+ final List<String> columns = schema.getColumnNames();
+ return schema.getPrimaryKey()
+ .map(UniqueConstraint::getColumns)
+ .map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray())
+ .orElseGet(() -> new int[] {});
+ }
+
public static Properties toKafkaProperties(ReadableConfig options) {
Properties properties = new Properties();
Map<String, String> optionMap = ((Configuration) options).toMap();
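
The getPrimaryKeyIndexes helper looks each primary-key column up in the schema's column list instead of calling ResolvedSchema#getPrimaryKeyIndexes. A standalone illustration of the same computation on plain lists:

    import java.util.Arrays;
    import java.util.List;

    public class PrimaryKeyIndexes {
        public static void main(String[] args) {
            List<String> columns = Arrays.asList("dt", "user_id", "cnt");
            List<String> primaryKey = Arrays.asList("user_id", "dt");
            // Same shape as the factory helper: map each pk column to its position.
            int[] indexes = primaryKey.stream().mapToInt(columns::indexOf).toArray();
            System.out.println(Arrays.toString(indexes)); // [1, 0]
        }
    }
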
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
new file mode 100644
index 00000000..587a1813
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.table.store.table.sink.LogSinkFunction.WriteCallback;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link FlinkKafkaProducer} which implements {@link LogSinkFunction} to register {@link
+ * WriteCallback}.
+ */
+public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements LogSinkFunction {
+
+ private WriteCallback writeCallback;
+
+ /**
+ * Creates a {@link KafkaSinkFunction} for a given topic. The sink produces its input to the
+ * topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link
+ * ProducerRecord}, including partitioning information.
+ *
+ * @param defaultTopic The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * KafkaSinkFunction.Semantic}).
+ */
+ public KafkaSinkFunction(
+ String defaultTopic,
+ KafkaSerializationSchema<SinkRecord> serializationSchema,
+ Properties producerConfig,
+ KafkaSinkFunction.Semantic semantic) {
+ super(defaultTopic, serializationSchema, producerConfig, semantic);
+ }
+
+ public void setWriteCallback(WriteCallback writeCallback) {
+ this.writeCallback = writeCallback;
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ super.open(configuration);
+ Callback baseCallback = requireNonNull(callback);
+ callback =
+ (metadata, exception) -> {
+ if (writeCallback != null) {
+ writeCallback.onCompletion(metadata.partition(), metadata.offset());
+ }
+ baseCallback.onCompletion(metadata, exception);
+ };
+ }
+}
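
For a rough feel of how the new function is parameterized, here is a hedged construction sketch; the topic, bootstrap servers, and serialization schema are placeholders rather than values taken from this commit:

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.flink.table.store.kafka.KafkaSinkFunction;
    import org.apache.flink.table.store.table.sink.SinkRecord;

    import org.apache.kafka.clients.producer.ProducerConfig;

    import java.util.Properties;

    /** Illustrative construction; topic, servers and schema are placeholders. */
    class KafkaSinkFunctionExample {

        static KafkaSinkFunction create(KafkaSerializationSchema<SinkRecord> schema) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            KafkaSinkFunction sink =
                    new KafkaSinkFunction(
                            "my-log-topic",
                            schema,
                            properties,
                            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
            // onCompletion fires once Kafka acknowledges a record for the given bucket.
            sink.setWriteCallback(
                    (bucket, offset) ->
                            System.out.printf("bucket %d acked, offset %d%n", bucket, offset));
            return sink;
        }
    }
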
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index bb8a788f..30c66cfa 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -34,7 +34,6 @@ import org.junit.jupiter.api.Assertions;
import java.util.Comparator;
import java.util.List;
-import java.util.UUID;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
@@ -146,17 +145,16 @@ public class KafkaLogITCase extends KafkaTableTestBase {
// transactional need to commit
enableCheckpoint();
- // 1.1 sink
- String uuid = UUID.randomUUID().toString();
+ // 1 sink
env.fromElements(
testRecord(true, 2, 1, 2, RowKind.DELETE),
testRecord(true, 1, 3, 4, RowKind.INSERT),
testRecord(true, 0, 5, 6, RowKind.INSERT),
testRecord(true, 0, 7, 8, RowKind.INSERT))
- .sinkTo(new TestOffsetsLogSink<>(sinkProvider, uuid));
+ .addSink(sinkProvider.createSink());
env.execute();
- // 1.2 read
+ // 2 read
List<RowData> records =
collect(
factory.createSourceProvider(context, SOURCE_CONTEXT, null)
@@ -174,7 +172,7 @@ public class KafkaLogITCase extends KafkaTableTestBase {
assertRow(records.get(2), RowKind.INSERT, 5, 6);
assertRow(records.get(3), RowKind.INSERT, 7, 8);
- // 1.3 read with projection
+ // 3 read with projection
records =
collect(
factory.createSourceProvider(
@@ -192,24 +190,6 @@ public class KafkaLogITCase extends KafkaTableTestBase {
assertValue(records.get(1), RowKind.INSERT, 4);
assertValue(records.get(2), RowKind.INSERT, 6);
assertValue(records.get(3), RowKind.INSERT, 8);
-
- // 2.1 sink
- env.fromElements(
- testRecord(true, 0, 9, 10, RowKind.INSERT),
- testRecord(true, 1, 11, 12, RowKind.INSERT),
- testRecord(true, 2, 13, 14, RowKind.INSERT))
- .sinkTo(new TestOffsetsLogSink<>(sinkProvider, UUID.randomUUID().toString()));
- env.execute();
-
- // 2.2 read from offsets
- records =
- collect(
- factory.createSourceProvider(context, SOURCE_CONTEXT, null)
- .createSource(TestOffsetsLogSink.drainOffsets(uuid)),
- 3);
- assertRow(records.get(0), RowKind.INSERT, 9, 10);
- assertRow(records.get(1), RowKind.INSERT, 11, 12);
- assertRow(records.get(2), RowKind.INSERT, 13, 14);
} finally {
factory.onDropTable(context, true);
}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
index 37e7aeea..35ff9960 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -84,13 +84,13 @@ public class KafkaLogSerializationTest {
throws Exception {
KafkaLogSerializationSchema serializer =
createTestSerializationSchema(testContext("", mode, keyed));
- serializer.open(null, null);
+ serializer.open(null);
KafkaRecordDeserializationSchema<RowData> deserializer =
createTestDeserializationSchema(testContext("", mode, keyed));
deserializer.open(null);
SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);
- ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null, null);
+ ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null);
assertThat(record.partition().intValue()).isEqualTo(bucket);
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 4e920f04..ac225429 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -65,8 +65,8 @@ import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
/** Utils for the test of {@link KafkaLogStoreFactory}. */
public class KafkaLogTestUtils {
- public static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
- new LogStoreTableFactory.SourceContext() {
+ public static final DynamicTableSource.Context SOURCE_CONTEXT =
+ new DynamicTableSource.Context() {
@Override
public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
return createTypeInformation(
@@ -87,8 +87,8 @@ public class KafkaLogTestUtils {
}
};
- public static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
- new LogStoreTableFactory.SinkContext() {
+ public static final DynamicTableSink.Context SINK_CONTEXT =
+ new DynamicTableSink.Context() {
@Override
public boolean isBounded() {
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
deleted file mode 100644
index fa4dbdd1..00000000
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.kafka;
-
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.table.store.log.LogInitContext;
-import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.table.sink.SinkRecord;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-
-/** Test kafka {@link Sink}. */
-public class TestOffsetsLogSink<
- WriterT extends
- StatefulSinkWriter<SinkRecord, WriterStateT>
- & PrecommittingSinkWriter<SinkRecord, CommT>,
- CommT,
- WriterStateT>
- implements StatefulSink<SinkRecord, WriterStateT>,
- TwoPhaseCommittingSink<SinkRecord, CommT> {
-
- private static final Map<String, Map<Integer, Long>> GLOBAL_OFFSETS = new ConcurrentHashMap<>();
-
- private final LogSinkProvider sinkProvider;
- private final String uuid;
- private final Sink<SinkRecord> sink;
-
- public TestOffsetsLogSink(LogSinkProvider sinkProvider, String uuid) {
- this.sinkProvider = sinkProvider;
- this.uuid = uuid;
- this.sink = sinkProvider.createSink();
- }
-
- public static Map<Integer, Long> drainOffsets(String uuid) {
- return GLOBAL_OFFSETS.remove(uuid);
- }
-
- @Override
- public WriterT createWriter(InitContext initContext) throws IOException {
- return (WriterT) sink.createWriter(wrapContext(initContext));
- }
-
- private InitContext wrapContext(InitContext initContext) {
- Consumer<?> consumer =
- sinkProvider.createMetadataConsumer(
- (bucket, offset) -> {
- Map<Integer, Long> offsets =
- GLOBAL_OFFSETS.computeIfAbsent(
- uuid, k -> new ConcurrentHashMap<>());
- long nextOffset = offset + 1;
- offsets.compute(
- bucket,
- (k, v) -> v == null ? nextOffset : Math.max(v, nextOffset));
- });
- return new LogInitContext(initContext, consumer);
- }
-
- @Override
- public StatefulSinkWriter<SinkRecord, WriterStateT> restoreWriter(
- InitContext initContext, Collection<WriterStateT> collection) throws IOException {
- return ((StatefulSink<SinkRecord, WriterStateT>) sink)
- .restoreWriter(wrapContext(initContext), collection);
- }
-
- @Override
- public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
- return ((StatefulSink<SinkRecord, WriterStateT>) sink).getWriterStateSerializer();
- }
-
- @Override
- public Committer<CommT> createCommitter() throws IOException {
- return ((TwoPhaseCommittingSink<SinkRecord, CommT>) sink).createCommitter();
- }
-
- @Override
- public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
- return ((TwoPhaseCommittingSink<SinkRecord, CommT>) sink).getCommittableSerializer();
- }
-}