Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/14 11:50:20 UTC

[GitHub] [flink] JingsongLi commented on a change in pull request #12134: [FLINK-17594][filesystem] Support Hadoop path-based part-file writer.

JingsongLi commented on a change in pull request #12134:
URL: https://github.com/apache/flink/pull/12134#discussion_r425002427



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HadoopPathBasedBulkWriterFactory.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Factory for path-based bulk file writer that writes to the specific hadoop path.
+ */
+public abstract class HadoopPathBasedBulkWriterFactory<T> implements PathBasedBulkWriter.Factory<T> {
+
+	@Override
+	public final PathBasedBulkWriter<T> create(
+		org.apache.flink.core.fs.Path targetPath,
+		org.apache.flink.core.fs.Path inProgressPath) {
+
+		return create(
+			new Path(targetPath.toUri()),
+			new Path(inProgressPath.toUri()));
+	}
+
+	/**
+	 * Creates a path-based writer that writes to the <tt>inProgressPath</tt> first
+	 * and commits to <tt>targetPath</tt> finally.
+	 *
+	 * @param targetPath     The final hadoop path to commit to.
+	 * @param inProgressPath The intermediate hadoop path to write to before committing.
+	 * @return The created writer.
+	 */
+	public abstract PathBasedBulkWriter<T> create(Path targetPath, Path inProgressPath);

Review comment:
       `targetPath` -> `targetFilePath`
   `inProgressPath` -> `inProgressFilePath`
   Otherwise they could be confused with directory paths.

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/serialization/PathBasedBulkWriter.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Specialized {@link BulkWriter} which is expected to write to specified
+ * {@link Path} instead of output stream.
+ */
+@Internal
+public interface PathBasedBulkWriter<T> extends BulkWriter<T> {
+
+	/**
+	 * Gets the size written by the current writer.
+	 *
+	 * @return The size written by the current writer.
+	 */
+	long getSize() throws IOException;
+
+	/**
+	 * Disposes the writer on failures. Unlike output-stream-based writers, which
+	 * can be handled uniformly by closing the underlying output stream, path-based
+	 * writers need to be disposed explicitly.
+	 */
+	void dispose();
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A factory that creates a {@link PathBasedBulkWriter}.
+	 *
+	 * @param <T> The type of record to write.
+	 */
+	@FunctionalInterface
+	public interface Factory<T> extends Serializable {

Review comment:
       Remove `public`; member types of an interface are implicitly public.
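
As a minimal illustration of why the modifier is redundant: a member type declared inside an interface is implicitly `public` and `static`. The names `Writer` and `Factory` below are hypothetical stand-ins, not the classes from this PR, and the reflection checks only exist to make the implicit modifiers visible.

```java
import java.lang.reflect.Modifier;

public class NestedInterfaceModifiers {

    // Hypothetical stand-in for the outer writer interface.
    interface Writer<T> {

        // No explicit modifier here: the compiler treats this member type
        // as public and static because it is declared inside an interface.
        interface Factory<T> {
            Writer<T> create(String targetFilePath);
        }
    }

    static boolean factoryIsPublic() {
        return Modifier.isPublic(Writer.Factory.class.getModifiers());
    }

    static boolean factoryIsStatic() {
        return Modifier.isStatic(Writer.Factory.class.getModifiers());
    }

    public static void main(String[] args) {
        System.out.println("public: " + factoryIsPublic() + ", static: " + factoryIsStatic());
    }
}
```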

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/committer/HadoopRenameFileCommitter.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write.committer;
+
+import org.apache.flink.connectors.hive.write.HadoopFileCommitter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The Hadoop file committer that directly renames the in-progress file
+ * to the target file. For file systems like S3, renaming may lead to
+ * additional copies.
+ */
+public class HadoopRenameFileCommitter implements HadoopFileCommitter {
+
+	private final Configuration configuration;
+
+	private final Path targetPath;
+
+	private final Path inProgressFilePath;
+
+	public HadoopRenameFileCommitter(Configuration configuration, Path targetPath) {
+		this.configuration = configuration;
+		this.targetPath = targetPath;
+		this.inProgressFilePath = generateInProgressFilePath();
+	}
+
+	@Override
+	public Path getTargetPath() {
+		return targetPath;
+	}
+
+	@Override
+	public Path getInProgressFilePath() {
+		return inProgressFilePath;
+	}
+
+	@Override
+	public void preCommit() {
+		// Do nothing.
+	}
+
+	@Override
+	public void commit() throws IOException {
+		FileSystem fileSystem = FileSystem.get(targetPath.toUri(), configuration);
+
+		try {
+			fileSystem.rename(inProgressFilePath, targetPath);
+		} catch (IOException e) {
+			throw new IOException(
+				String.format("Could not commit file from %s to %s", inProgressFilePath, targetPath),
+				e);
+		}
+	}
+
+	@Override
+	public void commitAfterRecovery() throws IOException {

Review comment:
       Is `commitAfterRecovery` the same as `commit`?
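
One way the two methods could differ, sketched with `java.nio.file` rather than the Hadoop `FileSystem` API: a commit replayed after recovery may find that the rename already happened before the failure, so it would have to tolerate a missing in-progress file. The class name, the helper `replayIsIdempotent`, and the exact checks are assumptions for illustration, not the PR's implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RenameCommitterSketch {

    private final Path targetFilePath;
    private final Path inProgressFilePath;

    public RenameCommitterSketch(Path targetFilePath, Path inProgressFilePath) {
        this.targetFilePath = targetFilePath;
        this.inProgressFilePath = inProgressFilePath;
    }

    /** First-time commit: the in-progress file is expected to exist. */
    public void commit() throws IOException {
        Files.move(inProgressFilePath, targetFilePath);
    }

    /** Replayed commit: also succeeds if the rename completed before the failure. */
    public void commitAfterRecovery() throws IOException {
        if (Files.exists(inProgressFilePath)) {
            Files.move(inProgressFilePath, targetFilePath);
        } else if (!Files.exists(targetFilePath)) {
            throw new IOException(String.format(
                "Neither in-progress file %s nor target file %s exists",
                inProgressFilePath, targetFilePath));
        }
    }

    /** Usage example: commits once, then replays the commit as recovery would. */
    static boolean replayIsIdempotent() {
        try {
            Path dir = Files.createTempDirectory("committer-sketch");
            Path inProgress = dir.resolve(".part-0.inprogress");
            Path target = dir.resolve("part-0");
            Files.write(inProgress, new byte[] {1, 2, 3});

            RenameCommitterSketch committer = new RenameCommitterSketch(target, inProgress);
            committer.commit();
            committer.commitAfterRecovery(); // no-op: the target is already in place
            return Files.exists(target) && !Files.exists(inProgress);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the real committer needs no such tolerance, delegating `commitAfterRecovery` to `commit` would be the simpler choice.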

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PathBasedPartFileWriter.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Specialized part-file writer that writes to the specified path.
+ *
+ * @param <IN> the element type.
+ * @param <BucketID> the bucket type.
+ */
+public abstract class PathBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {

Review comment:
       Move this to the hive module now?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PathBasedPartFileWriter.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Specialized part-file writer that writes to the specified path.
+ *
+ * @param <IN> the element type.
+ * @param <BucketID> the bucket type.
+ */
+public abstract class PathBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
+
+	protected final PathBasedBulkWriter<IN> writer;
+
+	public PathBasedPartFileWriter(
+		final BucketID bucketID,
+		PathBasedBulkWriter<IN> writer,
+		long createTime) {
+
+		super(bucketID, createTime);
+
+		this.writer = writer;
+	}
+
+	@Override
+	public void write(IN element, long currentTime) throws IOException {
+		writer.addElement(element);
+		markWrite(currentTime);
+	}
+
+	@Override
+	public InProgressFileRecoverable persist() {
+		throw new UnsupportedOperationException("The path based writers do not support persisting");
+	}
+
+	@Override
+	public void dispose() {
+		writer.dispose();
+	}
+
+	@Override
+	public long getSize() throws IOException {
+		return writer.getSize();
+	}
+
+	/**
+	 * The handles to recover path-based pending files.
+	 */
+	public static class PathBasedPendingFileRecoverable implements PendingFileRecoverable {
+		private final Path path;
+
+		public PathBasedPendingFileRecoverable(Path path) {
+			this.path = path;
+		}
+
+		public Path getPath() {
+			return path;
+		}
+	}
+
+	private static class PathBasedPendingFileRecoverableSerializer
+		implements SimpleVersionedSerializer<PathBasedPendingFileRecoverable> {
+
+		static final PathBasedPendingFileRecoverableSerializer INSTANCE = new PathBasedPendingFileRecoverableSerializer();
+
+		private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+		private static final int MAGIC_NUMBER = 0x2c853c90;
+
+		@Override
+		public int getVersion() {
+			return 1;
+		}
+
+		@Override
+		public byte[] serialize(PathBasedPendingFileRecoverable pendingFileRecoverable) throws IOException {
+			byte[] pathBytes = pendingFileRecoverable.getPath().toString().getBytes(CHARSET);
+
+			byte[] targetBytes = new byte[8 + pathBytes.length];
+			ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+			bb.putInt(MAGIC_NUMBER);
+			bb.putInt(pathBytes.length);
+			bb.put(pathBytes);
+
+			return targetBytes;
+		}
+
+		@Override
+		public PathBasedPendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+			switch (version) {
+				case 1:
+					return deserializeV1(serialized);
+				default:
+					throw new IOException("Unrecognized version or corrupt state: " + version);
+			}
+		}
+
+		private PathBasedPendingFileRecoverable deserializeV1(byte[] serialized) throws IOException {
+			final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+			if (bb.getInt() != MAGIC_NUMBER) {
+				throw new IOException("Corrupt data: Unexpected magic number.");
+			}
+
+			byte[] pathBytes = new byte[bb.getInt()];
+			bb.get(pathBytes);
+			String targetPath = new String(pathBytes, CHARSET);
+
+			return new PathBasedPendingFileRecoverable(new Path(targetPath));
+		}
+	}
+
+	private static class EmptyInProgressFileRecoverableSerializable

Review comment:
       Rename to `UnsupportedInProgressFileSerializer`?

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/committer/HadoopRenameFileCommitter.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write.committer;
+
+import org.apache.flink.connectors.hive.write.HadoopFileCommitter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The Hadoop file committer that directly renames the in-progress file
+ * to the target file. For file systems like S3, renaming may lead to
+ * additional copies.
+ */
+public class HadoopRenameFileCommitter implements HadoopFileCommitter {
+
+	private final Configuration configuration;
+
+	private final Path targetPath;
+
+	private final Path inProgressFilePath;
+
+	public HadoopRenameFileCommitter(Configuration configuration, Path targetPath) {
+		this.configuration = configuration;
+		this.targetPath = targetPath;
+		this.inProgressFilePath = generateInProgressFilePath();
+	}
+
+	@Override
+	public Path getTargetPath() {
+		return targetPath;
+	}
+
+	@Override
+	public Path getInProgressFilePath() {
+		return inProgressFilePath;
+	}
+
+	@Override
+	public void preCommit() {
+		// Do nothing.
+	}
+
+	@Override
+	public void commit() throws IOException {
+		FileSystem fileSystem = FileSystem.get(targetPath.toUri(), configuration);

Review comment:
       Unlike the Flink file system, `FileSystem.get(targetPath.toUri(), configuration)` is very heavy. Can we reuse the instance?
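
A sketch of one way to avoid repeated lookups, assuming the handle can safely be shared within one committer: resolve it lazily once and keep it in a field. `ExpensiveFileSystem` is a hypothetical stand-in for the Hadoop `FileSystem`, and the counter exists only to make the reuse observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CachedFileSystemSketch {

    /** Hypothetical stand-in for a handle that is costly to obtain. */
    static final class ExpensiveFileSystem {
        static final AtomicInteger LOOKUPS = new AtomicInteger();

        static ExpensiveFileSystem get(String uri) {
            LOOKUPS.incrementAndGet();
            return new ExpensiveFileSystem();
        }
    }

    private final String targetUri;

    // Resolved on first use, then reused by every later call.
    private ExpensiveFileSystem fileSystem;

    CachedFileSystemSketch(String targetUri) {
        this.targetUri = targetUri;
    }

    private ExpensiveFileSystem fileSystem() {
        if (fileSystem == null) {
            fileSystem = ExpensiveFileSystem.get(targetUri);
        }
        return fileSystem;
    }

    void preCommit() {
        fileSystem();
    }

    void commit() {
        fileSystem();
    }

    /** Usage example: counts how many lookups one preCommit/commit cycle triggers. */
    static int lookupsForOneCommitCycle() {
        int before = ExpensiveFileSystem.LOOKUPS.get();
        CachedFileSystemSketch committer = new CachedFileSystemSketch("file:///tmp/part-0");
        committer.preCommit();
        committer.commit();
        return ExpensiveFileSystem.LOOKUPS.get() - before;
    }
}
```

The lazy field is not thread-safe as written, which is acceptable only if a committer is used from a single thread.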

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/serialization/PathBasedBulkWriter.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Specialized {@link BulkWriter} which is expected to write to specified
+ * {@link Path} instead of output stream.
+ */
+@Internal
+public interface PathBasedBulkWriter<T> extends BulkWriter<T> {

Review comment:
       Move to hive?
   Do we need this interface at all, or just `HadoopPathBasedBulkWriter`? No one invokes `Factory.create`.

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/DefaultHadoopFileCommitterFactory.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+
+import org.apache.flink.connectors.hive.write.committer.HadoopRenameFileCommitter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * The default hadoop file committer factory which always uses {@link HadoopRenameFileCommitter}.
+ */
+public class DefaultHadoopFileCommitterFactory implements HadoopFileCommitterFactory {
+

Review comment:
       Add a `serialVersionUID`.
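
Assuming the factory interface is `Serializable` (this hunk does not show it), an explicit `serialVersionUID` pins the stream version instead of letting the JVM derive one from the class structure. A minimal stand-alone sketch with a hypothetical class name:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class CommitterFactorySketch implements Serializable {

    // Explicit version id; the value 1L is an arbitrary choice for this sketch.
    private static final long serialVersionUID = 1L;

    /** Reads back the id that Java serialization will use for this class. */
    static long declaredSerialVersionUid() {
        return ObjectStreamClass.lookup(CommitterFactorySketch.class).getSerialVersionUID();
    }

    /** Serializes an instance to bytes and deserializes it again. */
    static boolean survivesRoundTrip() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new CommitterFactorySketch());
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject() instanceof CommitterFactorySketch;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```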

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
##########
@@ -128,24 +128,12 @@
 	private transient ListState<Long> maxPartCountersState;
 
 	/**
-	 * Creates a new {@code StreamingFileSink} that writes files in row-based format to the given base directory.
+	 * Creates a new {@code StreamingFileSink} that writes files to the given base directory

Review comment:
       Does it affect compatibility?

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.
+ */
+public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
+	@Rule
+	public final Timeout timeoutPerTest = Timeout.seconds(2000);
+
+	@Test
+	public void testWriteFile() throws Exception {
+		File file = TEMPORARY_FOLDER.newFolder();
+		Path basePath = new Path(file.toURI());
+
+		List<String> data = Arrays.asList(
+			"first line",
+			"second line",
+			"third line");
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.enableCheckpointing(100);
+
+		DataStream<String> stream = env.addSource(
+			new FiniteTestSource<>(data), TypeInformation.of(String.class));
+		Configuration configuration = new Configuration();
+
+		HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
+			new HadoopPathBasedBulkFormatBuilder<>(
+				basePath,
+				new TestHadoopPathBasedBulkWriterFactory(),
+				configuration,
+				new DateTimeBucketAssigner<>());
+		TestStreamingFileSinkFactory<String> streamingFileSinkFactory = new TestStreamingFileSinkFactory<>();
+		stream.addSink(streamingFileSinkFactory.createSink(builder, 1000));
+
+		env.execute();
+		validateResult(data, configuration, basePath);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void validateResult(List<String> expected, Configuration config, Path basePath) throws IOException {
+		FileSystem fileSystem = FileSystem.get(basePath.toUri(), config);
+		FileStatus[] buckets = fileSystem.listStatus(basePath);
+		assertNotNull(buckets);
+		assertEquals(1, buckets.length);
+
+		FileStatus[] partFiles = fileSystem.listStatus(buckets[0].getPath());
+		assertNotNull(partFiles);
+		assertEquals(2, partFiles.length);
+
+		for (FileStatus partFile : partFiles) {
+			assertTrue(partFile.getLen() > 0);
+
+			List<String> fileContent = readHadoopPath(fileSystem, partFile.getPath());
+			assertEquals(expected, fileContent);
+		}
+	}
+
+	private List<String> readHadoopPath(FileSystem fileSystem, Path partFile) throws IOException {
+		try (FSDataInputStream dataInputStream = fileSystem.open(partFile)) {
+			List<String> lines = new ArrayList<>();
+			BufferedReader reader = new BufferedReader(new InputStreamReader(dataInputStream));
+			String line = null;
+			while ((line = reader.readLine()) != null) {
+				lines.add(line);
+			}
+
+			return lines;
+		}
+	}
+
+	private static class TestHadoopPathBasedBulkWriterFactory extends HadoopPathBasedBulkWriterFactory<String> {
+
+		@Override
+		public PathBasedBulkWriter<String> create(Path targetPath, Path inProgressPath) {
+			try {
+				FileSystem fileSystem = FileSystem.get(inProgressPath.toUri(), new Configuration());
+				FSDataOutputStream output = fileSystem.create(inProgressPath);
+				return new FSDataOutputStreamBulkWriter(output);
+			} catch (IOException e) {
+				ExceptionUtils.rethrow(e);
+			}
+
+			return null;
+		}
+	}
+
+	private static class FSDataOutputStreamBulkWriter implements PathBasedBulkWriter<String> {
+		private final FSDataOutputStream outputStream;
+
+		public FSDataOutputStreamBulkWriter(FSDataOutputStream outputStream) {
+			this.outputStream = outputStream;
+		}
+
+		@Override
+		public long getSize() throws IOException {
+			return outputStream.getPos();
+		}
+
+		@Override
+		public void dispose() {
+			try {
+				outputStream.close();

Review comment:
       Use `IOUtils.closeQuietly`?
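
For reference, the pattern behind such a helper is small. The sketch below is a hand-written equivalent, not the library method itself; the class name and the helper `swallowsCloseFailure` are illustrative.

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseQuietlySketch {

    /** Closes the resource and swallows any exception, so callers never see a failure. */
    static void closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception ignored) {
            // Deliberately ignored: dispose() runs on failure paths and should not fail itself.
        }
    }

    /** Usage example: a resource whose close() always throws is still closed silently. */
    static boolean swallowsCloseFailure() {
        boolean[] closeCalled = {false};
        Closeable failing = () -> {
            closeCalled[0] = true;
            throw new IOException("simulated close failure");
        };
        closeQuietly(failing);
        return closeCalled[0];
    }
}
```

With a helper like this, `dispose()` would reduce to a single call and the surrounding `try`/`catch` could go away.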



