Posted to dev@iceberg.apache.org by GitBox <gi...@apache.org> on 2018/12/11 20:37:01 UTC

[GitHub] rdblue closed pull request #6: Support customizing the location where data is written in Spark

URL: https://github.com/apache/incubator-iceberg/pull/6

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index e522f84..0d99c7e 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -67,6 +67,11 @@
 
   public static final String OBJECT_STORE_PATH = "write.object-storage.path";
 
+  // This only applies to files written after this property is set. Files previously written aren't relocated to
+  // reflect this parameter.
+  // If not set, defaults to a "data" folder underneath the root path of the table.
+  public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";
+
   public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
   public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
 }
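
For context, a minimal sketch of how a table owner could set the new
property; it assumes a HadoopTables-backed table, and the paths are
hypothetical:

    // A minimal sketch, assuming a HadoopTables-backed table; paths are hypothetical.
    import com.netflix.iceberg.Table;
    import com.netflix.iceberg.TableProperties;
    import com.netflix.iceberg.hadoop.HadoopTables;
    import org.apache.hadoop.conf.Configuration;

    public class SetDataLocation {
      public static void main(String[] args) {
        Table table = new HadoopTables(new Configuration()).load("/warehouse/db/events");
        // Only files written after this commit land in the new folder;
        // files written earlier are not relocated.
        table.updateProperties()
            .set(TableProperties.WRITE_NEW_DATA_LOCATION, "/warehouse/custom/events-data")
            .commit();
      }
    }
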
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
index bed2cf6..c9d3a7b 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
@@ -32,6 +32,7 @@
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
 import com.netflix.iceberg.Table;
+import com.netflix.iceberg.TableProperties;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.hadoop.HadoopInputFile;
@@ -164,7 +165,9 @@ private int propertyAsInt(String property, int defaultValue) {
   }
 
   private String dataLocation() {
-    return new Path(new Path(table.location()), "data").toString();
+    return table.properties().getOrDefault(
+        TableProperties.WRITE_NEW_DATA_LOCATION,
+        new Path(new Path(table.location()), "data").toString());
   }
 
   @Override
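
The change above makes the write location resolve in two steps: use the
table property when it is set, otherwise fall back to a "data" folder
under the table root. A standalone sketch of the same fallback, with
illustrative names (not part of the PR):

    // Illustrative helper mirroring Writer.dataLocation(); names are made up.
    import java.util.Map;

    public class DataLocations {
      static String dataLocation(Map<String, String> props, String tableLocation) {
        // The property wins when present; otherwise default to "<table location>/data".
        return props.getOrDefault("write.folder-storage.path", tableLocation + "/data");
      }

      public static void main(String[] args) {
        // No property set: prints /warehouse/db/events/data
        System.out.println(dataLocation(Map.of(), "/warehouse/db/events"));
        // Property set: prints /warehouse/custom/events-data
        System.out.println(dataLocation(
            Map.of("write.folder-storage.path", "/warehouse/custom/events-data"),
            "/warehouse/db/events"));
      }
    }
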
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java
index f84c6fe..bc74908 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java
@@ -38,7 +38,7 @@
 
   protected abstract void writeAndValidate(Schema schema) throws IOException;
 
-  private static final StructType SUPPORTED_PRIMITIVES = StructType.of(
+  protected static final StructType SUPPORTED_PRIMITIVES = StructType.of(
       required(100, "id", LongType.get()),
       optional(101, "data", Types.StringType.get()),
       required(102, "b", Types.BooleanType.get()),
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
index 3b0d32b..05f8f80 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
@@ -32,10 +32,12 @@
 import com.netflix.iceberg.spark.data.AvroDataTest;
 import com.netflix.iceberg.spark.data.RandomData;
 import com.netflix.iceberg.spark.data.SparkAvroReader;
+import com.netflix.iceberg.types.Types;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -43,10 +45,12 @@
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 
 import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
@@ -57,7 +61,7 @@
 public class TestDataFrameWrites extends AvroDataTest {
   private static final Configuration CONF = new Configuration();
 
-  private String format = null;
+  private final String format;
 
   @Parameterized.Parameters
   public static Object[][] parameters() {
@@ -90,23 +94,43 @@ public static void stopSpark() {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
+    File location = createTableFolder();
+    Table table = createTable(schema, location);
+    writeAndValidateWithLocations(table, location, new File(location, "data"));
+  }
+
+  @Test
+  public void testWriteWithCustomDataLocation() throws IOException {
+    File location = createTableFolder();
+    File tablePropertyDataLocation = temp.newFolder("test-table-property-data-dir");
+    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location);
+    table.updateProperties().set(
+        TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit();
+    writeAndValidateWithLocations(table, location, tablePropertyDataLocation);
+  }
+
+  private File createTableFolder() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
     Assert.assertTrue("Mkdir should succeed", location.mkdirs());
+    return location;
+  }
 
+  private Table createTable(Schema schema, File location) {
     HadoopTables tables = new HadoopTables(CONF);
-    Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+    return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+  }
+
+  private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir) throws IOException {
     Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
 
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
     List<Record> expected = RandomData.generateList(tableSchema, 100, 0L);
     Dataset<Row> df = createDataset(expected, tableSchema);
+    DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");
 
-    df.write()
-        .format("iceberg")
-        .mode("append")
-        .save(location.toString());
+    writer.save(location.toString());
 
     table.refresh();
 
@@ -120,6 +144,14 @@ protected void writeAndValidate(Schema schema) throws IOException {
     for (int i = 0; i < expected.size(); i += 1) {
       assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i));
     }
+
+    table.currentSnapshot().addedFiles().forEach(dataFile ->
+        Assert.assertTrue(
+            String.format(
+                "File should have the parent directory %s, but has: %s.",
+                expectedDataDir.getAbsolutePath(),
+                dataFile.path()),
+            URI.create(dataFile.path().toString()).getPath().startsWith(expectedDataDir.getAbsolutePath())));
   }
 
   private Dataset<Row> createDataset(List<Record> records, Schema schema) throws IOException {
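
Taken together, the new test boils down to the flow below. This is a
condensed sketch, assuming an existing SparkSession `spark`, a `table`
whose WRITE_NEW_DATA_LOCATION has already been committed, and placeholder
`rows`, `schema`, `location`, and `customDataDir` values:

    // Condensed sketch of the new test's flow; all names here are assumed
    // to exist as in the test above, not defined by this snippet.
    Dataset<Row> df = spark.createDataFrame(rows, schema); // placeholder input data
    df.write()
        .format("iceberg")
        .mode("append")
        .save(location.toString());

    table.refresh();
    table.currentSnapshot().addedFiles().forEach(file -> {
      // Every file added by this snapshot should live under the custom data dir.
      assert URI.create(file.path().toString()).getPath()
          .startsWith(customDataDir.getAbsolutePath());
    });
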
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java
index 4f71ead..a2d105d 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java
@@ -71,7 +71,6 @@ public static void stopSpark() {
   public void testBasicWrite() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
-    location.mkdirs();
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services