You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to dev@iceberg.apache.org by GitBox <gi...@apache.org> on 2018/12/11 17:14:47 UTC

[GitHub] rdblue closed pull request #14: Pluggable file I/O submodule in TableOperations

rdblue closed pull request #14: Pluggable file I/O submodule in TableOperations
URL: https://github.com/apache/incubator-iceberg/pull/14
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/api/src/main/java/com/netflix/iceberg/Files.java b/api/src/main/java/com/netflix/iceberg/Files.java
index f739751..e85825a 100644
--- a/api/src/main/java/com/netflix/iceberg/Files.java
+++ b/api/src/main/java/com/netflix/iceberg/Files.java
@@ -29,6 +29,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.file.Paths;
 
 public class Files {
 
@@ -37,7 +38,7 @@ public static OutputFile localOutput(File file) {
   }
 
   public static OutputFile localOutput(String file) {
-    return localOutput(new File(file));
+    return localOutput(Paths.get(file).toAbsolutePath().toFile());
   }
 
   private static class LocalOutputFile implements OutputFile {
@@ -53,6 +54,13 @@ public PositionOutputStream create() {
         throw new AlreadyExistsException("File already exists: %s", file);
       }
 
+      if (!file.getParentFile().isDirectory() && !file.getParentFile().mkdirs()) {
+        throw new RuntimeIOException(
+            String.format(
+                "Failed to create the file's directory at %s.",
+                file.getParentFile().getAbsolutePath()));
+      }
+
       try {
         return new PositionFileOutputStream(new RandomAccessFile(file, "rw"));
       } catch (FileNotFoundException e) {
diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
index 81452d4..79a1337 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
@@ -20,10 +20,8 @@
 package com.netflix.iceberg;
 
 import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.hadoop.HadoopOutputFile;
-import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.hadoop.HadoopFileIO;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.Tasks;
 import org.apache.hadoop.conf.Configuration;
@@ -53,6 +51,7 @@
   private static final String HIVE_LOCATION_FOLDER_NAME = "empty";
 
   private final Configuration conf;
+  private final FileIO fileIo;
 
   private TableMetadata currentMetadata = null;
   private String currentMetadataLocation = null;
@@ -62,6 +61,7 @@
 
   protected BaseMetastoreTableOperations(Configuration conf) {
     this.conf = conf;
+    this.fileIo = new HadoopFileIO(conf);
   }
 
   @Override
@@ -88,22 +88,18 @@ public String hiveTableLocation() {
     return String.format("%s/%s", baseLocation, HIVE_LOCATION_FOLDER_NAME);
   }
 
-  public String dataLocation() {
-    return String.format("%s/%s", baseLocation, DATA_FOLDER_NAME);
-  }
-
   protected String writeNewMetadata(TableMetadata metadata, int version) {
     if (baseLocation == null) {
       baseLocation = metadata.location();
     }
 
-    String newFilename = newTableMetadataFilename(baseLocation, version);
-    OutputFile newMetadataLocation = HadoopOutputFile.fromPath(new Path(newFilename), conf);
+    String newTableMetadataFilePath = newTableMetadataFilePath(baseLocation, version);
+    OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);
 
     // write the new metadata
     TableMetadataParser.write(metadata, newMetadataLocation);
 
-    return newFilename;
+    return newTableMetadataFilePath;
   }
 
   protected void refreshFromMetadataLocation(String newLocation) {
@@ -129,24 +125,13 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
   }
 
   @Override
-  public InputFile newInputFile(String path) {
-    return fromLocation(path, conf);
+  public String metadataFileLocation(String fileName) {
+    return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, fileName);
   }
 
   @Override
-  public OutputFile newMetadataFile(String filename) {
-    return HadoopOutputFile.fromPath(
-        new Path(newMetadataLocation(baseLocation, filename)), conf);
-  }
-
-  @Override
-  public void deleteFile(String file) {
-    Path path = new Path(file);
-    try {
-      getFS(path, conf).delete(path, false /* should be a file, not recursive */ );
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
+  public FileIO io() {
+    return fileIo;
   }
 
   @Override
@@ -154,7 +139,7 @@ public long newSnapshotId() {
     return System.currentTimeMillis();
   }
 
-  private String newTableMetadataFilename(String baseLocation, int newVersion) {
+  private String newTableMetadataFilePath(String baseLocation, int newVersion) {
     return String.format("%s/%s/%05d-%s%s",
             baseLocation,
             METADATA_FOLDER_NAME,
@@ -163,22 +148,6 @@ private String newTableMetadataFilename(String baseLocation, int newVersion) {
             getFileExtension(this.conf));
   }
 
-  private static String newMetadataLocation(String baseLocation, String filename) {
-    return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, filename);
-  }
-
-  private static String parseBaseLocation(String metadataLocation) {
-    int lastSlash = metadataLocation.lastIndexOf('/');
-    int secondToLastSlash = metadataLocation.lastIndexOf('/', lastSlash);
-
-    // verify that the metadata file was contained in a "metadata" folder
-    String parentFolderName = metadataLocation.substring(secondToLastSlash + 1, lastSlash);
-    Preconditions.checkArgument(METADATA_FOLDER_NAME.equals(parentFolderName),
-        "Invalid metadata location, not in metadata/ folder: %s", metadataLocation);
-
-    return metadataLocation.substring(0, secondToLastSlash);
-  }
-
   private static int parseVersion(String metadataLocation) {
     int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
     int versionEnd = metadataLocation.indexOf('-', versionStart);
diff --git a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
index 945ddbb..36a873a 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
@@ -50,7 +50,7 @@
                String... manifestFiles) {
     this(ops, snapshotId, null, System.currentTimeMillis(),
         Lists.transform(Arrays.asList(manifestFiles),
-            path -> new GenericManifestFile(ops.newInputFile(path), 0)));
+            path -> new GenericManifestFile(ops.io().newInputFile(path), 0)));
   }
 
   BaseSnapshot(TableOperations ops,
@@ -139,7 +139,7 @@ private void cacheChanges() {
     // accumulate adds and deletes from all manifests.
     // because manifests can be reused in newer snapshots, filter the changes by snapshot id.
     for (String manifest : Iterables.transform(manifests, ManifestFile::path)) {
-      try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
+      try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest))) {
         for (ManifestEntry add : reader.addedFiles()) {
           if (add.snapshotId() == snapshotId) {
             adds.add(add.file().copy());
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index ad20780..8915461 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -34,7 +34,6 @@
 import com.netflix.iceberg.expressions.Expression;
 import com.netflix.iceberg.expressions.Expressions;
 import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
-import com.netflix.iceberg.expressions.Projections;
 import com.netflix.iceberg.expressions.ResidualEvaluator;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.types.TypeUtil;
@@ -177,7 +176,7 @@ public InclusiveManifestEvaluator load(Integer specId) {
       Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
           matchingManifests,
           manifest -> {
-            ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
+            ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
             toClose.add(reader);
             String schemaString = SchemaParser.toJson(reader.spec().schema());
             String specString = PartitionSpecParser.toJson(reader.spec());
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
index a860117..1a56b7e 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
@@ -23,8 +23,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.CommitFailedException;
-import com.netflix.iceberg.io.InputFile;
-import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.Tasks;
 import java.util.List;
 import java.util.Map;
@@ -263,18 +261,13 @@ public void commit(TableMetadata base, TableMetadata metadata) {
     }
 
     @Override
-    public InputFile newInputFile(String path) {
-      return ops.newInputFile(path);
+    public FileIO io() {
+      return ops.io();
     }
 
     @Override
-    public OutputFile newMetadataFile(String filename) {
-      return ops.newMetadataFile(filename);
-    }
-
-    @Override
-    public void deleteFile(String path) {
-      ops.deleteFile(path);
+    public String metadataFileLocation(String fileName) {
+      return ops.metadataFileLocation(fileName);
     }
 
     @Override
diff --git a/core/src/main/java/com/netflix/iceberg/FileIO.java b/core/src/main/java/com/netflix/iceberg/FileIO.java
new file mode 100644
index 0000000..fdba7af
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/FileIO.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.io.OutputFile;
+
+import java.io.Serializable;
+
+/**
+ * Pluggable module for reading, writing, and deleting files.
+ * <p>
+ * Both table metadata files and data files can be written and read by this module. Implementations
+ * must be serializable because clients of Spark tables may create an instance once and hand it off
+ * to a separate component that then opens and interacts with the input and output streams.
+ */
+public interface FileIO extends Serializable {
+
+  /**
+   * Get an {@link InputFile} instance to read bytes from the file at the given path.
+   */
+  InputFile newInputFile(String path);
+
+  /**
+   * Get an {@link OutputFile} instance to write bytes to the file at the given path.
+   */
+  OutputFile newOutputFile(String path);
+
+  /**
+   * Delete the file at the given path.
+   */
+  void deleteFile(String path);
+}
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
index 19d993f..d05ceca 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
@@ -107,7 +107,7 @@ public ManifestGroup select(String... columns) {
     Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
         matchingManifests,
         manifest -> {
-          ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
+          ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
           FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
           toClose.add(reader);
           return Iterables.filter(
diff --git a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
index 8878d4c..156f9ea 100644
--- a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
@@ -316,7 +316,7 @@ private ManifestFile filterManifest(Expression deleteExpression,
       return manifest;
     }
 
-    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+    try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
       Expression inclusiveExpr = Projections
           .inclusive(reader.spec())
           .project(deleteExpression);
@@ -463,7 +463,7 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws I
     try {
 
       for (ManifestFile manifest : bin) {
-        try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+        try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
           for (ManifestEntry entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
index 9ce6981..541cc5f 100644
--- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
@@ -50,7 +50,7 @@
   private final Consumer<String> defaultDelete = new Consumer<String>() {
     @Override
     public void accept(String file) {
-      ops.deleteFile(file);
+      ops.io().deleteFile(file);
     }
   };
 
@@ -164,7 +164,7 @@ public void commit() {
         ).run(manifest -> {
           // even if the manifest is still used, it may contain files that can be deleted
           // TODO: eliminate manifests with no deletes without scanning
-          try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+          try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
             for (ManifestEntry entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
index a5ce08c..d73da8a 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
@@ -22,12 +22,9 @@
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.util.JsonUtil;
-import com.netflix.iceberg.util.Tasks;
-import com.netflix.iceberg.util.ThreadPools;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.List;
@@ -92,13 +89,13 @@ static Snapshot fromJson(TableOperations ops, JsonNode node) {
     if (node.has(MANIFEST_LIST)) {
       // the manifest list is stored in a manifest list file
       String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
-      return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.newInputFile(manifestList));
+      return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.io().newInputFile(manifestList));
 
     } else {
       // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
       // loaded lazily, if it is needed
       List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
-          location -> new GenericManifestFile(ops.newInputFile(location), 0));
+          location -> new GenericManifestFile(ops.io().newInputFile(location), 0));
       return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
     }
   }
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
index 54c0483..ce9d59c 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
@@ -132,7 +132,7 @@ public Snapshot apply() {
 
       return new BaseSnapshot(ops,
           snapshotId(), parentSnapshotId, System.currentTimeMillis(),
-          ops.newInputFile(manifestList.location()));
+          ops.io().newInputFile(manifestList.location()));
 
     } else {
       return new BaseSnapshot(ops,
@@ -188,16 +188,17 @@ protected void cleanAll() {
   }
 
   protected void deleteFile(String path) {
-    ops.deleteFile(path);
+    ops.io().deleteFile(path);
   }
 
   protected OutputFile manifestListPath() {
-    return ops.newMetadataFile(FileFormat.AVRO.addExtension(
-        String.format("snap-%d-%s", snapshotId(), commitUUID)));
+    return ops.io().newOutputFile(ops.metadataFileLocation(FileFormat.AVRO.addExtension(
+        String.format("snap-%d-%s", snapshotId(), commitUUID))));
   }
 
   protected OutputFile manifestPath(int i) {
-    return ops.newMetadataFile(FileFormat.AVRO.addExtension(commitUUID + "-m" + i));
+    return ops.io().newOutputFile(
+        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + i)));
   }
 
   protected long snapshotId() {
@@ -208,7 +209,7 @@ protected long snapshotId() {
   }
 
   private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
-    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+    try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
       PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
       int addedFiles = 0;
       int existingFiles = 0;
diff --git a/core/src/main/java/com/netflix/iceberg/TableOperations.java b/core/src/main/java/com/netflix/iceberg/TableOperations.java
index a0c94b8..e9d4388 100644
--- a/core/src/main/java/com/netflix/iceberg/TableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/TableOperations.java
@@ -19,7 +19,6 @@
 
 package com.netflix.iceberg;
 
-import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 
 /**
@@ -56,27 +55,18 @@
   void commit(TableMetadata base, TableMetadata metadata);
 
   /**
-   * Create a new {@link InputFile} for a path.
-   *
-   * @param path a string file path
-   * @return an InputFile instance for the path
+   * @return a {@link com.netflix.iceberg.FileIO} to read and write table data and metadata files
    */
-  InputFile newInputFile(String path);
+  FileIO io();
 
   /**
-   * Create a new {@link OutputFile} in the table's metadata store.
-   *
-   * @param filename a string file name, not a full path
-   * @return an OutputFile instance for the path
-   */
-  OutputFile newMetadataFile(String filename);
-
-  /**
-   * Delete a file.
-   *
-   * @param path path to the file
+   * Given the name of a metadata file, obtain the full path of that file using an appropriate base
+   * location of the implementation's choosing.
+   * <p>
+   * The file may not exist yet, in which case the path should be returned as if it were to be created
+   * by e.g. {@link FileIO#newOutputFile(String)}.
    */
-  void deleteFile(String path);
+  String metadataFileLocation(String fileName);
 
   /**
    * Create a new ID for a Snapshot
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
new file mode 100644
index 0000000..586942c
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.hadoop;
+
+import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.io.OutputFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * {@link FileIO} implementation that reads, writes, and deletes files through Hadoop.
+ * <p>
+ * The Hadoop {@link Configuration} is held in a {@link SerializableConfiguration} so that this
+ * class can satisfy the serializability requirement of {@link FileIO}.
+ */
+public class HadoopFileIO implements FileIO {
+
+  private final SerializableConfiguration hadoopConf;
+
+  /**
+   * @param hadoopConf Hadoop configuration used to resolve the file system for every path
+   */
+  public HadoopFileIO(Configuration hadoopConf) {
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return HadoopInputFile.fromLocation(path, hadoopConf.get());
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get());
+  }
+
+  /**
+   * Deletes a single file (non-recursively) using the {@link FileSystem} for its path.
+   * <p>
+   * NOTE(review): the boolean returned by {@link FileSystem#delete(Path, boolean)} is ignored, so
+   * a delete that reports {@code false} is not surfaced to the caller.
+   *
+   * @throws RuntimeIOException if the file system throws an {@link IOException}
+   */
+  @Override
+  public void deleteFile(String path) {
+    Path toDelete = new Path(path);
+    FileSystem fs = Util.getFS(toDelete, hadoopConf.get());
+    try {
+      fs.delete(toDelete, false /* not recursive */);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
+    }
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
index 875643e..d953056 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
@@ -19,14 +19,13 @@
 
 package com.netflix.iceberg.hadoop;
 
+import com.netflix.iceberg.FileIO;
 import com.netflix.iceberg.TableMetadata;
 import com.netflix.iceberg.TableMetadataParser;
 import com.netflix.iceberg.TableOperations;
 import com.netflix.iceberg.exceptions.CommitFailedException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.exceptions.ValidationException;
-import com.netflix.iceberg.io.InputFile;
-import com.netflix.iceberg.io.OutputFile;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +52,7 @@
   private TableMetadata currentMetadata = null;
   private Integer version = null;
   private boolean shouldRefresh = true;
+  private HadoopFileIO defaultFileIo = null;
 
   protected HadoopTableOperations(Path location, Configuration conf) {
     this.conf = conf;
@@ -91,7 +91,7 @@ public TableMetadata refresh() {
     }
     this.version = ver;
     this.currentMetadata = TableMetadataParser.read(this,
-        HadoopInputFile.fromPath(metadataFile, conf));
+        io().newInputFile(metadataFile.toString()));
     this.shouldRefresh = false;
     return currentMetadata;
   }
@@ -108,7 +108,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
     }
 
     Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf));
-    TableMetadataParser.write(metadata, HadoopOutputFile.fromPath(tempMetadataFile, conf));
+    TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
 
     int nextVersion = (version != null ? version : 0) + 1;
     Path finalMetadataFile = metadataFile(nextVersion);
@@ -142,24 +142,16 @@ public void commit(TableMetadata base, TableMetadata metadata) {
   }
 
   @Override
-  public InputFile newInputFile(String path) {
-    return HadoopInputFile.fromPath(new Path(path), conf);
-  }
-
-  @Override
-  public OutputFile newMetadataFile(String filename) {
-    return HadoopOutputFile.fromPath(metadataPath(filename), conf);
+  public FileIO io() {
+    if (defaultFileIo == null) {
+      defaultFileIo = new HadoopFileIO(conf);
+    }
+    return defaultFileIo;
   }
 
   @Override
-  public void deleteFile(String path) {
-    Path toDelete = new Path(path);
-    FileSystem fs = getFS(toDelete, conf);
-    try {
-      fs.delete(toDelete, false /* not recursive */ );
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
-    }
+  public String metadataFileLocation(String fileName) {
+    return metadataPath(fileName).toString();
   }
 
   @Override
@@ -194,7 +186,7 @@ private void writeVersionHint(int version) {
   private int readVersionHint() {
     Path versionHintFile = versionHintFile();
     try {
-      FileSystem fs = versionHintFile.getFileSystem(conf);
+      FileSystem fs = Util.getFS(versionHintFile, conf);
       if (!fs.exists(versionHintFile)) {
         return 0;
       }
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java b/core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java
new file mode 100644
index 0000000..30c7563
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.hadoop;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Wraps a {@link Configuration} object in a {@link Serializable} layer.
+ * <p>
+ * The wrapped configuration is kept in a transient field and is written with
+ * {@link Configuration#write} and restored with {@link Configuration#readFields} during Java
+ * serialization.
+ */
+public class SerializableConfiguration implements Serializable {
+
+  private transient Configuration hadoopConf;
+
+  /**
+   * @param hadoopConf the configuration to wrap; it is held by reference, not copied
+   */
+  public SerializableConfiguration(Configuration hadoopConf) {
+    this.hadoopConf = hadoopConf;
+  }
+
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.defaultWriteObject();
+    hadoopConf.write(out);
+  }
+
+  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+    in.defaultReadObject();
+    // loadDefaults=false: only the serialized properties should populate the restored instance
+    hadoopConf = new Configuration(false);
+    hadoopConf.readFields(in);
+  }
+
+  /**
+   * @return the wrapped configuration (a reconstructed instance after deserialization)
+   */
+  public Configuration get() {
+    return hadoopConf;
+  }
+}
diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
index 27a01fc..1508ee8 100644
--- a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
+++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
@@ -19,21 +19,22 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.collect.Maps;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.io.InputFile;
-import com.netflix.iceberg.io.OutputFile;
+import java.util.Map;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.io.IOException;
 
-import static com.netflix.iceberg.Files.localInput;
-
 class LocalTableOperations implements TableOperations {
   private final TemporaryFolder temp;
+  private final FileIO io;
+
+  private final Map<String, String> createdMetadataFilePaths = Maps.newHashMap();
 
   LocalTableOperations(TemporaryFolder temp) {
     this.temp = temp;
+    this.io = new TestTables.LocalFileIO();
   }
 
   @Override
@@ -52,25 +53,19 @@ public void commit(TableMetadata base, TableMetadata metadata) {
   }
 
   @Override
-  public InputFile newInputFile(String path) {
-    return localInput(path);
-  }
-
-  @Override
-  public OutputFile newMetadataFile(String filename) {
-    try {
-      File metadataFile = temp.newFile(filename);
-      metadataFile.delete();
-      metadataFile.deleteOnExit();
-      return Files.localOutput(metadataFile);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
+  public FileIO io() {
+    return io;
   }
 
   @Override
-  public void deleteFile(String path) {
-    new File(path).delete();
+  public String metadataFileLocation(String fileName) {
+    return createdMetadataFilePaths.computeIfAbsent(fileName, name -> {
+      try {
+        return temp.newFile(name).getAbsolutePath();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+    });
   }
 
   @Override
diff --git a/core/src/test/java/com/netflix/iceberg/TestTables.java b/core/src/test/java/com/netflix/iceberg/TestTables.java
index fcb9bfc..e6aea02 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTables.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTables.java
@@ -27,6 +27,7 @@
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 
 import static com.netflix.iceberg.TableMetadata.newTableMetadata;
@@ -173,14 +174,34 @@ public void commit(TableMetadata base, TableMetadata metadata) {
       }
     }
 
+    @Override
+    public FileIO io() {
+      return new LocalFileIO();
+    }
+
+    @Override
+    public String metadataFileLocation(String fileName) {
+      return new File(metadata, fileName).getAbsolutePath();
+    }
+
+    @Override
+    public long newSnapshotId() {
+      long nextSnapshotId = lastSnapshotId + 1;
+      this.lastSnapshotId = nextSnapshotId;
+      return nextSnapshotId;
+    }
+  }
+
+  static class LocalFileIO implements FileIO {
+
     @Override
     public InputFile newInputFile(String path) {
       return Files.localInput(path);
     }
 
     @Override
-    public OutputFile newMetadataFile(String filename) {
-      return Files.localOutput(new File(metadata, filename));
+    public OutputFile newOutputFile(String path) {
+      return Files.localOutput(path);
     }
 
     @Override
@@ -189,12 +210,5 @@ public void deleteFile(String path) {
         throw new RuntimeIOException("Failed to delete file: " + path);
       }
     }
-
-    @Override
-    public long newSnapshotId() {
-      long nextSnapshotId = lastSnapshotId + 1;
-      this.lastSnapshotId = nextSnapshotId;
-      return nextSnapshotId;
-    }
   }
 }
diff --git a/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java b/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java
index a506b4e..4f59556 100644
--- a/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java
@@ -79,7 +79,7 @@
   }
 
   private CloseableIterable<Record> open(FileScanTask task) {
-    InputFile input = ops.newInputFile(task.file().path().toString());
+    InputFile input = ops.io().newInputFile(task.file().path().toString());
 
     // TODO: join to partition data from the manifest file
     switch (task.file().format()) {
diff --git a/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
index 38a0cb0..0199e7f 100644
--- a/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
@@ -161,7 +161,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
     } finally {
       if (threw) {
         // if anything went wrong, clean up the uncommitted metadata file
-        deleteFile(newMetadataLocation);
+        io().deleteFile(newMetadataLocation);
       }
       unlock(lockId);
     }
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
index a7ff513..90b6dc8 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.Maps;
 import com.netflix.iceberg.BaseTable;
+import com.netflix.iceberg.FileIO;
 import com.netflix.iceberg.Files;
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
@@ -33,6 +34,7 @@
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 
 // TODO: Use the copy of this from core.
@@ -153,16 +155,34 @@ public void commit(TableMetadata base, TableMetadata metadata) {
       }
     }
 
+    @Override
+    public FileIO io() {
+      return new LocalFileIO();
+    }
+
+    @Override
+    public String metadataFileLocation(String fileName) {
+      return new File(new File(current.location(), "metadata"), fileName).getAbsolutePath();
+    }
+
+    @Override
+    public long newSnapshotId() {
+      long nextSnapshotId = lastSnapshotId + 1;
+      this.lastSnapshotId = nextSnapshotId;
+      return nextSnapshotId;
+    }
+  }
+  
+  static class LocalFileIO implements FileIO {
+
     @Override
     public InputFile newInputFile(String path) {
       return Files.localInput(path);
     }
 
     @Override
-    public OutputFile newMetadataFile(String filename) {
-      File metadata = new File(current.location(), "metadata");
-      metadata.mkdirs();
-      return Files.localOutput(new File(metadata, filename));
+    public OutputFile newOutputFile(String path) {
+      return Files.localOutput(new File(path));
     }
 
     @Override
@@ -171,12 +191,5 @@ public void deleteFile(String path) {
         throw new RuntimeIOException("Failed to delete file: " + path);
       }
     }
-
-    @Override
-    public long newSnapshotId() {
-      long nextSnapshotId = lastSnapshotId + 1;
-      this.lastSnapshotId = nextSnapshotId;
-      return nextSnapshotId;
-    }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services