You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2019/01/09 00:55:56 UTC

[GitHub] rdblue closed pull request #52: Use the FileIO submodule in Spark writers and readers.

rdblue closed pull request #52: Use the FileIO submodule in Spark writers and readers.
URL: https://github.com/apache/incubator-iceberg/pull/52
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not display the original diff once such a pull request is merged:

diff --git a/api/src/main/java/com/netflix/iceberg/Table.java b/api/src/main/java/com/netflix/iceberg/Table.java
index fe19fa2..9acb50d 100644
--- a/api/src/main/java/com/netflix/iceberg/Table.java
+++ b/api/src/main/java/com/netflix/iceberg/Table.java
@@ -19,6 +19,7 @@
 
 package com.netflix.iceberg;
 
+import com.netflix.iceberg.io.FileIO;
 import java.util.Map;
 
 /**
@@ -171,4 +172,10 @@ default AppendFiles newFastAppend() {
    * @return a new {@link Transaction}
    */
   Transaction newTransaction();
+
+  /**
+   * @return a {@link FileIO} to read and write table data and metadata files
+   */
+  FileIO io();
+
 }
diff --git a/core/src/main/java/com/netflix/iceberg/FileIO.java b/api/src/main/java/com/netflix/iceberg/io/FileIO.java
similarity index 98%
rename from core/src/main/java/com/netflix/iceberg/FileIO.java
rename to api/src/main/java/com/netflix/iceberg/io/FileIO.java
index fdba7af..ed859b9 100644
--- a/core/src/main/java/com/netflix/iceberg/FileIO.java
+++ b/api/src/main/java/com/netflix/iceberg/io/FileIO.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package com.netflix.iceberg;
+package com.netflix.iceberg.io;
 
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
index b107d0b..3e9b420 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Objects;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.hadoop.HadoopFileIO;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.Tasks;
 import org.apache.hadoop.conf.Configuration;
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTable.java b/core/src/main/java/com/netflix/iceberg/BaseTable.java
index da11b55..7d48ef2 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTable.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTable.java
@@ -19,6 +19,7 @@
 
 package com.netflix.iceberg;
 
+import com.netflix.iceberg.io.FileIO;
 import java.util.Map;
 
 /**
@@ -135,6 +136,11 @@ public Transaction newTransaction() {
     return BaseTransaction.newTransaction(ops);
   }
 
+  @Override
+  public FileIO io() {
+    return operations().io();
+  }
+
   @Override
   public String toString() {
     return name;
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
index 1a56b7e..b7c3a32 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.util.Tasks;
 import java.util.List;
 import java.util.Map;
@@ -365,5 +366,10 @@ public Rollback rollback() {
     public Transaction newTransaction() {
       throw new UnsupportedOperationException("Cannot create a transaction within a transaction");
     }
+
+    @Override
+    public FileIO io() {
+      return transactionOps.io();
+    }
   }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/TableOperations.java b/core/src/main/java/com/netflix/iceberg/TableOperations.java
index 19fc386..974d5e2 100644
--- a/core/src/main/java/com/netflix/iceberg/TableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/TableOperations.java
@@ -19,10 +19,9 @@
 
 package com.netflix.iceberg;
 
+import com.netflix.iceberg.io.FileIO;
 import java.util.UUID;
 
-import com.netflix.iceberg.io.OutputFile;
-
 /**
  * SPI interface to abstract table metadata access and updates.
  */
@@ -57,7 +56,7 @@
   void commit(TableMetadata base, TableMetadata metadata);
 
   /**
-   * @return a {@link com.netflix.iceberg.FileIO} to read and write table data and metadata files
+   * @return a {@link FileIO} to read and write table data and metadata files
    */
   FileIO io();
 
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
index 586942c..7e1d004 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
@@ -1,6 +1,6 @@
 package com.netflix.iceberg.hadoop;
 
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
@@ -9,8 +9,6 @@
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 
 public class HadoopFileIO implements FileIO {
 
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
index 4aa19f4..1a3b0fd 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
@@ -19,7 +19,7 @@
 
 package com.netflix.iceberg.hadoop;
 
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.TableMetadata;
 import com.netflix.iceberg.TableMetadataParser;
 import com.netflix.iceberg.TableOperations;
diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
index 1508ee8..baa286f 100644
--- a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
+++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.Maps;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileIO;
 import java.util.Map;
 import org.junit.rules.TemporaryFolder;
 
diff --git a/core/src/test/java/com/netflix/iceberg/TestTables.java b/core/src/test/java/com/netflix/iceberg/TestTables.java
index f1dbe4a..fbb58d4 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTables.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTables.java
@@ -24,6 +24,7 @@
 import com.netflix.iceberg.exceptions.AlreadyExistsException;
 import com.netflix.iceberg.exceptions.CommitFailedException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.File;
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index cd1a0af..1991d29 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -59,8 +59,7 @@ public String shortName() {
   public DataSourceReader createReader(DataSourceOptions options) {
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
-
-    return new Reader(table, conf);
+    return new Reader(table);
   }
 
   @Override
@@ -92,7 +91,7 @@ public DataSourceReader createReader(DataSourceOptions options) {
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, conf, format));
+    return Optional.of(new Writer(table, format));
   }
 
   protected Table findTable(DataSourceOptions options, Configuration conf) {
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
index 4a008ee..33b95c1 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.CombinedScanTask;
 import com.netflix.iceberg.DataFile;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.FileScanTask;
 import com.netflix.iceberg.PartitionField;
 import com.netflix.iceberg.PartitionSpec;
@@ -34,7 +35,6 @@
 import com.netflix.iceberg.common.DynMethods;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.expressions.Expression;
-import com.netflix.iceberg.hadoop.HadoopInputFile;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.parquet.Parquet;
@@ -44,7 +44,6 @@
 import com.netflix.iceberg.spark.data.SparkParquetReaders;
 import com.netflix.iceberg.types.TypeUtil;
 import com.netflix.iceberg.types.Types;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -67,7 +66,6 @@
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
-import org.apache.spark.util.SerializableConfiguration;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
@@ -89,7 +87,7 @@
   private static final Filter[] NO_FILTERS = new Filter[0];
 
   private final Table table;
-  private final SerializableConfiguration conf;
+  private final FileIO fileIo;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
@@ -99,10 +97,10 @@
   private StructType type = null; // cached because Spark accesses it multiple times
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  Reader(Table table, Configuration conf) {
+  Reader(Table table) {
     this.table = table;
-    this.conf = new SerializableConfiguration(conf);
     this.schema = table.schema();
+    this.fileIo = table.io();
   }
 
   private Schema lazySchema() {
@@ -135,7 +133,7 @@ public StructType readSchema() {
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, conf));
+      readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo));
     }
 
     return readTasks;
@@ -228,22 +226,22 @@ public String toString() {
     private final CombinedScanTask task;
     private final String tableSchemaString;
     private final String expectedSchemaString;
-    private final SerializableConfiguration conf;
+    private final FileIO fileIo;
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
 
-    private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
-                     SerializableConfiguration conf) {
+    private ReadTask(
+        CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo) {
       this.task = task;
       this.tableSchemaString = tableSchemaString;
       this.expectedSchemaString = expectedSchemaString;
-      this.conf = conf;
+      this.fileIo = fileIo;
     }
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-      return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), conf.value());
+      return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo);
     }
 
     private Schema lazyTableSchema() {
@@ -270,18 +268,18 @@ private Schema lazyExpectedSchema() {
     private final Iterator<FileScanTask> tasks;
     private final Schema tableSchema;
     private final Schema expectedSchema;
-    private final Configuration conf;
+    private final FileIO fileIo;
 
     private Iterator<InternalRow> currentIterator = null;
     private Closeable currentCloseable = null;
     private InternalRow current = null;
 
-    public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, Configuration conf) {
+    public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo) {
+      this.fileIo = fileIo;
       this.tasks = task.files().iterator();
       this.tableSchema = tableSchema;
       this.expectedSchema = expectedSchema;
-      this.conf = conf;
-      // open last because the schemas and conf must be set
+      // open last because the schemas and fileIo must be set
       this.currentIterator = open(tasks.next());
     }
 
@@ -346,17 +344,17 @@ public void close() throws IOException {
 
         // create joined rows and project from the joined schema to the final schema
         iterSchema = TypeUtil.join(readSchema, partitionSchema);
-        iter = transform(open(task, readSchema, conf), joined::withLeft);
+        iter = transform(open(task, readSchema), joined::withLeft);
 
       } else if (hasExtraFilterColumns) {
         // add projection to the final schema
         iterSchema = requiredSchema;
-        iter = open(task, requiredSchema, conf);
+        iter = open(task, requiredSchema);
 
       } else {
         // return the base iterator
         iterSchema = finalSchema;
-        iter = open(task, finalSchema, conf);
+        iter = open(task, finalSchema);
       }
 
       // TODO: remove the projection by reporting the iterator's schema back to Spark
@@ -386,9 +384,8 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema
           asScalaBufferConverter(attrs).asScala().toSeq());
     }
 
-    private Iterator<InternalRow> open(FileScanTask task, Schema readSchema,
-                                       Configuration conf) {
-      InputFile location = HadoopInputFile.fromLocation(task.file().path(), conf);
+    private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
+      InputFile location = fileIo.newInputFile(task.file().path().toString());
       CloseableIterable<InternalRow> iter;
       switch (task.file().format()) {
         case PARQUET:
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
index c9d3a7b..902ba80 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
@@ -28,6 +28,7 @@
 import com.netflix.iceberg.DataFile;
 import com.netflix.iceberg.DataFiles;
 import com.netflix.iceberg.FileFormat;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.Metrics;
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
@@ -35,8 +36,6 @@
 import com.netflix.iceberg.TableProperties;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.hadoop.HadoopInputFile;
-import com.netflix.iceberg.hadoop.HadoopOutputFile;
 import com.netflix.iceberg.io.FileAppender;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
@@ -46,8 +45,6 @@
 import com.netflix.iceberg.transforms.Transforms;
 import com.netflix.iceberg.types.Types.StringType;
 import com.netflix.iceberg.util.Tasks;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -55,7 +52,6 @@
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-import org.apache.spark.util.SerializableConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
@@ -89,18 +85,18 @@
   private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
 
   private final Table table;
-  private final Configuration conf;
   private final FileFormat format;
+  private final FileIO fileIo;
 
-  Writer(Table table, Configuration conf, FileFormat format) {
+  Writer(Table table, FileFormat format) {
     this.table = table;
-    this.conf = conf;
     this.format = format;
+    this.fileIo = table.io();
   }
 
   @Override
   public DataWriterFactory<InternalRow> createWriterFactory() {
-    return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), conf);
+    return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), fileIo);
   }
 
   @Override
@@ -122,13 +118,6 @@ public void commit(WriterCommitMessage[] messages) {
 
   @Override
   public void abort(WriterCommitMessage[] messages) {
-    FileSystem fs;
-    try {
-      fs = new Path(table.location()).getFileSystem(conf);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
-
     Tasks.foreach(files(messages))
         .retry(propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
         .exponentialBackoff(
@@ -138,11 +127,7 @@ public void abort(WriterCommitMessage[] messages) {
             2.0 /* exponential */ )
         .throwFailureWhenFinished()
         .run(file -> {
-          try {
-            fs.delete(new Path(file.path().toString()), false /* not recursive */ );
-          } catch (IOException e) {
-            throw new RuntimeIOException(e);
-          }
+          fileIo.deleteFile(file.path().toString());
         });
   }
 
@@ -165,9 +150,10 @@ private int propertyAsInt(String property, int defaultValue) {
   }
 
   private String dataLocation() {
-    return table.properties().getOrDefault(
-        TableProperties.WRITE_NEW_DATA_LOCATION,
-        new Path(new Path(table.location()), "data").toString());
+    return stripTrailingSlash(
+        table.properties().getOrDefault(
+            TableProperties.WRITE_NEW_DATA_LOCATION,
+            String.format("%s/data", table.location())));
   }
 
   @Override
@@ -202,18 +188,16 @@ public String toString() {
     private final FileFormat format;
     private final String dataLocation;
     private final Map<String, String> properties;
-    private final SerializableConfiguration conf;
     private final String uuid = UUID.randomUUID().toString();
-
-    private transient Path dataPath = null;
+    private final FileIO fileIo;
 
     WriterFactory(PartitionSpec spec, FileFormat format, String dataLocation,
-                  Map<String, String> properties, Configuration conf) {
+                  Map<String, String> properties, FileIO fileIo) {
       this.spec = spec;
       this.format = format;
       this.dataLocation = dataLocation;
       this.properties = properties;
-      this.conf = new SerializableConfiguration(conf);
+      this.fileIo = fileIo;
     }
 
     @Override
@@ -221,12 +205,10 @@ public String toString() {
       String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
       AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
       if (spec.fields().isEmpty()) {
-        return new UnpartitionedWriter(lazyDataPath(), filename, format, conf.value(), factory);
-
+        return new UnpartitionedWriter(dataLocation, filename, format, factory, fileIo);
       } else {
-        Path baseDataPath = lazyDataPath(); // avoid calling this in the output path function
-        Function<PartitionKey, Path> outputPathFunc = key ->
-            new Path(new Path(baseDataPath, key.toPath()), filename);
+        Function<PartitionKey, String> outputPathFunc = key ->
+            String.format("%s/%s/%s", dataLocation, key.toPath(), filename);
 
         boolean useObjectStorage = (
             Boolean.parseBoolean(properties.get(OBJECT_STORE_ENABLED)) ||
@@ -235,43 +217,46 @@ public String toString() {
 
         if (useObjectStorage) {
           // try to get db and table portions of the path for context in the object store
-          String context = pathContext(baseDataPath);
-          String objectStore = properties.get(OBJECT_STORE_PATH);
+          String context = pathContext(new Path(dataLocation));
+          String objectStore = stripTrailingSlash(properties.get(OBJECT_STORE_PATH));
           Preconditions.checkNotNull(objectStore,
               "Cannot use object storage, missing location: " + OBJECT_STORE_PATH);
-          Path objectStorePath = new Path(objectStore);
 
           outputPathFunc = key -> {
-            String partitionAndFilename = key.toPath() + "/" + filename;
+            String partitionAndFilename = String.format("%s/%s", key.toPath(), filename);
             int hash = HASH_FUNC.apply(partitionAndFilename);
-            return new Path(objectStorePath,
-                String.format("%08x/%s/%s", hash, context, partitionAndFilename));
+            return String.format(
+                "%s/%08x/%s/%s/%s",
+                objectStore,
+                hash,
+                context,
+                key.toPath(),
+                filename);
           };
         }
 
-        return new PartitionedWriter(spec, format, conf.value(), factory, outputPathFunc);
+        return new PartitionedWriter(spec, format, factory, outputPathFunc, fileIo);
       }
     }
 
     private static String pathContext(Path dataPath) {
       Path parent = dataPath.getParent();
+      String resolvedContext;
       if (parent != null) {
         // remove the data folder
         if (dataPath.getName().equals("data")) {
-          return pathContext(parent);
+          resolvedContext = pathContext(parent);
+        } else {
+          resolvedContext = String.format("%s/%s", parent.getName(), dataPath.getName());
         }
-
-        return parent.getName() + "/" + dataPath.getName();
+      } else {
+        resolvedContext = dataPath.getName();
       }
 
-      return dataPath.getName();
-    }
-
-    private Path lazyDataPath() {
-      if (dataPath == null) {
-        this.dataPath = new Path(dataLocation);
-      }
-      return dataPath;
+      Preconditions.checkState(
+          !resolvedContext.endsWith("/"),
+          "Path context must not end with a slash.");
+      return resolvedContext;
     }
 
     private class SparkAppenderFactory implements AppenderFactory<InternalRow> {
@@ -314,16 +299,20 @@ private Path lazyDataPath() {
   }
 
   private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
-    private final Path file;
-    private final Configuration conf;
+    private final FileIO fileIo;
+    private final String file;
     private FileAppender<InternalRow> appender = null;
     private Metrics metrics = null;
 
-    UnpartitionedWriter(Path dataPath, String filename, FileFormat format,
-                        Configuration conf, AppenderFactory<InternalRow> factory) {
-      this.file = new Path(dataPath, filename);
-      this.appender = factory.newAppender(HadoopOutputFile.fromPath(file, conf), format);
-      this.conf = conf;
+    UnpartitionedWriter(
+        String dataPath,
+        String filename,
+        FileFormat format,
+        AppenderFactory<InternalRow> factory,
+        FileIO fileIo) {
+      this.file = String.format("%s/%s", dataPath, filename);
+      this.fileIo = fileIo;
+      this.appender = factory.newAppender(fileIo.newOutputFile(file), format);
     }
 
     @Override
@@ -338,12 +327,11 @@ public WriterCommitMessage commit() throws IOException {
       close();
 
       if (metrics.recordCount() == 0L) {
-        FileSystem fs = file.getFileSystem(conf);
-        fs.delete(file, false);
+        fileIo.deleteFile(file);
         return new TaskCommit();
       }
 
-      InputFile inFile = HadoopInputFile.fromPath(file, conf);
+      InputFile inFile = fileIo.newInputFile(file);
       DataFile dataFile = DataFiles.fromInputFile(inFile, null, metrics);
 
       return new TaskCommit(dataFile);
@@ -354,9 +342,7 @@ public void abort() throws IOException {
       Preconditions.checkArgument(appender != null, "Abort called on a closed writer: %s", this);
 
       close();
-
-      FileSystem fs = file.getFileSystem(conf);
-      fs.delete(file, false);
+      fileIo.deleteFile(file);
     }
 
     @Override
@@ -374,24 +360,27 @@ public void close() throws IOException {
     private final List<DataFile> completedFiles = Lists.newArrayList();
     private final PartitionSpec spec;
     private final FileFormat format;
-    private final Configuration conf;
     private final AppenderFactory<InternalRow> factory;
-    private final Function<PartitionKey, Path> outputPathFunc;
+    private final Function<PartitionKey, String> outputPathFunc;
     private final PartitionKey key;
+    private final FileIO fileIo;
 
     private PartitionKey currentKey = null;
     private FileAppender<InternalRow> currentAppender = null;
-    private Path currentPath = null;
-
-    PartitionedWriter(PartitionSpec spec, FileFormat format, Configuration conf,
-                      AppenderFactory<InternalRow> factory,
-                      Function<PartitionKey, Path> outputPathFunc) {
+    private String currentPath = null;
+
+    PartitionedWriter(
+        PartitionSpec spec,
+        FileFormat format,
+        AppenderFactory<InternalRow> factory,
+        Function<PartitionKey, String> outputPathFunc,
+        FileIO fileIo) {
       this.spec = spec;
       this.format = format;
-      this.conf = conf;
       this.factory = factory;
       this.outputPathFunc = outputPathFunc;
       this.key = new PartitionKey(spec);
+      this.fileIo = fileIo;
     }
 
     @Override
@@ -410,7 +399,7 @@ public void write(InternalRow row) throws IOException {
 
         this.currentKey = key.copy();
         this.currentPath = outputPathFunc.apply(currentKey);
-        OutputFile file = HadoopOutputFile.fromPath(currentPath, conf);
+        OutputFile file = fileIo.newOutputFile(currentPath.toString());
         this.currentAppender = factory.newAppender(file, format);
       }
 
@@ -425,18 +414,16 @@ public WriterCommitMessage commit() throws IOException {
 
     @Override
     public void abort() throws IOException {
-      FileSystem fs = currentPath.getFileSystem(conf);
-
       // clean up files created by this writer
       Tasks.foreach(completedFiles)
           .throwFailureWhenFinished()
           .noRetry()
-          .run(file -> fs.delete(new Path(file.path().toString())), IOException.class);
+          .run(file -> fileIo.deleteFile(file.path().toString()));
 
       if (currentAppender != null) {
         currentAppender.close();
         this.currentAppender = null;
-        fs.delete(currentPath);
+        fileIo.deleteFile(currentPath);
       }
     }
 
@@ -447,7 +434,7 @@ private void closeCurrent() throws IOException {
         Metrics metrics = currentAppender.metrics();
         this.currentAppender = null;
 
-        InputFile inFile = HadoopInputFile.fromPath(currentPath, conf);
+        InputFile inFile = fileIo.newInputFile(currentPath);
         DataFile dataFile = DataFiles.builder(spec)
             .withInputFile(inFile)
             .withPartition(currentKey)
@@ -459,4 +446,26 @@ private void closeCurrent() throws IOException {
       }
     }
   }
+
+  /**
+   * Removes all trailing slashes from a location so it can be joined with {@code "%s/%s"}.
+   *
+   * <p>A {@code null} input is returned as-is, which lets callers report a missing
+   * property with their own precondition message instead of failing here.
+   *
+   * @param path a location string, or null
+   * @return the location without trailing {@code '/'} characters, or null if path was null
+   */
+  private static String stripTrailingSlash(String path) {
+    if (path == null) {
+      return null;
+    }
+
+    // scan back from the end so every trailing slash is dropped, however many there are
+    int end = path.length();
+    while (end > 0 && path.charAt(end - 1) == '/') {
+      end -= 1;
+    }
+    return path.substring(0, end);
+  }
 }
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
index 90b6dc8..c18636f 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
@@ -21,7 +21,7 @@
 
 import com.google.common.collect.Maps;
 import com.netflix.iceberg.BaseTable;
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.Files;
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
@@ -34,7 +34,6 @@
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.File;
-import java.io.IOException;
 import java.util.Map;
 
 // TODO: Use the copy of this from core.
@@ -73,7 +72,8 @@ private TestTable(TestTableOperations ops, String name) {
       this.ops = ops;
     }
 
-    TestTableOperations ops() {
+    @Override
+    public TestTableOperations operations() {
       return ops;
     }
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org