Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/08 16:19:23 UTC

[incubator-iceberg] branch master updated: Fix binary partition values in Spark. (#146)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 664c8e9  Fix binary partition values in Spark. (#146)
664c8e9 is described below

commit 664c8e93b3ae2383c970c9968f446dd94e391cce
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Mon Apr 8 09:19:18 2019 -0700

    Fix binary partition values in Spark. (#146)
---
 .../org/apache/iceberg/MergingSnapshotUpdate.java  |   2 +-
 .../java/org/apache/iceberg/PartitionData.java     |  13 +++
 .../apache/iceberg/spark/source/PartitionKey.java  |  17 ++++
 .../org/apache/iceberg/spark/source/Reader.java    |   4 +-
 .../iceberg/spark/source/TestPartitionValues.java  | 101 ++++++++++++++++++++-
 5 files changed, 130 insertions(+), 7 deletions(-)
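
The changes share one theme: Iceberg represents binary values as java.nio.ByteBuffer, while Spark's InternalRow stores BinaryType columns as byte[], and ByteBuffer is not Serializable. The PartitionData, PartitionKey, and Reader hunks below each convert between those two representations at the point where the wrong one was leaking through; the MergingSnapshotUpdate change adds a null guard so cleanUncommitted skips the delete when no new manifest was written. A minimal, self-contained sketch of the round trip the commit relies on (the class and method names here are illustrative only, not part of the commit):

    import java.nio.ByteBuffer;

    public class BinaryPartitionValueSketch {
      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[] { 0x01, 0x02, 0x03 });

        // Copy the contents out without advancing the original buffer's position.
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);

        // Wrap back into a ByteBuffer when the value is read through Iceberg's StructLike API.
        ByteBuffer restored = ByteBuffer.wrap(bytes);

        System.out.println(buffer.equals(restored));   // true: same bytes, original untouched
      }
    }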

diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
index bec57c8..8594429 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
@@ -305,7 +305,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
 
   @Override
   protected void cleanUncommitted(Set<ManifestFile> committed) {
-    if (!committed.contains(newManifest)) {
+    if (newManifest != null && !committed.contains(newManifest)) {
       deleteFile(newManifest.path());
       this.newManifest = null;
     }
diff --git a/core/src/main/java/org/apache/iceberg/PartitionData.java b/core/src/main/java/org/apache/iceberg/PartitionData.java
index 614cf19..62e01cc 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionData.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionData.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -30,6 +31,7 @@ import org.apache.avro.util.Utf8;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 
 class PartitionData
     implements IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
@@ -122,6 +124,12 @@ class PartitionData
     if (value instanceof Utf8) {
       // Utf8 is not Serializable
       data[pos] = value.toString();
+    } else if (value instanceof ByteBuffer) {
+      // ByteBuffer is not Serializable
+      ByteBuffer buffer = (ByteBuffer) value;
+      byte[] bytes = new byte[buffer.remaining()];
+      buffer.duplicate().get(bytes);
+      data[pos] = bytes;
     } else {
       data[pos] = value;
     }
@@ -137,6 +145,11 @@ class PartitionData
     if (i >= data.length) {
       return null;
     }
+
+    if (data[i] instanceof byte[]) {
+      return ByteBuffer.wrap((byte[]) data[i]);
+    }
+
     return data[i];
   }
 
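PartitionData implements Serializable (see the class declaration above), but java.nio.ByteBuffer does not, so put() now stores binary values as a plain byte[] and get() wraps them back into a ByteBuffer on the way out; duplicate() keeps the copy from disturbing the caller's buffer position. A standalone sketch of the constraint being worked around (class name is hypothetical):

    import java.io.ByteArrayOutputStream;
    import java.io.NotSerializableException;
    import java.io.ObjectOutputStream;
    import java.nio.ByteBuffer;

    public class SerializableBinarySketch {
      public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
          out.writeObject(new byte[] { 1, 2, 3 });                   // fine: arrays serialize
          out.writeObject(ByteBuffer.wrap(new byte[] { 1, 2, 3 }));  // throws
        } catch (NotSerializableException e) {
          System.out.println("ByteBuffer is not Serializable: " + e.getMessage());
        }
      }
    }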
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
index 7f4b1fc..fa3f139 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
 
 import com.google.common.collect.Maps;
 import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -170,6 +171,8 @@ class PartitionKey implements StructLike {
         return new StringAccessor(p, convert(type));
       case DECIMAL:
         return new DecimalAccessor(p, convert(type));
+      case BINARY:
+        return new BytesAccessor(p, convert(type));
       default:
         return new PositionAccessor(p, convert(type));
     }
@@ -276,6 +279,20 @@ class PartitionKey implements StructLike {
     }
   }
 
+  private static class BytesAccessor extends PositionAccessor {
+    private BytesAccessor(int p, DataType type) {
+      super(p, type);
+    }
+
+    @Override
+    public Object get(InternalRow row) {
+      if (row.isNullAt(p)) {
+        return null;
+      }
+      return ByteBuffer.wrap((byte[]) row.get(p, type));
+    }
+  }
+
   private static class Position2Accessor implements Accessor<InternalRow> {
     private final int p0;
     private final int size0;
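The new BytesAccessor covers the write path: when a row is partitioned, Spark hands over BinaryType values as byte[] from InternalRow, and the accessor wraps them so the rest of PartitionKey sees the ByteBuffer representation Iceberg uses for binary values. A small sketch of that hand-off (the GenericInternalRow construction is just for illustration):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.types.DataTypes;

    public class BytesAccessorSketch {
      public static void main(String[] args) {
        // Spark keeps BinaryType column values as byte[] inside InternalRow.
        InternalRow row = new GenericInternalRow(
            new Object[] { "abc".getBytes(StandardCharsets.UTF_8) });

        byte[] sparkValue = (byte[]) row.get(0, DataTypes.BinaryType);

        // The accessor wraps the array so downstream Iceberg code sees a ByteBuffer.
        ByteBuffer icebergValue = ByteBuffer.wrap(sparkValue);
        System.out.println(icebergValue.remaining());   // 3
      }
    }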
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 0fde726..d241370 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -58,6 +58,7 @@ import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -521,8 +522,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
       if (type instanceof StringType) {
         return UTF8String.fromString(value.toString());
       } else if (type instanceof BinaryType) {
-        ByteBuffer buffer = (ByteBuffer) value;
-        return buffer.get(new byte[buffer.remaining()]);
+        return ByteBuffers.toByteArray((ByteBuffer) value);
       } else if (type instanceof DecimalType) {
         return Decimal.fromDecimal(value);
       }
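The Reader change is the read-path half of the fix. ByteBuffer.get(byte[]) is a bulk relative read: it fills the destination array but returns the ByteBuffer itself and advances the buffer's position, so the old code handed Spark a drained ByteBuffer where a byte[] was expected for a BinaryType partition column. The replacement delegates to Iceberg's ByteBuffers.toByteArray utility, which returns the contents as a byte[]. A small demonstration of the removed line's behaviour (illustrative only):

    import java.nio.ByteBuffer;

    public class OldConversionSketch {
      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[] { 1, 2, 3 });

        // Equivalent of the removed line: a bulk relative get.
        byte[] dst = new byte[buffer.remaining()];
        Object returned = buffer.get(dst);

        System.out.println(returned instanceof ByteBuffer);  // true: the buffer, not dst
        System.out.println(buffer.remaining());              // 0: the buffer was consumed
      }
    }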
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index 40222ca..0dcf3aa 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -22,11 +22,19 @@ package org.apache.iceberg.spark.source;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -42,6 +50,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
 
 @RunWith(Parameterized.class)
 public class TestPartitionValues {
@@ -53,11 +62,28 @@ public class TestPartitionValues {
     };
   }
 
-  private static final Schema SCHEMA = new Schema(
+  private static final Schema SUPPORTED_PRIMITIVES = new Schema(
+      required(100, "id", Types.LongType.get()),
+      required(101, "data", Types.StringType.get()),
+      required(102, "b", Types.BooleanType.get()),
+      required(103, "i", Types.IntegerType.get()),
+      required(104, "l", Types.LongType.get()),
+      required(105, "f", Types.FloatType.get()),
+      required(106, "d", Types.DoubleType.get()),
+      required(107, "date", Types.DateType.get()),
+      required(108, "ts", Types.TimestampType.withZone()),
+      required(110, "s", Types.StringType.get()),
+      required(113, "bytes", Types.BinaryType.get()),
+      required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
+      required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
+      required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's maximum precision
+  );
+
+  private static final Schema SIMPLE_SCHEMA = new Schema(
       optional(1, "id", Types.IntegerType.get()),
       optional(2, "data", Types.StringType.get()));
 
-  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SIMPLE_SCHEMA)
       .identity("data")
       .build();
 
@@ -93,7 +119,7 @@ public class TestPartitionValues {
     Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
 
     HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
-    Table table = tables.create(SCHEMA, SPEC, location.toString());
+    Table table = tables.create(SIMPLE_SCHEMA, SPEC, location.toString());
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
     List<SimpleRecord> expected = Lists.newArrayList(
@@ -116,7 +142,10 @@ public class TestPartitionValues {
           .format("iceberg")
           .load(location.toString());
 
-      List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+      List<SimpleRecord> actual = result
+          .orderBy("id")
+          .as(Encoders.bean(SimpleRecord.class))
+          .collectAsList();
 
       Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
       Assert.assertEquals("Result rows should match", expected, actual);
@@ -126,4 +155,68 @@ public class TestPartitionValues {
     }
   }
 
+  @Test
+  public void testPartitionValueTypes() throws Exception {
+    String[] columnNames = new String[] {
+        "b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10"
+    };
+
+    HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+
+    // create a table around the source data
+    String sourceLocation = temp.newFolder("source_table").toString();
+    Table source = tables.create(SUPPORTED_PRIMITIVES, sourceLocation);
+
+    // write out an Avro data file with all of the data types for source data
+    List<GenericData.Record> expected = RandomData.generateList(source.schema(), 2, 128735L);
+    File avroData = temp.newFile("data.avro");
+    Assert.assertTrue(avroData.delete());
+    try (FileAppender<GenericData.Record> appender = Avro.write(Files.localOutput(avroData))
+        .schema(source.schema())
+        .build()) {
+      appender.addAll(expected);
+    }
+
+    // add the Avro data file to the source table
+    source.newAppend()
+        .appendFile(DataFiles.fromInputFile(Files.localInput(avroData), 10))
+        .commit();
+
+    Dataset<Row> sourceDF = spark.read().format("iceberg").load(sourceLocation);
+
+    try {
+      for (String column : columnNames) {
+        String desc = "partition_by_" + SUPPORTED_PRIMITIVES.findType(column).toString();
+
+        File parent = temp.newFolder(desc);
+        File location = new File(parent, "test");
+        File dataFolder = new File(location, "data");
+        Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+        PartitionSpec spec = PartitionSpec.builderFor(SUPPORTED_PRIMITIVES).identity(column).build();
+
+        Table table = tables.create(SUPPORTED_PRIMITIVES, spec, location.toString());
+        table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+        sourceDF.write()
+            .format("iceberg")
+            .mode("append")
+            .save(location.toString());
+
+        List<Row> actual = spark.read()
+            .format("iceberg")
+            .load(location.toString())
+            .collectAsList();
+
+        Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+
+        for (int i = 0; i < expected.size(); i += 1) {
+          TestHelpers.assertEqualsSafe(
+              SUPPORTED_PRIMITIVES.asStruct(), expected.get(i), actual.get(i));
+        }
+      }
+    } finally {
+      TestTables.clearTables();
+    }
+  }
 }