You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/12/08 23:45:56 UTC

parquet-mr git commit: PARQUET-382: Add methods to append encoded data to files.

Repository: parquet-mr
Updated Branches:
  refs/heads/master 56326400f -> b45c4bdb4


PARQUET-382: Add methods to append encoded data to files.

This allows appending encoded data blocks to open ParquetFileWriters,
which makes it possible to merge multiple Parquet files without
re-encoding all of the records.

This works by finding the column chunk for each column in the file
schema and then streaming the encoded data from one file to the other.
New starting offsets are tracked and the column chunk metadata in the
footer is updated with the new starting positions.

Author: Ryan Blue <bl...@apache.org>

Closes #278 from rdblue/PARQUET-382-append-encoded-blocks and squashes the following commits:

cb98552 [Ryan Blue] PARQUET-382: Add methods to append encoded data to files.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/b45c4bdb
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/b45c4bdb
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/b45c4bdb

Branch: refs/heads/master
Commit: b45c4bdb496381b5f90df6872edca12e0a2e68ca
Parents: 5632640
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Dec 8 14:45:48 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Dec 8 14:45:48 2015 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetFileReader.java       |   9 +
 .../parquet/hadoop/ParquetFileWriter.java       | 160 +++++++++-
 .../hadoop/TestParquetWriterAppendBlocks.java   | 310 +++++++++++++++++++
 3 files changed, 468 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b45c4bdb/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index c54b2b2..55ed5ee 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -458,6 +458,12 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
+  static ParquetFileReader open(Configuration conf, Path file) throws IOException {
+    ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
+    return new ParquetFileReader(conf, footer.getFileMetaData(), file,
+        footer.getBlocks(), footer.getFileMetaData().getSchema().getColumns());
+  }
+
   private final CodecFactory codecFactory;
   private final List<BlockMetaData> blocks;
   private final FSDataInputStream f;
@@ -502,6 +508,9 @@ public class ParquetFileReader implements Closeable {
     this.allocator = new HeapByteBufferAllocator();
   }
 
+  public void appendTo(ParquetFileWriter writer) throws IOException {
+    writer.appendRowGroups(f, blocks, true);
+  }
 
   /**
    * Reads all the columns requested from the row group at the current file position.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b45c4bdb/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 8683a18..b17b936 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -24,6 +24,7 @@ import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,12 +35,14 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.Strings;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
@@ -103,21 +106,29 @@ public class ParquetFileWriter {
   private final MessageType schema;
   private final FSDataOutputStream out;
   private final AlignmentStrategy alignment;
-  private BlockMetaData currentBlock;
-  private long currentRecordCount;
+
+  // file data
   private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+  // row group data
+  private BlockMetaData currentBlock; // appended to by endColumn
+
+  // row group data set at the start of a row group
+  private long currentRecordCount; // set in startBlock
+
+  // column chunk data accumulated as pages are written
+  private Set<Encoding> currentEncodings;
   private long uncompressedLength;
   private long compressedLength;
-  private Set<Encoding> currentEncodings;
+  private Statistics currentStatistics; // accumulated in writePage(s)
 
-  private CompressionCodecName currentChunkCodec;
-  private ColumnPath currentChunkPath;
-  private PrimitiveTypeName currentChunkType;
-  private long currentChunkFirstDataPage;
-  private long currentChunkDictionaryPageOffset;
-  private long currentChunkValueCount;
-
-  private Statistics currentStatistics;
+  // column chunk data set at the start of a column
+  private CompressionCodecName currentChunkCodec; // set in startColumn
+  private ColumnPath currentChunkPath;            // set in startColumn
+  private PrimitiveTypeName currentChunkType;     // set in startColumn
+  private long currentChunkValueCount;            // set in startColumn
+  private long currentChunkFirstDataPage;         // set in startColumn (out.pos())
+  private long currentChunkDictionaryPageOffset;  // set in writeDictionaryPage
 
   /**
    * Captures the order in which methods should be called
@@ -458,6 +469,133 @@ public class ParquetFileWriter {
     currentBlock = null;
   }
 
+  public void appendFile(Configuration conf, Path file) throws IOException {
+    ParquetFileReader.open(conf, file).appendTo(this);
+  }
+
+  public void appendRowGroups(FSDataInputStream file,
+                              List<BlockMetaData> rowGroups,
+                              boolean dropColumns) throws IOException {
+    for (BlockMetaData block : rowGroups) {
+      appendRowGroup(file, block, dropColumns);
+    }
+  }
+
+  public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup,
+                             boolean dropColumns) throws IOException {
+    startBlock(rowGroup.getRowCount());
+
+    Map<String, ColumnChunkMetaData> columnsToCopy =
+        new HashMap<String, ColumnChunkMetaData>();
+    for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
+      columnsToCopy.put(chunk.getPath().toDotString(), chunk);
+    }
+
+    List<ColumnChunkMetaData> columnsInOrder =
+        new ArrayList<ColumnChunkMetaData>();
+
+    for (ColumnDescriptor descriptor : schema.getColumns()) {
+      String path = ColumnPath.get(descriptor.getPath()).toDotString();
+      ColumnChunkMetaData chunk = columnsToCopy.remove(path);
+      if (chunk != null) {
+        columnsInOrder.add(chunk);
+      } else {
+        throw new IllegalArgumentException(String.format(
+            "Missing column '%s', cannot copy row group: %s", path, rowGroup));
+      }
+    }
+
+    // complain if some columns would be dropped and that's not okay
+    if (!dropColumns && !columnsToCopy.isEmpty()) {
+      throw new IllegalArgumentException(String.format(
+          "Columns cannot be copied (missing from target schema): %s",
+          Strings.join(columnsToCopy.keySet(), ", ")));
+    }
+
+    // copy the data for all chunks
+    long start = -1;
+    long length = 0;
+    long blockCompressedSize = 0;
+    for (int i = 0; i < columnsInOrder.size(); i += 1) {
+      ColumnChunkMetaData chunk = columnsInOrder.get(i);
+
+      // get this chunk's start position in the new file
+      long newChunkStart = out.getPos() + length;
+
+      // add this chunk to be copied with any previous chunks
+      if (start < 0) {
+        // no previous chunk included, start at this chunk's starting pos
+        start = chunk.getStartingPos();
+      }
+      length += chunk.getTotalSize();
+
+      if ((i + 1) == columnsInOrder.size() ||
+          columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
+        // not contiguous. do the copy now.
+        copy(from, out, start, length);
+        // reset to start at the next column chunk
+        start = -1;
+        length = 0;
+      }
+
+      currentBlock.addColumn(ColumnChunkMetaData.get(
+          chunk.getPath(),
+          chunk.getType(),
+          chunk.getCodec(),
+          chunk.getEncodings(),
+          chunk.getStatistics(),
+          newChunkStart,
+          newChunkStart,
+          chunk.getValueCount(),
+          chunk.getTotalSize(),
+          chunk.getTotalUncompressedSize()));
+
+      blockCompressedSize += chunk.getTotalSize();
+    }
+
+    currentBlock.setTotalByteSize(blockCompressedSize);
+
+    endBlock();
+  }
+
+  // Buffers for the copy function.
+  private static final ThreadLocal<byte[]> COPY_BUFFER =
+      new ThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+          return new byte[8192];
+        }
+      };
+
+  /**
+   * Copy a byte range between streams using a per-thread buffer.
+   *
+   * @param from a {@link FSDataInputStream}
+   * @param to the {@link FSDataOutputStream} (an {@link OutputStream}) to write to
+   * @param start where in the from stream to start copying
+   * @param length the number of bytes to copy; input ending early is an error
+   * @throws IOException if seeking, reading, or writing fails
+   */
+  private static void copy(FSDataInputStream from, FSDataOutputStream to,
+                          long start, long length) throws IOException {
+    if (DEBUG) LOG.debug(
+        "Copying " + length + " bytes at " + start + " to " + to.getPos());
+    from.seek(start);
+    long bytesCopied = 0;
+    byte[] buffer = COPY_BUFFER.get();
+    while (bytesCopied < length) {
+      long bytesLeft = length - bytesCopied;
+      int bytesRead = from.read(buffer, 0,
+          (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft));
+      if (bytesRead < 0) {
+        throw new IllegalArgumentException( // sum first: report the real offset
+            "Unexpected end of input file at " + (start + bytesCopied));
+      }
+      to.write(buffer, 0, bytesRead);
+      bytesCopied += bytesRead;
+    }
+  }
+
   /**
    * ends a file once all blocks have been written.
    * closes the file.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b45c4bdb/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
new file mode 100644
index 0000000..ae37f63
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
@@ -0,0 +1,310 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+public class TestParquetWriterAppendBlocks {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static final int FILE_SIZE = 10000;
+  public static final Configuration CONF = new Configuration();
+  public static final Map<String, String> EMPTY_METADATA =
+      new HashMap<String, String>();
+  public static final MessageType FILE_SCHEMA = Types.buildMessage()
+      .required(INT32).named("id")
+      .required(BINARY).as(UTF8).named("string")
+      .named("AppendTest");
+  public static final SimpleGroupFactory GROUP_FACTORY =
+      new SimpleGroupFactory(FILE_SCHEMA);
+
+  public Path file1;
+  public List<Group> file1content = new ArrayList<Group>();
+  public Path file2;
+  public List<Group> file2content = new ArrayList<Group>();
+
+  @Before
+  public void createSourceData() throws IOException {
+    this.file1 = newTemp();
+    this.file2 = newTemp();
+
+    ParquetWriter<Group> writer1 = ExampleParquetWriter.builder(file1)
+        .withType(FILE_SCHEMA)
+        .build();
+    ParquetWriter<Group> writer2 = ExampleParquetWriter.builder(file2)
+        .withType(FILE_SCHEMA)
+        .build();
+
+    for (int i = 0; i < FILE_SIZE; i += 1) {
+      Group group1 = GROUP_FACTORY.newGroup();
+      group1.add("id", i);
+      group1.add("string", UUID.randomUUID().toString());
+      writer1.write(group1);
+      file1content.add(group1);
+
+      Group group2 = GROUP_FACTORY.newGroup();
+      group2.add("id", FILE_SIZE+i);
+      group2.add("string", UUID.randomUUID().toString());
+      writer2.write(group2);
+      file2content.add(group2);
+    }
+
+    writer1.close();
+    writer2.close();
+  }
+
+  @Test
+  public void testBasicBehavior() throws IOException {
+    Path combinedFile = newTemp();
+    ParquetFileWriter writer = new ParquetFileWriter(
+        CONF, FILE_SCHEMA, combinedFile);
+    writer.start();
+    writer.appendFile(CONF, file1);
+    writer.appendFile(CONF, file2);
+    writer.end(EMPTY_METADATA);
+
+    LinkedList<Group> expected = new LinkedList<Group>();
+    expected.addAll(file1content);
+    expected.addAll(file2content);
+
+    ParquetReader<Group> reader = ParquetReader
+        .builder(new GroupReadSupport(), combinedFile)
+        .build();
+
+    Group next;
+    while ((next = reader.read()) != null) {
+      Group expectedNext = expected.removeFirst();
+      // check each value; equals is not supported for simple records
+      Assert.assertEquals("Each id should match",
+          expectedNext.getInteger("id", 0), next.getInteger("id", 0));
+      Assert.assertEquals("Each string should match",
+          expectedNext.getString("string", 0), next.getString("string", 0));
+    }
+
+    Assert.assertEquals("All records should be present", 0, expected.size());
+  }
+
+  @Test
+  public void testMergedMetadata() throws IOException {
+    Path combinedFile = newTemp();
+    ParquetFileWriter writer = new ParquetFileWriter(
+        CONF, FILE_SCHEMA, combinedFile);
+    writer.start();
+    writer.appendFile(CONF, file1);
+    writer.appendFile(CONF, file2);
+    writer.end(EMPTY_METADATA);
+
+    ParquetMetadata combinedFooter = ParquetFileReader.readFooter(
+        CONF, combinedFile, NO_FILTER);
+    ParquetMetadata f1Footer = ParquetFileReader.readFooter(
+        CONF, file1, NO_FILTER);
+    ParquetMetadata f2Footer = ParquetFileReader.readFooter(
+        CONF, file2, NO_FILTER);
+
+    LinkedList<BlockMetaData> expectedRowGroups = new LinkedList<BlockMetaData>();
+    expectedRowGroups.addAll(f1Footer.getBlocks());
+    expectedRowGroups.addAll(f2Footer.getBlocks());
+
+    Assert.assertEquals("Combined should have the right number of row groups",
+        expectedRowGroups.size(),
+        combinedFooter.getBlocks().size());
+
+    long nextStart = 4;
+    for (BlockMetaData rowGroup : combinedFooter.getBlocks()) {
+      BlockMetaData expected = expectedRowGroups.removeFirst();
+      Assert.assertEquals("Row count should match",
+          expected.getRowCount(), rowGroup.getRowCount());
+      Assert.assertEquals("Compressed size should match",
+          expected.getCompressedSize(), rowGroup.getCompressedSize());
+      Assert.assertEquals("Total size should match",
+          expected.getTotalByteSize(), rowGroup.getTotalByteSize());
+      Assert.assertEquals("Start pos should be at the last row group's end",
+          nextStart, rowGroup.getStartingPos());
+      assertColumnsEquivalent(expected.getColumns(), rowGroup.getColumns());
+      nextStart = rowGroup.getStartingPos() + rowGroup.getTotalByteSize();
+    }
+  }
+
+  public void assertColumnsEquivalent(List<ColumnChunkMetaData> expected,
+                                      List<ColumnChunkMetaData> actual) {
+    Assert.assertEquals("Should have the expected columns",
+        expected.size(), actual.size());
+    for (int i = 0; i < actual.size(); i += 1) {
+      ColumnChunkMetaData current = actual.get(i);
+      if (i != 0) {
+        ColumnChunkMetaData previous = actual.get(i - 1);
+        long expectedStart = previous.getStartingPos() + previous.getTotalSize();
+        Assert.assertEquals("Should start after the previous column",
+            expectedStart, current.getStartingPos());
+      }
+
+      assertColumnMetadataEquivalent(expected.get(i), current);
+    }
+  }
+
+  // Asserts the copied chunk's metadata matches the source; offsets are checked by the caller.
+  public void assertColumnMetadataEquivalent(ColumnChunkMetaData expected,
+                                             ColumnChunkMetaData actual) {
+    Assert.assertEquals("Should be the expected column",
+        expected.getPath(), actual.getPath());
+    Assert.assertEquals("Primitive type should not change",
+        expected.getType(), actual.getType());
+    Assert.assertEquals("Compression codec should not change",
+        expected.getCodec(), actual.getCodec());
+    Assert.assertEquals("Data encodings should not change",
+        expected.getEncodings(), actual.getEncodings());
+    Assert.assertEquals("Statistics should not change",
+        expected.getStatistics(), actual.getStatistics());
+    Assert.assertEquals("Uncompressed size should not change",
+        expected.getTotalUncompressedSize(), actual.getTotalUncompressedSize());
+    Assert.assertEquals("Compressed size should not change",
+        expected.getTotalSize(), actual.getTotalSize());
+    Assert.assertEquals("Number of values should not change",
+        expected.getValueCount(), actual.getValueCount());
+  }
+
+  @Test
+  public void testAllowDroppingColumns() throws IOException {
+    MessageType droppedColumnSchema = Types.buildMessage()
+        .required(BINARY).as(UTF8).named("string")
+        .named("AppendTest");
+
+    Path droppedColumnFile = newTemp();
+    ParquetFileWriter writer = new ParquetFileWriter(
+        CONF, droppedColumnSchema, droppedColumnFile);
+    writer.start();
+    writer.appendFile(CONF, file1);
+    writer.appendFile(CONF, file2);
+    writer.end(EMPTY_METADATA);
+
+    LinkedList<Group> expected = new LinkedList<Group>();
+    expected.addAll(file1content);
+    expected.addAll(file2content);
+
+    ParquetMetadata footer = ParquetFileReader.readFooter(
+        CONF, droppedColumnFile, NO_FILTER);
+    for (BlockMetaData rowGroup : footer.getBlocks()) {
+      Assert.assertEquals("Should have only the string column",
+          1, rowGroup.getColumns().size());
+    }
+
+    ParquetReader<Group> reader = ParquetReader
+        .builder(new GroupReadSupport(), droppedColumnFile)
+        .build();
+
+    Group next;
+    while ((next = reader.read()) != null) {
+      Group expectedNext = expected.removeFirst();
+      Assert.assertEquals("Each string should match",
+          expectedNext.getString("string", 0), next.getString("string", 0));
+    }
+
+    Assert.assertEquals("All records should be present", 0, expected.size());
+  }
+
+  @Test
+  public void testFailDroppingColumns() throws IOException {
+    MessageType droppedColumnSchema = Types.buildMessage()
+        .required(BINARY).as(UTF8).named("string")
+        .named("AppendTest");
+
+    final ParquetMetadata footer = ParquetFileReader.readFooter(
+        CONF, file1, NO_FILTER);
+    final FSDataInputStream incoming = file1.getFileSystem(CONF).open(file1);
+
+    Path droppedColumnFile = newTemp();
+    final ParquetFileWriter writer = new ParquetFileWriter(
+        CONF, droppedColumnSchema, droppedColumnFile);
+    writer.start();
+
+    TestUtils.assertThrows("Should complain that id column is dropped",
+        IllegalArgumentException.class,
+        new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            writer.appendRowGroups(incoming, footer.getBlocks(), false);
+            return null;
+          }
+        });
+  }
+
+  @Test
+  public void testFailMissingColumn() throws IOException {
+    MessageType fileSchema = Types.buildMessage()
+        .required(INT32).named("id")
+        .required(BINARY).as(UTF8).named("string")
+        .required(FLOAT).named("value")
+        .named("AppendTest");
+
+    Path missingColumnFile = newTemp();
+    final ParquetFileWriter writer = new ParquetFileWriter(
+        CONF, fileSchema, missingColumnFile);
+    writer.start();
+
+    TestUtils.assertThrows("Should complain that value column is missing",
+        IllegalArgumentException.class,
+        new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            writer.appendFile(CONF, file1);
+            return null;
+          }
+        });
+  }
+
+  private Path newTemp() throws IOException {
+    File file = temp.newFile();
+    Preconditions.checkArgument(file.delete(), "Could not remove temp file");
+    return new Path(file.toString());
+  }
+}