Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/11 12:22:43 UTC

[GitHub] [flink] lirui-apache commented on a change in pull request #12062: [FLINK-17587][filesystem] Filesystem streaming sink support commit success file

lirui-apache commented on a change in pull request #12062:
URL: https://github.com/apache/flink/pull/12062#discussion_r422836028



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
##########
@@ -467,7 +483,11 @@ public void onProcessingTime(long timestamp) throws Exception {
 
 	@Override
 	public void invoke(IN value, SinkFunction.Context context) throws Exception {
-		buckets.onElement(value, context);
+		buckets.onElement(

Review comment:
       Why do we need this change?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by the filesystem connector (including Hive).
+ */
+public class FileSystemOptions {
+
+	public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+			key("partition.time-extractor.type")
+					.stringType()
+					.defaultValue("default")
+					.withDescription("Time extractor to extract time from partition values. Only" +
+							" used if order is set to partition-time. Supports default and custom." +
+							" For default, a timestamp pattern can be configured." +
+							" For custom, an extractor class should be configured.");
+
+	public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_CLASS =
+			key("partition.time-extractor.class")
+					.stringType()
+					.noDefaultValue()
+					.withDescription("The extractor class implementing the PartitionTimeExtractor interface.");
+
+	public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN =
+			key("partition.time-extractor.timestamp-pattern")
+					.stringType()
+					.noDefaultValue()
+					.withDescription("The 'default' construction allows users to build a legal" +
+							" timestamp pattern from the partition fields." +
+							" By default, 'yyyy-mm-dd hh:mm:ss' is read from the first field." +
+							" If the timestamp of a partition is the single field 'dt', configure: '$dt'." +
+							" If the timestamp is spread across the fields year, month, day, and hour," +
+							" configure: '$year-$month-$day $hour:00:00'." +
+							" If the timestamp is in the fields dt and hour, configure: '$dt $hour:00:00'.");
+
+	public static final ConfigOption<Duration> PARTITION_TIME_INTERVAL =
+			key("partition.time-interval")
+					.durationType()
+					.noDefaultValue()
+					.withDescription("Time interval of the partition:" +
+							" for a day partition it should be '1 d'," +
+							" for an hour partition it should be '1 h'.");
+
+	public static final ConfigOption<String> PARTITION_COMMIT_POLICY_TYPE =
+			key("partition.commit-policy.type")
+					.stringType()
+					.noDefaultValue()
+					.withDescription("");

Review comment:
       add descriptions
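       For example, something along these lines (wording is only a suggestion; I'm assuming the supported kinds are metastore and success-file, based on the rest of this PR):

```java
public static final ConfigOption<String> PARTITION_COMMIT_POLICY_TYPE =
		key("partition.commit-policy.type")
				.stringType()
				.noDefaultValue()
				.withDescription("Policy to commit a partition once it is ready, e.g." +
						" 'metastore' to add the partition to the metastore, or" +
						" 'success-file' to write a success file into the partition directory." +
						" Multiple policies can be configured as a comma-separated list.");
```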

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionCommitManager.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.util.StringUtils;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_INTERVAL;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Manages partition and watermark information.
+ */
+public class PartitionCommitManager {
+
+	private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC =
+			new ListStateDescriptor<>(
+					"checkpoint-id-to-watermark",
+					new MapSerializer<>(LongSerializer.INSTANCE, LongSerializer.INSTANCE));
+	private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
+			new ListStateDescriptor<>(
+					"pending-partitions",
+					new ListSerializer<>(StringSerializer.INSTANCE));
+
+	private final ListState<Map<Long, Long>> watermarksState;
+	private final ListState<List<String>> pendingPartitionsState;
+	private final TreeMap<Long, Long> watermarks;
+	private final Set<String> pendingPartitions;
+	private final PartitionTimeExtractor extractor;
+	private final long timeIntervalMills;
+	private final List<String> partitionKeys;
+
+	public PartitionCommitManager(
+			boolean isRestored,
+			OperatorStateStore operatorStateStore,
+			ClassLoader userCodeClassLoader,
+			List<String> partitionKeys,
+			Configuration conf) throws Exception {
+		this.partitionKeys = partitionKeys;
+		String extractorKind = conf.get(PARTITION_TIME_EXTRACTOR_KIND);
+		String extractorClass = conf.get(PARTITION_TIME_EXTRACTOR_CLASS);
+		String extractorPattern = conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+		this.timeIntervalMills = conf.getOptional(PARTITION_TIME_INTERVAL)
+				.map(Duration::toMillis)
+				.orElse(Long.MAX_VALUE);
+		this.extractor = PartitionTimeExtractor.create(
+				userCodeClassLoader,
+				extractorKind,
+				extractorClass,
+				extractorPattern);
+
+		this.watermarksState = operatorStateStore.getListState(WATERMARKS_STATE_DESC);
+		this.pendingPartitionsState = operatorStateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
+
+		this.watermarks = new TreeMap<>();
+		this.pendingPartitions = new HashSet<>();
+		if (isRestored) {
+			watermarks.putAll(watermarksState.get().iterator().next());
+			pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
+		}
+	}
+
+	/**
+	 * Add a pending partition.
+	 */
+	public void addPartition(String partition) {
+		if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
+			this.pendingPartitions.add(partition);
+		}
+	}
+
+	/**
+	 * Trigger commit of pending partitions, and cleanup useless watermarks and partitions.
+	 */
+	public List<String> triggerCommit(long checkpointId) {

Review comment:
       I don't think this method really "triggers" the commit. Seems it just decides which partitions should be committed. So maybe rename to `getPartitionsToCommit`?
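       To illustrate, my understanding is that the method boils down to something like this (a rough sketch against the fields of this class, ignoring the watermark cleanup):

```java
/** Returns the pending partitions whose extracted time, plus the partition
 *  time interval, has passed the watermark recorded for this checkpoint. */
public List<String> getPartitionsToCommit(long checkpointId) {
	long watermark = watermarks.getOrDefault(checkpointId, Long.MIN_VALUE);
	List<String> committable = new ArrayList<>();
	Iterator<String> iter = pendingPartitions.iterator();
	while (iter.hasNext()) {
		String partition = iter.next();
		LocalDateTime partTime = extractor.extract(
				partitionKeys, extractPartitionValues(new Path(partition)));
		if (watermark > toMills(partTime) + timeIntervalMills) {
			committable.add(partition);
			iter.remove();
		}
	}
	return committable;
}
```

       So "get" describes it better than "trigger" to me.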

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_TYPE;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+
+/**
+ * Committer for {@link StreamingFileWriter}. It runs as a single (non-parallel) task.
+ */
+public class StreamingFileCommitter extends AbstractStreamOperator<Void>
+		implements OneInputStreamOperator<StreamingFileCommitter.CommitMessage, Void> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Configuration conf;
+
+	private final List<String> partitionKeys;
+
+	private final TableMetaStoreFactory metaStoreFactory;
+
+	private transient PartitionCommitManager commitManager;
+
+	private transient TaskTracker taskTracker;
+
+	private transient long currentWatermark = Long.MIN_VALUE;
+
+	private transient List<PartitionCommitPolicy> policies;
+
+	public StreamingFileCommitter(
+			List<String> partitionKeys, TableMetaStoreFactory metaStoreFactory, Configuration conf) {
+		this.partitionKeys = partitionKeys;
+		this.metaStoreFactory = metaStoreFactory;
+		this.conf = conf;
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		this.commitManager = new PartitionCommitManager(
+				context.isRestored(),
+				context.getOperatorStateStore(),
+				getUserCodeClassloader(),
+				partitionKeys,
+				conf);
+		this.policies = PartitionCommitPolicy.createCommitChain(
+				conf.get(PARTITION_COMMIT_POLICY_TYPE),
+				conf.get(PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME));
+	}
+
+	@Override
+	public void processElement(StreamRecord<CommitMessage> element) throws Exception {
+		CommitMessage message = element.getValue();
+		for (String partition : message.partitions) {
+			commitManager.addPartition(partition);
+		}
+
+		if (taskTracker == null) {
+			taskTracker = new TaskTracker(message.numberOfTasks);
+		}
+		boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
+		if (needCommit) {
+			commitPartitions(commitManager.triggerCommit(message.checkpointId));
+		}
+	}
+
+	private void commitPartitions(List<String> partitions) throws Exception {
+		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
+			FileSystem fs = metaStore.getLocationPath().getFileSystem();
+			for (String partition : partitions) {
+				LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
+				Path path = new Path(metaStore.getLocationPath(), generatePartitionPath(partSpec));
+				for (PartitionCommitPolicy policy : policies) {
+					policy.commit(partSpec, path, fs, metaStore);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		super.processWatermark(mark);
+		this.currentWatermark = mark.getTimestamp();
+	}
+
+	@Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+		commitManager.snapshotState(context.getCheckpointId(), currentWatermark);
+	}
+
+	/**
+	 * The message sent from the upstream {@link StreamingFileWriter}.
+	 */
+	public static class CommitMessage implements Serializable {
+
+		public long checkpointId;
+		public int taskId;
+		public int numberOfTasks;
+		public List<String> partitions;

Review comment:
       Can we have some comments about which partitions should be in this list? My understanding is it should include partitions for which some files should be committed, right?
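       For instance, a Javadoc along these lines, if my reading is correct:

```java
/**
 * Partitions that became inactive in this subtask since the last checkpoint,
 * i.e. partitions for which there may be files that are ready to be committed.
 */
public List<String> partitions;
```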

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionCommitPolicy.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Policy for partition commit.
+ */
+public interface PartitionCommitPolicy extends Serializable {
+
+	/**
+	 * Commit partition by partitionSpec and path.
+	 */
+	void commit(
+			LinkedHashMap<String, String> partitionSpec,
+			Path partitionPath,
+			FileSystem fileSystem,
+			TableMetaStoreFactory.TableMetaStore metaStore) throws Exception;
+
+	static List<PartitionCommitPolicy> createCommitChain(String policy, String successFileName) {
+		if (policy == null) {
+			return Collections.emptyList();
+		}
+		String[] policyStrings = policy.split(",");
+		return Arrays.stream(policyStrings).map(name -> {
+			switch (name) {
+				case "metastore":

Review comment:
       add constants for them
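       e.g. something like the following in this interface (names are only suggestions, and I'm assuming the other case is the success-file policy):

```java
/** Supported policy kinds, to avoid magic strings in createCommitChain. */
String METASTORE_POLICY_KIND = "metastore";
String SUCCESS_FILE_POLICY_KIND = "success-file";
```

       and then match on the constants in the `switch`.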

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Operator for file system sink.
+ */
+public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
+		implements OneInputStreamOperator<RowData, CommitMessage>, ProcessingTimeCallback {
+
+	private static final long serialVersionUID = 1L;
+
+	// -------------------------- state descriptors ---------------------------
+
+	private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
+			new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
+
+	private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC =
+			new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final long bucketCheckInterval;
+
+	private final StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
+			StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
+
+	private final FileSystemBucketListener listener;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private transient Buckets<RowData, ?> buckets;
+
+	private transient ProcessingTimeService processingTimeService;
+
+	private transient long currentWatermark = Long.MIN_VALUE;
+
+	private transient Set<String> inactivePartitions;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private transient ListState<byte[]> bucketStates;
+
+	private transient ListState<Long> maxPartCountersState;
+
+	public StreamingFileWriter(
+			long bucketCheckInterval,
+			StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
+					StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
+			FileSystemBucketListener listener) {
+		this.bucketCheckInterval = bucketCheckInterval;
+		this.bucketsBuilder = bucketsBuilder;
+		this.listener = listener;
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+		this.buckets = bucketsBuilder.createBuckets(subtaskIndex);
+
+		final OperatorStateStore stateStore = context.getOperatorStateStore();
+		bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
+		maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+
+		if (context.isRestored()) {
+			buckets.initializeState(bucketStates, maxPartCountersState);
+		}
+		inactivePartitions = new HashSet<>();
+		listener.setInactiveConsumer(b -> inactivePartitions.add(b));
+	}
+
+	@Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+		Preconditions.checkState(bucketStates != null && maxPartCountersState != null, "sink has not been initialized");
+		buckets.snapshotState(
+				context.getCheckpointId(),
+				bucketStates,
+				maxPartCountersState);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		this.processingTimeService = getRuntimeContext().getProcessingTimeService();
+		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+		processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
+	}
+
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		final long currentTime = processingTimeService.getCurrentProcessingTime();
+		buckets.onProcessingTime(currentTime);
+		processingTimeService.registerTimer(currentTime + bucketCheckInterval, this);
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		super.processWatermark(mark);
+		this.currentWatermark = mark.getTimestamp();
+	}
+
+	@Override
+	public void processElement(StreamRecord<RowData> element) throws Exception {
+		buckets.onElement(
+				element.getValue(),
+				getProcessingTimeService().getCurrentProcessingTime(),
+				element.getTimestamp(),
+				currentWatermark);
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		super.notifyCheckpointComplete(checkpointId);
+		buckets.commitUpToCheckpoint(checkpointId);
+		CommitMessage message = new CommitMessage(
+				checkpointId,
+				getRuntimeContext().getIndexOfThisSubtask(),
+				getRuntimeContext().getNumberOfParallelSubtasks(),
+				new ArrayList<>(inactivePartitions));
+		output.collect(new StreamRecord<>(message));
+		inactivePartitions.clear();
+	}
+
+	@Override
+	public void close() throws Exception {

Review comment:
       Is it possible that there's some pending data between `close` and the last `notifyCheckpointComplete`?
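       The scenario I have in mind, assuming a bounded input or a graceful stop:

```java
// 1. notifyCheckpointComplete(n): commitUpToCheckpoint(n) runs, a CommitMessage is emitted
// 2. processElement(...): new records open in-progress files after checkpoint n
// 3. close(): the job finishes, but the data from step 2 was never part of a
//    completed checkpoint, so it is neither committed nor reported downstream
```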

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_TYPE;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+
+/**
+ * Committer for {@link StreamingFileWriter}. It runs as a single (non-parallel) task.
+ */
+public class StreamingFileCommitter extends AbstractStreamOperator<Void>
+		implements OneInputStreamOperator<StreamingFileCommitter.CommitMessage, Void> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Configuration conf;
+
+	private final List<String> partitionKeys;
+
+	private final TableMetaStoreFactory metaStoreFactory;
+
+	private transient PartitionCommitManager commitManager;
+
+	private transient TaskTracker taskTracker;
+
+	private transient long currentWatermark = Long.MIN_VALUE;
+
+	private transient List<PartitionCommitPolicy> policies;
+
+	public StreamingFileCommitter(
+			List<String> partitionKeys, TableMetaStoreFactory metaStoreFactory, Configuration conf) {
+		this.partitionKeys = partitionKeys;
+		this.metaStoreFactory = metaStoreFactory;
+		this.conf = conf;
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		this.commitManager = new PartitionCommitManager(
+				context.isRestored(),
+				context.getOperatorStateStore(),
+				getUserCodeClassloader(),
+				partitionKeys,
+				conf);
+		this.policies = PartitionCommitPolicy.createCommitChain(
+				conf.get(PARTITION_COMMIT_POLICY_TYPE),
+				conf.get(PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME));
+	}
+
+	@Override
+	public void processElement(StreamRecord<CommitMessage> element) throws Exception {
+		CommitMessage message = element.getValue();
+		for (String partition : message.partitions) {
+			commitManager.addPartition(partition);
+		}
+
+		if (taskTracker == null) {
+			taskTracker = new TaskTracker(message.numberOfTasks);
+		}
+		boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
+		if (needCommit) {
+			commitPartitions(commitManager.triggerCommit(message.checkpointId));
+		}
+	}
+
+	private void commitPartitions(List<String> partitions) throws Exception {
+		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {

Review comment:
       Does this incur traffic to HMS? If so, maybe we should only do it when `partitions` is not empty.
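       If it does, a simple guard would avoid that (sketch):

```java
private void commitPartitions(List<String> partitions) throws Exception {
	if (partitions.isEmpty()) {
		return; // nothing to commit, skip creating the metastore client
	}
	try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
		// ... unchanged ...
	}
}
```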

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/SuccessFileCommitPolicy.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Partition commit policy to update metastore.

Review comment:
       incorrect java doc
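       Probably something like this was intended:

```java
/**
 * Partition commit policy to write a success file to the partition directory.
 */
```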




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org