Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/01/09 00:55:58 UTC
[incubator-iceberg] branch master updated: Use the FileIO submodule in Spark writers and readers. (#52)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 67fbe5d Use the FileIO submodule in Spark writers and readers. (#52)
67fbe5d is described below
commit 67fbe5da00c1e3da5893c462b4b40e274b4b1fba
Author: mccheah <mc...@palantir.com>
AuthorDate: Tue Jan 8 16:55:54 2019 -0800
Use the FileIO submodule in Spark writers and readers. (#52)
---
api/src/main/java/com/netflix/iceberg/Table.java | 7 +
.../main/java/com/netflix/iceberg/io}/FileIO.java | 2 +-
.../iceberg/BaseMetastoreTableOperations.java | 1 +
.../main/java/com/netflix/iceberg/BaseTable.java | 6 +
.../java/com/netflix/iceberg/BaseTransaction.java | 6 +
.../java/com/netflix/iceberg/TableOperations.java | 5 +-
.../com/netflix/iceberg/hadoop/HadoopFileIO.java | 4 +-
.../iceberg/hadoop/HadoopTableOperations.java | 2 +-
.../com/netflix/iceberg/LocalTableOperations.java | 1 +
.../test/java/com/netflix/iceberg/TestTables.java | 1 +
.../iceberg/spark/source/IcebergSource.java | 5 +-
.../com/netflix/iceberg/spark/source/Reader.java | 41 +++---
.../com/netflix/iceberg/spark/source/Writer.java | 153 ++++++++++-----------
.../netflix/iceberg/spark/source/TestTables.java | 6 +-
14 files changed, 125 insertions(+), 115 deletions(-)
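This change replaces direct Hadoop FileSystem/Configuration use in the Spark
reader and writer with the table's FileIO. For orientation, here is a minimal
sketch of the contract as exercised by this diff; the method names are taken
from the calls below, and extending Serializable is an assumption based on
Reader shipping a FileIO inside its serialized read tasks:

    package com.netflix.iceberg.io;

    import java.io.Serializable;

    public interface FileIO extends Serializable {
      // open an existing table data or metadata file for reading
      InputFile newInputFile(String path);

      // create a new file at the given location for writing
      OutputFile newOutputFile(String path);

      // delete a file, used below to clean up aborted writes
      void deleteFile(String path);
    }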
diff --git a/api/src/main/java/com/netflix/iceberg/Table.java b/api/src/main/java/com/netflix/iceberg/Table.java
index fe19fa2..9acb50d 100644
--- a/api/src/main/java/com/netflix/iceberg/Table.java
+++ b/api/src/main/java/com/netflix/iceberg/Table.java
@@ -19,6 +19,7 @@
package com.netflix.iceberg;
+import com.netflix.iceberg.io.FileIO;
import java.util.Map;
/**
@@ -171,4 +172,10 @@ public interface Table {
* @return a new {@link Transaction}
*/
Transaction newTransaction();
+
+ /**
+ * @return a {@link FileIO} to read and write table data and metadata files
+ */
+ FileIO io();
+
}
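With io() exposed on Table, engine code can reach table storage without
building Hadoop paths itself. A hypothetical caller (the file names and the
s3 scheme are invented for illustration):

    FileIO io = table.io();
    InputFile in = io.newInputFile("s3://bucket/db/t/data/00000-1-old.parquet");
    OutputFile out = io.newOutputFile("s3://bucket/db/t/data/00001-1-new.parquet");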
diff --git a/core/src/main/java/com/netflix/iceberg/FileIO.java b/api/src/main/java/com/netflix/iceberg/io/FileIO.java
similarity index 98%
rename from core/src/main/java/com/netflix/iceberg/FileIO.java
rename to api/src/main/java/com/netflix/iceberg/io/FileIO.java
index fdba7af..ed859b9 100644
--- a/core/src/main/java/com/netflix/iceberg/FileIO.java
+++ b/api/src/main/java/com/netflix/iceberg/io/FileIO.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package com.netflix.iceberg;
+package com.netflix.iceberg.io;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
index b107d0b..3e9b420 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
@@ -22,6 +22,7 @@ package com.netflix.iceberg;
import com.google.common.base.Objects;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.hadoop.HadoopFileIO;
+import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.util.Tasks;
import org.apache.hadoop.conf.Configuration;
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTable.java b/core/src/main/java/com/netflix/iceberg/BaseTable.java
index da11b55..7d48ef2 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTable.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTable.java
@@ -19,6 +19,7 @@
package com.netflix.iceberg;
+import com.netflix.iceberg.io.FileIO;
import java.util.Map;
/**
@@ -136,6 +137,11 @@ public class BaseTable implements Table, HasTableOperations {
}
@Override
+ public FileIO io() {
+ return operations().io();
+ }
+
+ @Override
public String toString() {
return name;
}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
index 1a56b7e..b7c3a32 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.util.Tasks;
import java.util.List;
import java.util.Map;
@@ -365,5 +366,10 @@ class BaseTransaction implements Transaction {
public Transaction newTransaction() {
throw new UnsupportedOperationException("Cannot create a transaction within a transaction");
}
+
+ @Override
+ public FileIO io() {
+ return transactionOps.io();
+ }
}
}
diff --git a/core/src/main/java/com/netflix/iceberg/TableOperations.java b/core/src/main/java/com/netflix/iceberg/TableOperations.java
index 19fc386..974d5e2 100644
--- a/core/src/main/java/com/netflix/iceberg/TableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/TableOperations.java
@@ -19,10 +19,9 @@
package com.netflix.iceberg;
+import com.netflix.iceberg.io.FileIO;
import java.util.UUID;
-import com.netflix.iceberg.io.OutputFile;
-
/**
* SPI interface to abstract table metadata access and updates.
*/
@@ -57,7 +56,7 @@ public interface TableOperations {
void commit(TableMetadata base, TableMetadata metadata);
/**
- * @return a {@link com.netflix.iceberg.FileIO} to read and write table data and metadata files
+ * @return a {@link FileIO} to read and write table data and metadata files
*/
FileIO io();
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
index 586942c..7e1d004 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
@@ -1,6 +1,6 @@
package com.netflix.iceberg.hadoop;
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
@@ -9,8 +9,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
public class HadoopFileIO implements FileIO {
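A Hadoop-backed implementation of the relocated interface can delegate to the
same helpers this diff removes from Reader and Writer. A sketch under those
assumptions, not the committed HadoopFileIO; serialization of the
Configuration is elided, and a real implementation must handle it because
Hadoop's Configuration is not Serializable:

    import com.netflix.iceberg.exceptions.RuntimeIOException;
    import com.netflix.iceberg.hadoop.HadoopInputFile;
    import com.netflix.iceberg.hadoop.HadoopOutputFile;
    import com.netflix.iceberg.io.FileIO;
    import com.netflix.iceberg.io.InputFile;
    import com.netflix.iceberg.io.OutputFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;

    public class HadoopFileIOSketch implements FileIO {
      private final Configuration conf;

      HadoopFileIOSketch(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public InputFile newInputFile(String path) {
        return HadoopInputFile.fromLocation(path, conf);
      }

      @Override
      public OutputFile newOutputFile(String path) {
        return HadoopOutputFile.fromPath(new Path(path), conf);
      }

      @Override
      public void deleteFile(String path) {
        Path toDelete = new Path(path);
        try {
          toDelete.getFileSystem(conf).delete(toDelete, false /* not recursive */);
        } catch (IOException e) {
          throw new RuntimeIOException(e);
        }
      }
    }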
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
index 4aa19f4..1a3b0fd 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
@@ -19,7 +19,7 @@
package com.netflix.iceberg.hadoop;
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.TableMetadata;
import com.netflix.iceberg.TableMetadataParser;
import com.netflix.iceberg.TableOperations;
diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
index 1508ee8..baa286f 100644
--- a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
+++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
@@ -21,6 +21,7 @@ package com.netflix.iceberg;
import com.google.common.collect.Maps;
import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileIO;
import java.util.Map;
import org.junit.rules.TemporaryFolder;
diff --git a/core/src/test/java/com/netflix/iceberg/TestTables.java b/core/src/test/java/com/netflix/iceberg/TestTables.java
index f1dbe4a..fbb58d4 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTables.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTables.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import com.netflix.iceberg.exceptions.AlreadyExistsException;
import com.netflix.iceberg.exceptions.CommitFailedException;
import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import java.io.File;
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index cd1a0af..1991d29 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -59,8 +59,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
public DataSourceReader createReader(DataSourceOptions options) {
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
-
- return new Reader(table, conf);
+ return new Reader(table);
}
@Override
@@ -92,7 +91,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
.toUpperCase(Locale.ENGLISH));
}
- return Optional.of(new Writer(table, conf, format));
+ return Optional.of(new Writer(table, format));
}
protected Table findTable(DataSourceOptions options, Configuration conf) {
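Reader and Writer now take their FileIO from the table, so the Configuration
resolved here is only used to locate and load the table itself. End-to-end
usage is unchanged; a hypothetical round trip through this source, given an
active SparkSession named spark (paths invented, and the fully qualified
class name is used since no short-name registration appears in this diff):

    Dataset<Row> df = spark.read()
        .format("com.netflix.iceberg.spark.source.IcebergSource")
        .load("hdfs://namenode:8020/warehouse/db/t");

    df.write()
        .format("com.netflix.iceberg.spark.source.IcebergSource")
        .mode("append")
        .save("hdfs://namenode:8020/warehouse/db/t");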
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
index 4a008ee..33b95c1 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
@@ -22,6 +22,7 @@ package com.netflix.iceberg.spark.source;
import com.google.common.collect.Lists;
import com.netflix.iceberg.CombinedScanTask;
import com.netflix.iceberg.DataFile;
+import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.FileScanTask;
import com.netflix.iceberg.PartitionField;
import com.netflix.iceberg.PartitionSpec;
@@ -34,7 +35,6 @@ import com.netflix.iceberg.avro.Avro;
import com.netflix.iceberg.common.DynMethods;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.expressions.Expression;
-import com.netflix.iceberg.hadoop.HadoopInputFile;
import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.parquet.Parquet;
@@ -44,7 +44,6 @@ import com.netflix.iceberg.spark.data.SparkAvroReader;
import com.netflix.iceberg.spark.data.SparkParquetReaders;
import com.netflix.iceberg.types.TypeUtil;
import com.netflix.iceberg.types.Types;
-import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -67,7 +66,6 @@ import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
-import org.apache.spark.util.SerializableConfiguration;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
@@ -89,7 +87,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private static final Filter[] NO_FILTERS = new Filter[0];
private final Table table;
- private final SerializableConfiguration conf;
+ private final FileIO fileIo;
private StructType requestedSchema = null;
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
@@ -99,10 +97,10 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private StructType type = null; // cached because Spark accesses it multiple times
private List<CombinedScanTask> tasks = null; // lazy cache of tasks
- Reader(Table table, Configuration conf) {
+ Reader(Table table) {
this.table = table;
- this.conf = new SerializableConfiguration(conf);
this.schema = table.schema();
+ this.fileIo = table.io();
}
private Schema lazySchema() {
@@ -135,7 +133,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
- readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, conf));
+ readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo));
}
return readTasks;
@@ -228,22 +226,22 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private final CombinedScanTask task;
private final String tableSchemaString;
private final String expectedSchemaString;
- private final SerializableConfiguration conf;
+ private final FileIO fileIo;
private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
- private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
- SerializableConfiguration conf) {
+ private ReadTask(
+ CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo) {
this.task = task;
this.tableSchemaString = tableSchemaString;
this.expectedSchemaString = expectedSchemaString;
- this.conf = conf;
+ this.fileIo = fileIo;
}
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
- return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), conf.value());
+ return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo);
}
private Schema lazyTableSchema() {
@@ -270,18 +268,18 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private final Iterator<FileScanTask> tasks;
private final Schema tableSchema;
private final Schema expectedSchema;
- private final Configuration conf;
+ private final FileIO fileIo;
private Iterator<InternalRow> currentIterator = null;
private Closeable currentCloseable = null;
private InternalRow current = null;
- public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, Configuration conf) {
+ public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo) {
+ this.fileIo = fileIo;
this.tasks = task.files().iterator();
this.tableSchema = tableSchema;
this.expectedSchema = expectedSchema;
- this.conf = conf;
- // open last because the schemas and conf must be set
+ // open last because the schemas and fileIo must be set
this.currentIterator = open(tasks.next());
}
@@ -346,17 +344,17 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
// create joined rows and project from the joined schema to the final schema
iterSchema = TypeUtil.join(readSchema, partitionSchema);
- iter = transform(open(task, readSchema, conf), joined::withLeft);
+ iter = transform(open(task, readSchema), joined::withLeft);
} else if (hasExtraFilterColumns) {
// add projection to the final schema
iterSchema = requiredSchema;
- iter = open(task, requiredSchema, conf);
+ iter = open(task, requiredSchema);
} else {
// return the base iterator
iterSchema = finalSchema;
- iter = open(task, finalSchema, conf);
+ iter = open(task, finalSchema);
}
// TODO: remove the projection by reporting the iterator's schema back to Spark
@@ -386,9 +384,8 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
asScalaBufferConverter(attrs).asScala().toSeq());
}
- private Iterator<InternalRow> open(FileScanTask task, Schema readSchema,
- Configuration conf) {
- InputFile location = HadoopInputFile.fromLocation(task.file().path(), conf);
+ private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
+ InputFile location = fileIo.newInputFile(task.file().path().toString());
CloseableIterable<InternalRow> iter;
switch (task.file().format()) {
case PARQUET:
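The Reader changes hinge on one property: a FileIO instance must survive Java
serialization, since ReadTask now ships one to executors in place of the old
SerializableConfiguration. A hypothetical helper to verify that property for
any FileIO implementation:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    static FileIO roundTrip(FileIO io) throws IOException, ClassNotFoundException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(io);
      }
      try (ObjectInputStream in = new ObjectInputStream(
          new ByteArrayInputStream(bytes.toByteArray()))) {
        return (FileIO) in.readObject();
      }
    }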
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
index c9d3a7b..902ba80 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
@@ -28,6 +28,7 @@ import com.netflix.iceberg.AppendFiles;
import com.netflix.iceberg.DataFile;
import com.netflix.iceberg.DataFiles;
import com.netflix.iceberg.FileFormat;
+import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.Metrics;
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.Schema;
@@ -35,8 +36,6 @@ import com.netflix.iceberg.Table;
import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.avro.Avro;
import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.hadoop.HadoopInputFile;
-import com.netflix.iceberg.hadoop.HadoopOutputFile;
import com.netflix.iceberg.io.FileAppender;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
@@ -46,8 +45,6 @@ import com.netflix.iceberg.transforms.Transform;
import com.netflix.iceberg.transforms.Transforms;
import com.netflix.iceberg.types.Types.StringType;
import com.netflix.iceberg.util.Tasks;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -55,7 +52,6 @@ import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
@@ -89,18 +85,18 @@ class Writer implements DataSourceWriter {
private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
private final Table table;
- private final Configuration conf;
private final FileFormat format;
+ private final FileIO fileIo;
- Writer(Table table, Configuration conf, FileFormat format) {
+ Writer(Table table, FileFormat format) {
this.table = table;
- this.conf = conf;
this.format = format;
+ this.fileIo = table.io();
}
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
- return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), conf);
+ return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), fileIo);
}
@Override
@@ -122,13 +118,6 @@ class Writer implements DataSourceWriter {
@Override
public void abort(WriterCommitMessage[] messages) {
- FileSystem fs;
- try {
- fs = new Path(table.location()).getFileSystem(conf);
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
-
Tasks.foreach(files(messages))
.retry(propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
.exponentialBackoff(
@@ -138,11 +127,7 @@ class Writer implements DataSourceWriter {
2.0 /* exponential */ )
.throwFailureWhenFinished()
.run(file -> {
- try {
- fs.delete(new Path(file.path().toString()), false /* not recursive */ );
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
+ fileIo.deleteFile(file.path().toString());
});
}
@@ -165,9 +150,10 @@ class Writer implements DataSourceWriter {
}
private String dataLocation() {
- return table.properties().getOrDefault(
- TableProperties.WRITE_NEW_DATA_LOCATION,
- new Path(new Path(table.location()), "data").toString());
+ return stripTrailingSlash(
+ table.properties().getOrDefault(
+ TableProperties.WRITE_NEW_DATA_LOCATION,
+ String.format("%s/data", table.location())));
}
@Override
@@ -202,18 +188,16 @@ class Writer implements DataSourceWriter {
private final FileFormat format;
private final String dataLocation;
private final Map<String, String> properties;
- private final SerializableConfiguration conf;
private final String uuid = UUID.randomUUID().toString();
-
- private transient Path dataPath = null;
+ private final FileIO fileIo;
WriterFactory(PartitionSpec spec, FileFormat format, String dataLocation,
- Map<String, String> properties, Configuration conf) {
+ Map<String, String> properties, FileIO fileIo) {
this.spec = spec;
this.format = format;
this.dataLocation = dataLocation;
this.properties = properties;
- this.conf = new SerializableConfiguration(conf);
+ this.fileIo = fileIo;
}
@Override
@@ -221,12 +205,10 @@ class Writer implements DataSourceWriter {
String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
if (spec.fields().isEmpty()) {
- return new UnpartitionedWriter(lazyDataPath(), filename, format, conf.value(), factory);
-
+ return new UnpartitionedWriter(dataLocation, filename, format, factory, fileIo);
} else {
- Path baseDataPath = lazyDataPath(); // avoid calling this in the output path function
- Function<PartitionKey, Path> outputPathFunc = key ->
- new Path(new Path(baseDataPath, key.toPath()), filename);
+ Function<PartitionKey, String> outputPathFunc = key ->
+ String.format("%s/%s/%s", dataLocation, key.toPath(), filename);
boolean useObjectStorage = (
Boolean.parseBoolean(properties.get(OBJECT_STORE_ENABLED)) ||
@@ -235,43 +217,46 @@ class Writer implements DataSourceWriter {
if (useObjectStorage) {
// try to get db and table portions of the path for context in the object store
- String context = pathContext(baseDataPath);
- String objectStore = properties.get(OBJECT_STORE_PATH);
+ String context = pathContext(new Path(dataLocation));
+ String objectStore = stripTrailingSlash(properties.get(OBJECT_STORE_PATH));
Preconditions.checkNotNull(objectStore,
"Cannot use object storage, missing location: " + OBJECT_STORE_PATH);
- Path objectStorePath = new Path(objectStore);
outputPathFunc = key -> {
- String partitionAndFilename = key.toPath() + "/" + filename;
+ String partitionAndFilename = String.format("%s/%s", key.toPath(), filename);
int hash = HASH_FUNC.apply(partitionAndFilename);
- return new Path(objectStorePath,
- String.format("%08x/%s/%s", hash, context, partitionAndFilename));
+ return String.format(
+ "%s/%08x/%s/%s/%s",
+ objectStore,
+ hash,
+ context,
+ key.toPath(),
+ filename);
};
}
- return new PartitionedWriter(spec, format, conf.value(), factory, outputPathFunc);
+ return new PartitionedWriter(spec, format, factory, outputPathFunc, fileIo);
}
}
private static String pathContext(Path dataPath) {
Path parent = dataPath.getParent();
+ String resolvedContext;
if (parent != null) {
// remove the data folder
if (dataPath.getName().equals("data")) {
- return pathContext(parent);
+ resolvedContext = pathContext(parent);
+ } else {
+ resolvedContext = String.format("%s/%s", parent.getName(), dataPath.getName());
}
-
- return parent.getName() + "/" + dataPath.getName();
+ } else {
+ resolvedContext = dataPath.getName();
}
- return dataPath.getName();
- }
-
- private Path lazyDataPath() {
- if (dataPath == null) {
- this.dataPath = new Path(dataLocation);
- }
- return dataPath;
+ Preconditions.checkState(
+ !resolvedContext.endsWith("/"),
+ "Path context must not end with a slash.");
+ return resolvedContext;
}
private class SparkAppenderFactory implements AppenderFactory<InternalRow> {
@@ -314,16 +299,20 @@ class Writer implements DataSourceWriter {
}
private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
- private final Path file;
- private final Configuration conf;
+ private final FileIO fileIo;
+ private final String file;
private FileAppender<InternalRow> appender = null;
private Metrics metrics = null;
- UnpartitionedWriter(Path dataPath, String filename, FileFormat format,
- Configuration conf, AppenderFactory<InternalRow> factory) {
- this.file = new Path(dataPath, filename);
- this.appender = factory.newAppender(HadoopOutputFile.fromPath(file, conf), format);
- this.conf = conf;
+ UnpartitionedWriter(
+ String dataPath,
+ String filename,
+ FileFormat format,
+ AppenderFactory<InternalRow> factory,
+ FileIO fileIo) {
+ this.file = String.format("%s/%s", dataPath, filename);
+ this.fileIo = fileIo;
+ this.appender = factory.newAppender(fileIo.newOutputFile(file), format);
}
@Override
@@ -338,12 +327,11 @@ class Writer implements DataSourceWriter {
close();
if (metrics.recordCount() == 0L) {
- FileSystem fs = file.getFileSystem(conf);
- fs.delete(file, false);
+ fileIo.deleteFile(file);
return new TaskCommit();
}
- InputFile inFile = HadoopInputFile.fromPath(file, conf);
+ InputFile inFile = fileIo.newInputFile(file);
DataFile dataFile = DataFiles.fromInputFile(inFile, null, metrics);
return new TaskCommit(dataFile);
@@ -354,9 +342,7 @@ class Writer implements DataSourceWriter {
Preconditions.checkArgument(appender != null, "Abort called on a closed writer: %s", this);
close();
-
- FileSystem fs = file.getFileSystem(conf);
- fs.delete(file, false);
+ fileIo.deleteFile(file);
}
@Override
@@ -374,24 +360,27 @@ class Writer implements DataSourceWriter {
private final List<DataFile> completedFiles = Lists.newArrayList();
private final PartitionSpec spec;
private final FileFormat format;
- private final Configuration conf;
private final AppenderFactory<InternalRow> factory;
- private final Function<PartitionKey, Path> outputPathFunc;
+ private final Function<PartitionKey, String> outputPathFunc;
private final PartitionKey key;
+ private final FileIO fileIo;
private PartitionKey currentKey = null;
private FileAppender<InternalRow> currentAppender = null;
- private Path currentPath = null;
-
- PartitionedWriter(PartitionSpec spec, FileFormat format, Configuration conf,
- AppenderFactory<InternalRow> factory,
- Function<PartitionKey, Path> outputPathFunc) {
+ private String currentPath = null;
+
+ PartitionedWriter(
+ PartitionSpec spec,
+ FileFormat format,
+ AppenderFactory<InternalRow> factory,
+ Function<PartitionKey, String> outputPathFunc,
+ FileIO fileIo) {
this.spec = spec;
this.format = format;
- this.conf = conf;
this.factory = factory;
this.outputPathFunc = outputPathFunc;
this.key = new PartitionKey(spec);
+ this.fileIo = fileIo;
}
@Override
@@ -410,7 +399,7 @@ class Writer implements DataSourceWriter {
this.currentKey = key.copy();
this.currentPath = outputPathFunc.apply(currentKey);
- OutputFile file = HadoopOutputFile.fromPath(currentPath, conf);
+ OutputFile file = fileIo.newOutputFile(currentPath);
this.currentAppender = factory.newAppender(file, format);
}
@@ -425,18 +414,16 @@ class Writer implements DataSourceWriter {
@Override
public void abort() throws IOException {
- FileSystem fs = currentPath.getFileSystem(conf);
-
// clean up files created by this writer
Tasks.foreach(completedFiles)
.throwFailureWhenFinished()
.noRetry()
- .run(file -> fs.delete(new Path(file.path().toString())), IOException.class);
+ .run(file -> fileIo.deleteFile(file.path().toString()));
if (currentAppender != null) {
currentAppender.close();
this.currentAppender = null;
- fs.delete(currentPath);
+ fileIo.deleteFile(currentPath);
}
}
@@ -447,7 +434,7 @@ class Writer implements DataSourceWriter {
Metrics metrics = currentAppender.metrics();
this.currentAppender = null;
- InputFile inFile = HadoopInputFile.fromPath(currentPath, conf);
+ InputFile inFile = fileIo.newInputFile(currentPath);
DataFile dataFile = DataFiles.builder(spec)
.withInputFile(inFile)
.withPartition(currentKey)
@@ -459,4 +446,12 @@ class Writer implements DataSourceWriter {
}
}
}
+
+ private static String stripTrailingSlash(String path) {
+ if (path == null) {
+ // OBJECT_STORE_PATH may be unset; keep null so checkNotNull reports it
+ return null;
+ }
+ String result = path;
+ while (result.endsWith("/")) {
+ result = result.substring(0, result.length() - 1);
+ }
+ return result;
+ }
}
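stripTrailingSlash normalizes the configured locations before this class
joins path segments with "/". Expected behavior, shown as comments:

    stripTrailingSlash("s3://bucket/db/t/data")   // "s3://bucket/db/t/data"
    stripTrailingSlash("s3://bucket/db/t/data//") // "s3://bucket/db/t/data"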
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
index 90b6dc8..c18636f 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
@@ -21,7 +21,7 @@ package com.netflix.iceberg.spark.source;
import com.google.common.collect.Maps;
import com.netflix.iceberg.BaseTable;
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.Files;
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.Schema;
@@ -34,7 +34,6 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import java.io.File;
-import java.io.IOException;
import java.util.Map;
// TODO: Use the copy of this from core.
@@ -73,7 +72,8 @@ class TestTables {
this.ops = ops;
}
- TestTableOperations ops() {
+ @Override
+ public TestTableOperations operations() {
return ops;
}
}
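Since BaseTable implements HasTableOperations (see the BaseTable hunk above),
the covariant operations() override lets tests recover the concrete ops
without a cast, e.g. for a hypothetical TestTable instance t:

    TestTableOperations ops = t.operations(); // no cast needed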