Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/08 16:19:23 UTC
[incubator-iceberg] branch master updated: Fix binary partition values in Spark. (#146)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 664c8e9 Fix binary partition values in Spark. (#146)
664c8e9 is described below
commit 664c8e93b3ae2383c970c9968f446dd94e391cce
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Mon Apr 8 09:19:18 2019 -0700
Fix binary partition values in Spark. (#146)
---
.../org/apache/iceberg/MergingSnapshotUpdate.java | 2 +-
.../java/org/apache/iceberg/PartitionData.java | 13 +++
.../apache/iceberg/spark/source/PartitionKey.java | 17 ++++
.../org/apache/iceberg/spark/source/Reader.java | 4 +-
.../iceberg/spark/source/TestPartitionValues.java | 101 ++++++++++++++++++++-
5 files changed, 130 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
index bec57c8..8594429 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotUpdate.java
@@ -305,7 +305,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
- if (!committed.contains(newManifest)) {
+ if (newManifest != null && !committed.contains(newManifest)) {
deleteFile(newManifest.path());
this.newManifest = null;
}
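
The guard above matters because cleanUncommitted can run when no merged manifest was ever written, leaving newManifest null; committed.contains(null) quietly returns false, so the old code fell through and dereferenced null. A minimal standalone sketch of the failure mode and the fix (the ManifestFile here is a stand-in for illustration, not Iceberg's):

    import java.util.Collections;
    import java.util.Set;

    class CleanUncommittedSketch {
        // Stand-in for Iceberg's ManifestFile, just for illustration
        static class ManifestFile {
            String path() { return "manifest.avro"; }
        }

        private ManifestFile newManifest = null; // never created when nothing was merged

        void cleanUncommitted(Set<ManifestFile> committed) {
            // Without the null check, committed.contains(null) returns false,
            // so the old code reached newManifest.path() and threw an NPE
            if (newManifest != null && !committed.contains(newManifest)) {
                System.out.println("deleting " + newManifest.path());
                this.newManifest = null;
            }
        }

        public static void main(String[] args) {
            new CleanUncommittedSketch().cleanUncommitted(Collections.emptySet());
        }
    }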
diff --git a/core/src/main/java/org/apache/iceberg/PartitionData.java b/core/src/main/java/org/apache/iceberg/PartitionData.java
index 614cf19..62e01cc 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionData.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionData.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -30,6 +31,7 @@ import org.apache.avro.util.Utf8;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
class PartitionData
implements IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
@@ -122,6 +124,12 @@ class PartitionData
if (value instanceof Utf8) {
// Utf8 is not Serializable
data[pos] = value.toString();
+ } else if (value instanceof ByteBuffer) {
+ // ByteBuffer is not Serializable
+ ByteBuffer buffer = (ByteBuffer) value;
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.duplicate().get(bytes);
+ data[pos] = bytes;
} else {
data[pos] = value;
}
@@ -137,6 +145,11 @@ class PartitionData
if (i >= data.length) {
return null;
}
+
+ if (data[i] instanceof byte[]) {
+ return ByteBuffer.wrap((byte[]) data[i]);
+ }
+
return data[i];
}
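
ByteBuffer is not Serializable, so PartitionData now stores binary values internally as byte[] and re-wraps them on read. Copying through duplicate() keeps the source buffer's position intact, which matters if the same buffer is read again later. A minimal sketch of that copy-and-wrap round trip:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    class ByteBufferRoundTripSketch {
        public static void main(String[] args) {
            ByteBuffer value = ByteBuffer.wrap("abc".getBytes(StandardCharsets.UTF_8));

            // put(): copy via duplicate() so the source buffer's position is untouched
            byte[] bytes = new byte[value.remaining()];
            value.duplicate().get(bytes);

            // get(): wrap the stored byte[] back into a ByteBuffer
            ByteBuffer restored = ByteBuffer.wrap(bytes);

            System.out.println(value.remaining());       // still 3, not consumed
            System.out.println(restored.equals(value));  // true, same contents
        }
    }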
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
index 7f4b1fc..fa3f139 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
import com.google.common.collect.Maps;
import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -170,6 +171,8 @@ class PartitionKey implements StructLike {
return new StringAccessor(p, convert(type));
case DECIMAL:
return new DecimalAccessor(p, convert(type));
+ case BINARY:
+ return new BytesAccessor(p, convert(type));
default:
return new PositionAccessor(p, convert(type));
}
@@ -276,6 +279,20 @@ class PartitionKey implements StructLike {
}
}
+ private static class BytesAccessor extends PositionAccessor {
+ private BytesAccessor(int p, DataType type) {
+ super(p, type);
+ }
+
+ @Override
+ public Object get(InternalRow row) {
+ if (row.isNullAt(p)) {
+ return null;
+ }
+ return ByteBuffer.wrap((byte[]) row.get(p, type));
+ }
+ }
+
private static class Position2Accessor implements Accessor<InternalRow> {
private final int p0;
private final int size0;
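
Spark's InternalRow hands binary columns back as byte[], while Iceberg's partition transforms expect ByteBuffer, so the new BytesAccessor wraps the array. A minimal sketch of that conversion, assuming Spark's catalyst classes are on the classpath:

    import java.nio.ByteBuffer;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.types.DataTypes;

    class BytesAccessorSketch {
        public static void main(String[] args) {
            // Spark represents a BINARY column value as a raw byte[]
            GenericInternalRow row = new GenericInternalRow(new Object[] {
                new byte[] {0x01, 0x02, 0x03}
            });

            // The accessor wraps it so identity partitioning sees a ByteBuffer
            ByteBuffer value = ByteBuffer.wrap((byte[]) row.get(0, DataTypes.BinaryType));
            System.out.println(value.remaining()); // 3
        }
    }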
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 0fde726..d241370 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -58,6 +58,7 @@ import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -521,8 +522,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
if (type instanceof StringType) {
return UTF8String.fromString(value.toString());
} else if (type instanceof BinaryType) {
- ByteBuffer buffer = (ByteBuffer) value;
- return buffer.get(new byte[buffer.remaining()]);
+ return ByteBuffers.toByteArray((ByteBuffer) value);
} else if (type instanceof DecimalType) {
return Decimal.fromDecimal(value);
}
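
The replaced line above is the actual bug: ByteBuffer.get(byte[]) fills the destination array but returns the buffer itself, so Spark received a ByteBuffer where it expected byte[], and the read also consumed the buffer's position as a side effect. ByteBuffers.toByteArray is Iceberg's utility for the correct copy; a hand-rolled equivalent for illustration:

    import java.nio.ByteBuffer;

    class BinaryConversionSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3});

            // Old behavior: get(byte[]) returns the buffer, not the array,
            // and advances the position as a side effect
            Object wrong = buffer.get(new byte[buffer.remaining()]);
            System.out.println(wrong instanceof ByteBuffer); // true
            System.out.println(buffer.remaining());          // 0, buffer consumed

            // Fixed behavior: copy into a fresh byte[] without touching the source
            ByteBuffer source = ByteBuffer.wrap(new byte[] {1, 2, 3});
            byte[] bytes = new byte[source.remaining()];
            source.duplicate().get(bytes);
            System.out.println(bytes.length); // 3
        }
    }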
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index 40222ca..0dcf3aa 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -22,11 +22,19 @@ package org.apache.iceberg.spark.source;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -42,6 +50,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
@RunWith(Parameterized.class)
public class TestPartitionValues {
@@ -53,11 +62,28 @@ public class TestPartitionValues {
};
}
- private static final Schema SCHEMA = new Schema(
+ private static final Schema SUPPORTED_PRIMITIVES = new Schema(
+ required(100, "id", Types.LongType.get()),
+ required(101, "data", Types.StringType.get()),
+ required(102, "b", Types.BooleanType.get()),
+ required(103, "i", Types.IntegerType.get()),
+ required(104, "l", Types.LongType.get()),
+ required(105, "f", Types.FloatType.get()),
+ required(106, "d", Types.DoubleType.get()),
+ required(107, "date", Types.DateType.get()),
+ required(108, "ts", Types.TimestampType.withZone()),
+ required(110, "s", Types.StringType.get()),
+ required(113, "bytes", Types.BinaryType.get()),
+ required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
+ required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
+ required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's maximum precision
+ );
+
+ private static final Schema SIMPLE_SCHEMA = new Schema(
optional(1, "id", Types.IntegerType.get()),
optional(2, "data", Types.StringType.get()));
- private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+ private static final PartitionSpec SPEC = PartitionSpec.builderFor(SIMPLE_SCHEMA)
.identity("data")
.build();
@@ -93,7 +119,7 @@ public class TestPartitionValues {
Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
- Table table = tables.create(SCHEMA, SPEC, location.toString());
+ Table table = tables.create(SIMPLE_SCHEMA, SPEC, location.toString());
table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
List<SimpleRecord> expected = Lists.newArrayList(
@@ -116,7 +142,10 @@ public class TestPartitionValues {
.format("iceberg")
.load(location.toString());
- List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ List<SimpleRecord> actual = result
+ .orderBy("id")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);
@@ -126,4 +155,68 @@ public class TestPartitionValues {
}
}
+ @Test
+ public void testPartitionValueTypes() throws Exception {
+ String[] columnNames = new String[] {
+ "b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10"
+ };
+
+ HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+
+ // create a table around the source data
+ String sourceLocation = temp.newFolder("source_table").toString();
+ Table source = tables.create(SUPPORTED_PRIMITIVES, sourceLocation);
+
+ // write out an Avro data file with all of the data types for source data
+ List<GenericData.Record> expected = RandomData.generateList(source.schema(), 2, 128735L);
+ File avroData = temp.newFile("data.avro");
+ Assert.assertTrue(avroData.delete());
+ try (FileAppender<GenericData.Record> appender = Avro.write(Files.localOutput(avroData))
+ .schema(source.schema())
+ .build()) {
+ appender.addAll(expected);
+ }
+
+ // add the Avro data file to the source table
+ source.newAppend()
+ .appendFile(DataFiles.fromInputFile(Files.localInput(avroData), 10))
+ .commit();
+
+ Dataset<Row> sourceDF = spark.read().format("iceberg").load(sourceLocation);
+
+ try {
+ for (String column : columnNames) {
+ String desc = "partition_by_" + SUPPORTED_PRIMITIVES.findType(column).toString();
+
+ File parent = temp.newFolder(desc);
+ File location = new File(parent, "test");
+ File dataFolder = new File(location, "data");
+ Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+ PartitionSpec spec = PartitionSpec.builderFor(SUPPORTED_PRIMITIVES).identity(column).build();
+
+ Table table = tables.create(SUPPORTED_PRIMITIVES, spec, location.toString());
+ table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+ sourceDF.write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString());
+
+ List<Row> actual = spark.read()
+ .format("iceberg")
+ .load(location.toString())
+ .collectAsList();
+
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+
+ for (int i = 0; i < expected.size(); i += 1) {
+ TestHelpers.assertEqualsSafe(
+ SUPPORTED_PRIMITIVES.asStruct(), expected.get(i), actual.get(i));
+ }
+ }
+ } finally {
+ TestTables.clearTables();
+ }
+ }
}
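
The new test drives this end to end: generate random rows covering every supported primitive, write them to a source table as Avro, then for each column create an identity-partitioned table, append through the Spark source, and read the rows back. Boiled down, the round trip looks like this (a minimal sketch with hypothetical paths; assumes both tables already exist via HadoopTables and a local SparkSession):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class PartitionRoundTripSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .appName("sketch")
                .master("local[2]")
                .getOrCreate();

            // hypothetical locations; both tables must already be created
            Dataset<Row> source = spark.read().format("iceberg").load("/tmp/source_table");
            source.write().format("iceberg").mode("append").save("/tmp/partitioned_table");

            // before this fix, reading back a table partitioned by a binary
            // column failed in the partition value conversion path in Reader
            spark.read().format("iceberg").load("/tmp/partitioned_table").show();
        }
    }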