You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/17 08:00:07 UTC

[GitHub] [flink] lirui-apache commented on a change in pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

lirui-apache commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426141927



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveRowDataPartitionComputer.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.filesystem.RowDataPartitionComputer;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+
+/**
+ * A {@link RowDataPartitionComputer} that converts Flink objects to Hive objects before computing the partition value strings.
+ */
+public class HiveRowDataPartitionComputer extends RowDataPartitionComputer {
+
+	private final DataFormatConverters.DataFormatConverter[] partitionConverters;
+	private final HiveObjectConversion[] partColConversions;

Review comment:
       We already have `partitionConverters`. I think we can just name this one as `hiveObjectConversions`.

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.
+ */
+public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
+	@Rule
+	public final Timeout timeoutPerTest = Timeout.seconds(2000);
+
+	@Test
+	public void testWriteFile() throws Exception {
+		File file = TEMPORARY_FOLDER.newFolder();
+		Path basePath = new Path(file.toURI());
+
+		List<String> data = Arrays.asList(
+			"first line",
+			"second line",
+			"third line");
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.enableCheckpointing(100);
+
+		DataStream<String> stream = env.addSource(
+			new FiniteTestSource<>(data), TypeInformation.of(String.class));
+		Configuration configuration = new Configuration();
+
+		HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
+			new HadoopPathBasedBulkFormatBuilder<>(
+				basePath,
+				new TestHadoopPathBasedBulkWriterFactory(),
+				configuration,
+				new DateTimeBucketAssigner<>());
+		TestStreamingFileSinkFactory<String> streamingFileSinkFactory = new TestStreamingFileSinkFactory<>();
+		stream.addSink(streamingFileSinkFactory.createSink(builder, 1000));
+
+		env.execute();
+		validateResult(data, configuration, basePath);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void validateResult(List<String> expected, Configuration config, Path basePath) throws IOException {
+		FileSystem fileSystem = FileSystem.get(basePath.toUri(), config);
+		FileStatus[] buckets = fileSystem.listStatus(basePath);
+		assertNotNull(buckets);
+		assertEquals(1, buckets.length);
+
+		FileStatus[] partFiles = fileSystem.listStatus(buckets[0].getPath());
+		assertNotNull(partFiles);
+		assertEquals(2, partFiles.length);

Review comment:
       Why will there be two files?

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveRecordWriterFactory.java
##########
@@ -167,56 +151,65 @@ public HiveOutputFormat createOutputFormat(Path outPath) {
 				}
 			}
 
-			RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
+			return hiveShim.getHiveRecordWriter(
 					conf,
 					hiveOutputFormatClz,
 					recordSerDe.getSerializedClass(),
 					isCompressed,
 					tableProperties,
-					HadoopFileSystem.toHadoopPath(outPath));
-			return new HiveOutputFormat(recordWriter);
+					path);
 		} catch (Exception e) {
 			throw new FlinkHiveException(e);
 		}
 	}
 
-	private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
-
-		private final RecordWriter recordWriter;
+	public JobConf getJobConf() {
+		return confWrapper.conf();
+	}
 
-		private HiveOutputFormat(RecordWriter recordWriter) {
-			this.recordWriter = recordWriter;
-		}
+	private void initialize() throws Exception {
+		JobConf jobConf = confWrapper.conf();
+		Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
+		Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+				"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+						+ serdeLib.getClass().getName());
+		this.recordSerDe = (Serializer) serdeLib;
+		ReflectionUtils.setConf(recordSerDe, jobConf);
 
-		// converts a Row to a list of Hive objects so that Hive can serialize it
-		private Object getConvertedRow(Row record) {
-			List<Object> res = new ArrayList<>(numNonPartitionColumns);
-			for (int i = 0; i < numNonPartitionColumns; i++) {
-				res.add(hiveConversions[i].toHiveObject(record.getField(i)));
-			}
-			return res;
-		}
+		// TODO: support partition properties, for now assume they're same as table properties
+		SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
 
-		@Override
-		public void configure(Configuration parameters) {
+		this.formatFields = allColumns.length - partitionColumns.length;
+		this.hiveConversions = new HiveObjectConversion[formatFields];
+		this.converters = new DataFormatConverter[formatFields];
+		List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+		for (int i = 0; i < formatFields; i++) {
+			DataType type = allTypes[i];
+			ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+			objectInspectors.add(objectInspector);
+			hiveConversions[i] = HiveInspectors.getConversion(
+					objectInspector, type.getLogicalType(), hiveShim);
+			converters[i] = DataFormatConverters.getConverterForDataType(type);
 		}
 
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {
-		}
+		this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+				Arrays.asList(allColumns).subList(0, formatFields),
+				objectInspectors);
+	}
 
-		@Override
-		public void writeRecord(Row record) throws IOException {
-			try {
-				recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
-			} catch (SerDeException e) {
-				throw new IOException(e);
-			}
+	Writable toHiveWritable(Row row) throws SerDeException {

Review comment:
       It seems strange that a "record writer factory" is also responsible for converting data between Flink and Hive. Can we move these methods to somewhere else? Or perhaps this class shouldn't be a factory in the first place.

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.

Review comment:
       Doesn't seem to be a **base** class

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.connectors.hive.write.DefaultHadoopFileCommitterFactory;
+import org.apache.flink.connectors.hive.write.HadoopFileCommitterFactory;
+import org.apache.flink.connectors.hive.write.HadoopPathBasedBulkWriterFactory;
+import org.apache.flink.connectors.hive.write.HadoopPathBasedPartFileWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Buckets builder to create buckets that use {@link HadoopPathBasedPartFileWriter}.
+ */
+public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPathBasedBulkFormatBuilder<IN, BucketID, T>>

Review comment:
       Why is this a "format builder" instead of a "buckets builder"?

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
##########
@@ -213,6 +228,95 @@ public void testWriteNullValues() throws Exception {
 		}
 	}
 
+	@Test(timeout = 120000)
+	public void testPartStreamingWrite() throws Exception {
+		testStreamingWrite(true, (path) -> {
+			File basePath = new File(path, "d=2020-05-03");
+			Assert.assertEquals(5, basePath.list().length);
+			Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists());
+		});
+	}
+
+	@Test(timeout = 120000)
+	public void testNonPartStreamingWrite() throws Exception {
+		testStreamingWrite(false, (p) -> {});

Review comment:
       Does this mean success file is not written for non-partitioned tables?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
##########
@@ -20,122 +20,172 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 
 /**
- * An abstract writer for the currently open part file in a specific {@link Bucket}.
- *
- * <p>Currently, there are two subclasses, of this class:
- * <ol>
- *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
- *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
- * </ol>
- *
- * <p>This also implements the {@link PartFileInfo}.
+ * The {@link Bucket} uses the {@link PartFileWriter} to write element to a part file.
  */
 @Internal
-abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
+interface PartFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
 
-	private final BucketID bucketId;
+	/**
+	 * Write a element to the part file.
+	 * @param element the element to be written.
+	 * @param currentTime the writing time.
+	 * @throws IOException Thrown if writing the element fails.
+	 */
+	void write(final IN element, final long currentTime) throws IOException;
 
-	private final long creationTime;
+	/**
+	 * @return The state of the current part file.
+	 * @throws IOException Thrown if persisting the part file fails.
+	 */
+	InProgressFileRecoverable persist() throws IOException;
 
-	protected final RecoverableFsDataOutputStream currentPartStream;
 
-	private long lastUpdateTime;
+	/**
+	 * @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
+	 * @throws IOException Thrown if an I/O error occurs.
+	 */
+	PendingFileRecoverable closeForCommit() throws IOException;
 
-	protected PartFileWriter(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream currentPartStream,
-			final long creationTime) {
+	/**
+	 * Dispose the part file.
+	 */
+	void dispose();
 
-		Preconditions.checkArgument(creationTime >= 0L);
-		this.bucketId = Preconditions.checkNotNull(bucketId);
-		this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
-		this.creationTime = creationTime;
-		this.lastUpdateTime = creationTime;
-	}
+	// ------------------------------------------------------------------------
 
-	abstract void write(IN element, long currentTime) throws IOException;
+	/**
+	 * An interface for factories that create the different {@link PartFileWriter writers}.
+	 */
+	interface PartFileFactory<IN, BucketID> {
 
-	RecoverableWriter.ResumeRecoverable persist() throws IOException {
-		return currentPartStream.persist();
-	}
+		/**
+		 * Used to create a new {@link PartFileWriter}.
+		 * @param bucketID the id of the bucket this writer is writing to.
+		 * @param path the path this writer will write to.
+		 * @param creationTime the creation time of the file.
+		 * @return the new {@link PartFileWriter}
+		 * @throws IOException Thrown if creating a writer fails.
+		 */
+		PartFileWriter<IN, BucketID> openNew(
+			final BucketID bucketID,
+			final Path path,
+			final long creationTime) throws IOException;
 
-	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
-		return currentPartStream.closeForCommit().getRecoverable();
-	}
+		/**
+		 * Used to resume a {@link PartFileWriter} from a {@link InProgressFileRecoverable}.
+		 * @param bucketID the id of the bucket this writer is writing to.
+		 * @param inProgressFileSnapshot the state of the part file.
+		 * @param creationTime the creation time of the file.
+		 * @return the resumed {@link PartFileWriter}
+		 * @throws IOException Thrown if resuming a writer fails.
+		 */
+		PartFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketID,
+			final InProgressFileRecoverable inProgressFileSnapshot,
+			final long creationTime) throws IOException;
 
-	void dispose() {
-		// we can suppress exceptions here, because we do not rely on close() to
-		// flush or persist any data
-		IOUtils.closeQuietly(currentPartStream);
-	}
+		/**
+		 * Recovers a pending file for finalizing and committing.
+		 * @param pendingFileRecoverable The handle with the recovery information.
+		 * @return A pending file
+		 * @throws IOException Thrown if recovering a pending file fails.
+		 */
+		PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException;
 
-	void markWrite(long now) {
-		this.lastUpdateTime = now;
-	}
+		/**
+		 * Marks if requiring to do any additional cleanup/freeing of resources occupied
+		 * as part of a {@link InProgressFileRecoverable}.
+		 *
+		 * <p>In case cleanup is required, then {@link #cleanupInProgressFileRecoverable(InProgressFileRecoverable)} should
+		 * be called.
+		 *
+		 * @return {@code true} if cleanup is required, {@code false} otherwise.
+		 */
+		boolean requiresCleanupOfInProgressFileRecoverableState();
 
-	@Override
-	public BucketID getBucketId() {
-		return bucketId;
-	}
+		/**
+		 * Frees up any resources that were previously occupied in order to be able to
+		 * recover from a (potential) failure.
+		 *
+		 * <p><b>NOTE:</b> This operation should not throw an exception if the {@link InProgressFileRecoverable} has already
+		 * been cleaned up and the resources have been freed. But the contract is that it will throw
+		 * an {@link UnsupportedOperationException} if it is called for a {@link PartFileFactory}
+		 * whose {@link #requiresCleanupOfInProgressFileRecoverableState()} returns {@code false}.
+		 *
+		 * @param inProgressFileRecoverable the {@link InProgressFileRecoverable} whose state we want to clean-up.
+		 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+		 * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+		 * @throws IOException if an I/O error occurs
+		 */
+		boolean cleanupInProgressFileRecoverable(final InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
 
-	@Override
-	public long getCreationTime() {
-		return creationTime;
-	}
 
-	@Override
-	public long getSize() throws IOException {
-		return currentPartStream.getPos();
-	}
+		/**
+		 * @return the serializer for the {@link PendingFileRecoverable}.
+		 */
+		SimpleVersionedSerializer<? extends PendingFileRecoverable> getPendingFileRecoverableSerializer();
 
-	@Override
-	public long getLastUpdateTime() {
-		return lastUpdateTime;
+		/**
+		 * @return the serializer for the {@link InProgressFileRecoverable}.
+		 */
+		SimpleVersionedSerializer<? extends InProgressFileRecoverable> getInProgressFileRecoverableSerializer();
+
+		/**
+		 * Checks whether the {@link PartFileWriter} supports resuming (appending to) files after
+		 * recovery (via the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method).
+		 *
+		 * <p>If true, then this writer supports the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method.
+		 * If false, then that method may not be supported and file can only be recovered via
+		 * {@link #recoverPendingFile(PendingFileRecoverable)}.
+		 */
+		boolean supportsResume();
 	}
 
-	// ------------------------------------------------------------------------
+	 /**
+	 * A handle can be used to recover in-progress file..
+	 */
+	interface InProgressFileRecoverable extends PendingFileRecoverable {}

Review comment:
       Can we have some comments to explain the difference between `pending file` and `in progress file`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org