Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/26 20:11:17 UTC
[incubator-iceberg] branch vectorized-read updated: Remove all formatting-only and any other unrelated diffs between master and vectorized-read branches (#673)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/vectorized-read by this push:
new b6ca634 Remove all formatting only and any other unrelated diffs between master and vectorized-read branches (#673)
b6ca634 is described below
commit b6ca634aff56b1917c2cf6a7eca0d3ed5a200ed0
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Tue Nov 26 12:11:10 2019 -0800
Remove all formatting only and any other unrelated diffs between master and vectorized-read branches (#673)
---
README.md | 3 -
.../java/org/apache/iceberg/expressions/False.java | 2 +-
.../java/org/apache/iceberg/expressions/True.java | 2 +-
.../apache/iceberg/transforms/ProjectionUtil.java | 4 +-
.../org/apache/iceberg/transforms/Truncate.java | 12 +-
.../iceberg/transforms/TestDatesProjection.java | 8 +-
.../transforms/TestTimestampsProjection.java | 8 +-
.../main/java/org/apache/iceberg/SchemaUpdate.java | 10 +-
.../java/org/apache/iceberg/TableMetadata.java | 42 ++--
.../org/apache/iceberg/hadoop/HadoopTables.java | 2 +-
gradle/wrapper/gradle-wrapper.properties | 24 ++-
jitpack.yml | 2 +-
.../java/org/apache/iceberg/parquet/Parquet.java | 4 +-
.../org/apache/iceberg/parquet/ParquetFilters.java | 38 ++--
.../java/org/apache/iceberg/parquet/ParquetIO.java | 6 +-
.../apache/iceberg/parquet/ParquetIterable.java | 2 +-
.../apache/iceberg/parquet/ParquetReadSupport.java | 4 +-
.../org/apache/iceberg/parquet/ParquetReader.java | 10 +-
.../org/apache/iceberg/parquet/ParquetUtil.java | 18 +-
.../iceberg/parquet/ParquetValueReaders.java | 10 +-
.../iceberg/parquet/ParquetWriteSupport.java | 4 +-
.../SparkParquetReadersFlatDataBenchmark.java | 215 +++++++++++++++++++++
.../SparkParquetReadersNestedDataBenchmark.java | 215 +++++++++++++++++++++
.../SparkParquetWritersFlatDataBenchmark.java | 123 ++++++++++++
.../SparkParquetWritersNestedDataBenchmark.java | 122 ++++++++++++
.../spark/source/IcebergSourceBenchmark.java | 4 +-
.../source/IcebergSourceFlatDataBenchmark.java | 18 +-
...cebergSourceFlatParquetDataFilterBenchmark.java | 47 +----
...IcebergSourceFlatParquetDataReadBenchmark.java} | 70 +++----
...IcebergSourceFlatParquetDataWriteBenchmark.java | 16 +-
.../iceberg/spark/data/SparkParquetReaders.java | 6 +-
.../apache/iceberg/spark/source/IcebergSource.java | 4 +-
.../org/apache/iceberg/spark/source/Reader.java | 20 +-
.../org/apache/iceberg/spark/source/Writer.java | 6 +-
.../org/apache/iceberg/spark/SparkTableUtil.scala | 112 +++++------
.../org/apache/iceberg/spark/data/RandomData.java | 5 +-
.../org/apache/iceberg/spark/data/TestHelpers.java | 3 +-
.../iceberg/spark/data/TestParquetAvroReader.java | 10 +-
.../iceberg/spark/data/TestParquetAvroWriter.java | 70 ++++---
.../iceberg/spark/data/TestSparkParquetReader.java | 67 ++++---
.../iceberg/spark/data/TestSparkParquetWriter.java | 66 ++++---
.../iceberg/spark/source/TestSparkTableUtil.java | 66 +++----
42 files changed, 1071 insertions(+), 409 deletions(-)
diff --git a/README.md b/README.md
index a518a2e..68f8bf4 100644
--- a/README.md
+++ b/README.md
@@ -57,9 +57,6 @@ Iceberg is built using Gradle 5.4.1.
* To invoke a build and run tests: `./gradlew build`
* To skip tests: `./gradlew build -x test`
-* To invoke a build and run tests: `./gradlew build`
-* To skip tests: `./gradlew build -x test`
-
Iceberg table support is organized in library modules:
* `iceberg-common` contains utility classes used in other modules
diff --git a/api/src/main/java/org/apache/iceberg/expressions/False.java b/api/src/main/java/org/apache/iceberg/expressions/False.java
index 761d054..940750a 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/False.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/False.java
@@ -25,7 +25,7 @@ import java.io.ObjectStreamException;
* An {@link Expression expression} that is always false.
*/
public class False implements Expression {
- public static final False INSTANCE = new False();
+ static final False INSTANCE = new False();
private False() {
}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/True.java b/api/src/main/java/org/apache/iceberg/expressions/True.java
index 634e2c0..8f43bc0 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/True.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/True.java
@@ -25,7 +25,7 @@ import java.io.ObjectStreamException;
* An {@link Expression expression} that is always true.
*/
public class True implements Expression {
- public static final True INSTANCE = new True();
+ static final True INSTANCE = new True();
private True() {
}
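
With INSTANCE narrowed from public to package-private in both True and False, code outside org.apache.iceberg.expressions is expected to obtain these singletons through the Expressions factory methods rather than by referencing the constants directly. A minimal sketch of that usage, assuming only Iceberg's public expressions API (the example class name is illustrative):

    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    public class ConstantExpressionsExample {
      public static void main(String[] args) {
        // alwaysTrue()/alwaysFalse() hand back the package-private singletons
        Expression keepEverything = Expressions.alwaysTrue();
        Expression keepNothing = Expressions.alwaysFalse();
        System.out.println(keepEverything + " / " + keepNothing);
      }
    }
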
diff --git a/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java b/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
index 42b5b47..9611f56 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java
@@ -191,8 +191,8 @@ class ProjectionUtil {
return predicate(Expression.Operation.EQ, name, transform.apply(boundary));
case STARTS_WITH:
return predicate(Expression.Operation.STARTS_WITH, name, transform.apply(boundary));
- // case IN: // TODO
- // return Expressions.predicate(Operation.IN, name, transform.apply(boundary));
+// case IN: // TODO
+// return Expressions.predicate(Operation.IN, name, transform.apply(boundary));
default:
return null;
}
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
index dc3ac6a..f9fd60b 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
@@ -231,7 +231,7 @@ abstract class Truncate<T> implements Transform<T, T> {
@Override
public UnboundPredicate<CharSequence> project(String name,
- BoundPredicate<CharSequence> predicate) {
+ BoundPredicate<CharSequence> predicate) {
if (predicate.isUnaryPredicate()) {
return Expressions.predicate(predicate.op(), name);
} else if (predicate.isLiteralPredicate()) {
@@ -242,7 +242,7 @@ abstract class Truncate<T> implements Transform<T, T> {
@Override
public UnboundPredicate<CharSequence> projectStrict(String name,
- BoundPredicate<CharSequence> predicate) {
+ BoundPredicate<CharSequence> predicate) {
if (predicate instanceof BoundUnaryPredicate) {
return Expressions.predicate(predicate.op(), name);
} else if (predicate instanceof BoundLiteralPredicate) {
@@ -313,7 +313,7 @@ abstract class Truncate<T> implements Transform<T, T> {
@Override
public UnboundPredicate<ByteBuffer> project(String name,
- BoundPredicate<ByteBuffer> pred) {
+ BoundPredicate<ByteBuffer> pred) {
if (pred.isUnaryPredicate()) {
return Expressions.predicate(pred.op(), name);
} else if (pred.isLiteralPredicate()) {
@@ -324,7 +324,7 @@ abstract class Truncate<T> implements Transform<T, T> {
@Override
public UnboundPredicate<ByteBuffer> projectStrict(String name,
- BoundPredicate<ByteBuffer> pred) {
+ BoundPredicate<ByteBuffer> pred) {
if (pred.isUnaryPredicate()) {
return Expressions.predicate(pred.op(), name);
} else if (pred.isLiteralPredicate()) {
@@ -396,7 +396,7 @@ abstract class Truncate<T> implements Transform<T, T> {
@Override
public UnboundPredicate<BigDecimal> project(String name,
- BoundPredicate<BigDecimal> pred) {
+ BoundPredicate<BigDecimal> pred) {
if (pred.isUnaryPredicate()) {
return Expressions.predicate(pred.op(), name);
} else if (pred.isLiteralPredicate()) {
@@ -407,7 +407,7 @@ abstract class Truncate<T> implements Transform<T, T> {
@Override
public UnboundPredicate<BigDecimal> projectStrict(String name,
- BoundPredicate<BigDecimal> pred) {
+ BoundPredicate<BigDecimal> pred) {
if (pred.isUnaryPredicate()) {
return Expressions.predicate(pred.op(), name);
} else if (pred.isLiteralPredicate()) {
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java b/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java
index 6f263cc..b6602a6 100644
--- a/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestDatesProjection.java
@@ -43,7 +43,7 @@ public class TestDatesProjection {
private static final Schema SCHEMA = new Schema(optional(1, "date", TYPE));
public void assertProjectionStrict(PartitionSpec spec, UnboundPredicate<?> filter,
- Expression.Operation expectedOp, String expectedLiteral) {
+ Expression.Operation expectedOp, String expectedLiteral) {
Expression projection = Projections.strict(spec).project(filter);
UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
@@ -57,21 +57,21 @@ public class TestDatesProjection {
}
public void assertProjectionStrictValue(PartitionSpec spec, UnboundPredicate<?> filter,
- Expression.Operation expectedOp) {
+ Expression.Operation expectedOp) {
Expression projection = Projections.strict(spec).project(filter);
Assert.assertEquals(projection.op(), expectedOp);
}
public void assertProjectionInclusiveValue(PartitionSpec spec, UnboundPredicate<?> filter,
- Expression.Operation expectedOp) {
+ Expression.Operation expectedOp) {
Expression projection = Projections.inclusive(spec).project(filter);
Assert.assertEquals(projection.op(), expectedOp);
}
public void assertProjectionInclusive(PartitionSpec spec, UnboundPredicate<?> filter,
- Expression.Operation expectedOp, String expectedLiteral) {
+ Expression.Operation expectedOp, String expectedLiteral) {
Expression projection = Projections.inclusive(spec).project(filter);
UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java b/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java
index 787ec1e..3fec404 100644
--- a/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestTimestampsProjection.java
@@ -43,7 +43,7 @@ public class TestTimestampsProjection {
private static final Schema SCHEMA = new Schema(optional(1, "timestamp", TYPE));
public void assertProjectionStrict(PartitionSpec spec, UnboundPredicate<?> filter,
- Expression.Operation expectedOp, String expectedLiteral) {
+ Expression.Operation expectedOp, String expectedLiteral) {
Expression projection = Projections.strict(spec).project(filter);
UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
@@ -57,21 +57,21 @@ public class TestTimestampsProjection {
}
public void assertProjectionStrictValue(PartitionSpec spec, UnboundPredicate<?> filter,
- Expression.Operation expectedOp) {
+ Expression.Operation expectedOp) {
Expression projection = Projections.strict(spec).project(filter);
Assert.assertEquals(projection.op(), expectedOp);
}
public void assertProjectionInclusiveValue(PartitionSpec spec, UnboundPredicate<?> filter,
- Expression.Operation expectedOp) {
+ Expression.Operation expectedOp) {
Expression projection = Projections.inclusive(spec).project(filter);
Assert.assertEquals(projection.op(), expectedOp);
}
public void assertProjectionInclusive(PartitionSpec spec, UnboundPredicate<?> filter,
- Expression.Operation expectedOp, String expectedLiteral) {
+ Expression.Operation expectedOp, String expectedLiteral) {
Expression projection = Projections.inclusive(spec).project(filter);
UnboundPredicate<?> predicate = assertAndUnwrapUnbound(projection);
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index 3e53598..a1ef08c 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -309,8 +309,8 @@ class SchemaUpdate implements UpdateSchema {
}
private static Schema applyChanges(Schema schema, List<Integer> deletes,
- Map<Integer, Types.NestedField> updates,
- Multimap<Integer, Types.NestedField> adds) {
+ Map<Integer, Types.NestedField> updates,
+ Multimap<Integer, Types.NestedField> adds) {
Types.StructType struct = TypeUtil
.visit(schema, new ApplyChanges(deletes, updates, adds))
.asNestedType().asStructType();
@@ -323,8 +323,8 @@ class SchemaUpdate implements UpdateSchema {
private final Multimap<Integer, Types.NestedField> adds;
private ApplyChanges(List<Integer> deletes,
- Map<Integer, Types.NestedField> updates,
- Multimap<Integer, Types.NestedField> adds) {
+ Map<Integer, Types.NestedField> updates,
+ Multimap<Integer, Types.NestedField> adds) {
this.deletes = deletes;
this.updates = updates;
this.adds = adds;
@@ -471,7 +471,7 @@ class SchemaUpdate implements UpdateSchema {
}
private static Types.StructType addFields(Types.StructType struct,
- Collection<Types.NestedField> adds) {
+ Collection<Types.NestedField> adds) {
List<Types.NestedField> newFields = Lists.newArrayList(struct.fields());
newFields.addAll(adds);
return Types.StructType.of(newFields);
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 899341b..a5081a6 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -47,15 +47,15 @@ public class TableMetadata {
static final int INITIAL_SPEC_ID = 0;
public static TableMetadata newTableMetadata(Schema schema,
- PartitionSpec spec,
- String location) {
+ PartitionSpec spec,
+ String location) {
return newTableMetadata(schema, spec, location, ImmutableMap.of());
}
public static TableMetadata newTableMetadata(Schema schema,
- PartitionSpec spec,
- String location,
- Map<String, String> properties) {
+ PartitionSpec spec,
+ String location,
+ Map<String, String> properties) {
// reassign all column ids to ensure consistency
AtomicInteger lastColumnId = new AtomicInteger(0);
Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
@@ -150,7 +150,7 @@ public class TableMetadata {
}
MetadataLogEntry that = (MetadataLogEntry) other;
return timestampMillis == that.timestampMillis &&
- java.util.Objects.equals(file, that.file);
+ java.util.Objects.equals(file, that.file);
}
@Override
@@ -186,18 +186,18 @@ public class TableMetadata {
private final List<MetadataLogEntry> previousFiles;
TableMetadata(InputFile file,
- String uuid,
- String location,
- long lastUpdatedMillis,
- int lastColumnId,
- Schema schema,
- int defaultSpecId,
- List<PartitionSpec> specs,
- Map<String, String> properties,
- long currentSnapshotId,
- List<Snapshot> snapshots,
- List<HistoryEntry> snapshotLog,
- List<MetadataLogEntry> previousFiles) {
+ String uuid,
+ String location,
+ long lastUpdatedMillis,
+ int lastColumnId,
+ Schema schema,
+ int defaultSpecId,
+ List<PartitionSpec> specs,
+ Map<String, String> properties,
+ long currentSnapshotId,
+ List<Snapshot> snapshots,
+ List<HistoryEntry> snapshotLog,
+ List<MetadataLogEntry> previousFiles) {
this.file = file;
this.uuid = uuid;
this.location = location;
@@ -476,7 +476,7 @@ public class TableMetadata {
}
public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
- Map<String, String> updatedProperties) {
+ Map<String, String> updatedProperties) {
AtomicInteger nextLastColumnId = new AtomicInteger(0);
Schema freshSchema = TypeUtil.assignFreshIds(updatedSchema, nextLastColumnId::incrementAndGet);
@@ -526,13 +526,13 @@ public class TableMetadata {
}
private List<MetadataLogEntry> addPreviousFile(InputFile previousFile, long timestampMillis,
- Map<String, String> updatedProperties) {
+ Map<String, String> updatedProperties) {
if (previousFile == null) {
return previousFiles;
}
int maxSize = Math.max(1, PropertyUtil.propertyAsInt(updatedProperties,
- TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
+ TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT));
List<MetadataLogEntry> newMetadataLog = null;
if (previousFiles.size() >= maxSize) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index cffd6fc..9b144ef 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -126,7 +126,7 @@ public class HadoopTables implements Tables, Configurable {
*/
@Override
public Table create(Schema schema, PartitionSpec spec, Map<String, String> properties,
- String location) {
+ String location) {
Preconditions.checkNotNull(schema, "A table schema is required");
TableOperations ops = newTableOps(location);
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 768a708..1277f7d 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,24 @@
-#Wed Nov 20 14:10:01 PST 2019
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-zipStorePath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip
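
The wrapper now points at the -bin distribution, which omits the bundled sources and documentation of -all, and the properties file gains the ASF license header. If the wrapper ever needs to be regenerated with the same settings, the standard Gradle wrapper task can do it; a sketch, assuming the Gradle 5.4.1 version documented in the README:

    ./gradlew wrapper --gradle-version 5.4.1 --distribution-type bin
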
diff --git a/jitpack.yml b/jitpack.yml
index 5fabc87..01b9d61 100644
--- a/jitpack.yml
+++ b/jitpack.yml
@@ -14,4 +14,4 @@
# limitations under the License.
install:
- - ./gradlew publishToMavenLocal
\ No newline at end of file
+ - ./gradlew publishToMavenLocal
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 0e724df..60aa877 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -314,8 +314,8 @@ public class Parquet {
return this;
}
- public ReadBuilder project(Schema sc) {
- this.schema = sc;
+ public ReadBuilder project(Schema newSchema) {
+ this.schema = newSchema;
return this;
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
index 285164e..ae40da7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -176,25 +176,6 @@ class ParquetFilters {
}
}
- @SuppressWarnings("unchecked")
- private static <C extends Comparable<C>> C getParquetPrimitive(Literal<?> lit) {
- if (lit == null) {
- return null;
- }
-
- // TODO: this needs to convert to handle BigDecimal and UUID
- Object value = lit.value();
- if (value instanceof Number) {
- return (C) lit.value();
- } else if (value instanceof CharSequence) {
- return (C) Binary.fromString(value.toString());
- } else if (value instanceof ByteBuffer) {
- return (C) Binary.fromReusedByteBuffer((ByteBuffer) value);
- }
- throw new UnsupportedOperationException(
- "Type not supported yet: " + value.getClass().getName());
- }
-
@SuppressWarnings("checkstyle:MethodTypeParameterName")
private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
FilterPredicate pred(Operation op, COL col, C value) {
@@ -220,6 +201,25 @@ class ParquetFilters {
}
}
+ @SuppressWarnings("unchecked")
+ private static <C extends Comparable<C>> C getParquetPrimitive(Literal<?> lit) {
+ if (lit == null) {
+ return null;
+ }
+
+ // TODO: this needs to convert to handle BigDecimal and UUID
+ Object value = lit.value();
+ if (value instanceof Number) {
+ return (C) lit.value();
+ } else if (value instanceof CharSequence) {
+ return (C) Binary.fromString(value.toString());
+ } else if (value instanceof ByteBuffer) {
+ return (C) Binary.fromReusedByteBuffer((ByteBuffer) value);
+ }
+ throw new UnsupportedOperationException(
+ "Type not supported yet: " + value.getClass().getName());
+ }
+
private static class AlwaysTrue implements FilterPredicate {
static final AlwaysTrue INSTANCE = new AlwaysTrue();
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
index 1afc211..d65b8d6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
@@ -41,11 +41,11 @@ import org.apache.parquet.io.SeekableInputStream;
/**
* Methods in this class translate from the IO API to Parquet's IO API.
*/
-public class ParquetIO {
+class ParquetIO {
private ParquetIO() {
}
- public static InputFile file(org.apache.iceberg.io.InputFile file) {
+ static InputFile file(org.apache.iceberg.io.InputFile file) {
// TODO: use reflection to avoid depending on classes from iceberg-hadoop
// TODO: use reflection to avoid depending on classes from hadoop
if (file instanceof HadoopInputFile) {
@@ -59,7 +59,7 @@ public class ParquetIO {
return new ParquetInputFile(file);
}
- public static OutputFile file(org.apache.iceberg.io.OutputFile file) {
+ static OutputFile file(org.apache.iceberg.io.OutputFile file) {
if (file instanceof HadoopOutputFile) {
HadoopOutputFile hfile = (HadoopOutputFile) file;
try {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
index 0000cb2..79d702e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java
@@ -31,7 +31,7 @@ import org.apache.parquet.hadoop.ParquetReader;
public class ParquetIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private final ParquetReader.Builder<T> builder;
- public ParquetIterable(ParquetReader.Builder<T> builder) {
+ ParquetIterable(ParquetReader.Builder<T> builder) {
this.builder = builder;
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
index 18e2c85..9588676 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
@@ -37,12 +37,12 @@ import org.apache.parquet.schema.MessageType;
*
* @param <T> Java type produced by this read support instance
*/
-public class ParquetReadSupport<T> extends ReadSupport<T> {
+class ParquetReadSupport<T> extends ReadSupport<T> {
private final Schema expectedSchema;
private final ReadSupport<T> wrapped;
private final boolean callInit;
- public ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, boolean callInit) {
+ ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, boolean callInit) {
this.expectedSchema = expectedSchema;
this.wrapped = readSupport;
this.callInit = callInit;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 7112f8d..80f092d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -47,8 +47,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private final boolean caseSensitive;
public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
- Function<MessageType, ParquetValueReader<?>> readerFunc,
- Expression filter, boolean reuseContainers, boolean caseSensitive) {
+ Function<MessageType, ParquetValueReader<?>> readerFunc,
+ Expression filter, boolean reuseContainers, boolean caseSensitive) {
this.input = input;
this.expectedSchema = expectedSchema;
this.options = options;
@@ -72,8 +72,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
@SuppressWarnings("unchecked")
ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
- Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
- boolean caseSensitive) {
+ Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
+ boolean caseSensitive) {
this.file = file;
this.options = options;
this.reader = newReader(file, options);
@@ -102,7 +102,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
BlockMetaData rowGroup = rowGroups.get(i);
boolean shouldRead = filter == null || (
statsFilter.shouldRead(typeWithIds, rowGroup) &&
- dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
+ dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
this.shouldSkip[i] = !shouldRead;
if (shouldRead) {
computedTotalValues += rowGroup.getRowCount();
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 3f731de..2621f5f 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -95,7 +95,7 @@ public class ParquetUtil {
increment(columnSizes, fieldId, column.getTotalSize());
String columnName = fileSchema.findColumnName(fieldId);
- MetricsModes.MetricsMode metricsMode = metricsConfig.columnMode(columnName);
+ MetricsMode metricsMode = metricsConfig.columnMode(columnName);
if (metricsMode == MetricsModes.None.get()) {
continue;
}
@@ -134,7 +134,8 @@ public class ParquetUtil {
}
/**
- * @return a list of offsets in ascending order determined by the starting position of the row groups
+ * @return a list of offsets in ascending order determined by the starting position
+ * of the row groups
*/
public static List<Long> getSplitOffsets(ParquetMetadata md) {
List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size());
@@ -172,9 +173,8 @@ public class ParquetUtil {
}
@SuppressWarnings("unchecked")
- private static <T> void updateMin(
- Map<Integer, Literal<?>> lowerBounds, int id, Type type,
- Literal<T> min, MetricsMode metricsMode) {
+ private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Type type,
+ Literal<T> min, MetricsMode metricsMode) {
Literal<T> currentMin = (Literal<T>) lowerBounds.get(id);
if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) {
if (metricsMode == MetricsModes.Full.get()) {
@@ -198,9 +198,8 @@ public class ParquetUtil {
}
@SuppressWarnings("unchecked")
- private static <T> void updateMax(
- Map<Integer, Literal<?>> upperBounds, int id, Type type,
- Literal<T> max, MetricsMode metricsMode) {
+ private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Type type,
+ Literal<T> max, MetricsMode metricsMode) {
Literal<T> currentMax = (Literal<T>) upperBounds.get(id);
if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) {
if (metricsMode == MetricsModes.Full.get()) {
@@ -226,8 +225,7 @@ public class ParquetUtil {
private static Map<Integer, ByteBuffer> toBufferMap(Schema schema, Map<Integer, Literal<?>> map) {
Map<Integer, ByteBuffer> bufferMap = Maps.newHashMap();
for (Map.Entry<Integer, Literal<?>> entry : map.entrySet()) {
- bufferMap.put(
- entry.getKey(),
+ bufferMap.put(entry.getKey(),
Conversions.toByteBuffer(schema.findType(entry.getKey()), entry.getValue().value()));
}
return bufferMap;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index ba18f75..aa7f572 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -41,7 +41,7 @@ public class ParquetValueReaders {
}
public static <T> ParquetValueReader<T> option(Type type, int definitionLevel,
- ParquetValueReader<T> reader) {
+ ParquetValueReader<T> reader) {
if (type.isRepetition(Type.Repetition.OPTIONAL)) {
return new OptionReader<>(definitionLevel, reader);
}
@@ -416,7 +416,7 @@ public class ParquetValueReaders {
private Iterator<E> elements = null;
public ListReader(int definitionLevel, int repetitionLevel,
- ParquetValueReader<E> reader) {
+ ParquetValueReader<E> reader) {
super(definitionLevel, repetitionLevel, reader);
}
@@ -470,7 +470,7 @@ public class ParquetValueReaders {
private final List<TripleIterator<?>> children;
protected RepeatedKeyValueReader(int definitionLevel, int repetitionLevel,
- ParquetValueReader<K> keyReader, ParquetValueReader<V> valueReader) {
+ ParquetValueReader<K> keyReader, ParquetValueReader<V> valueReader) {
this.definitionLevel = definitionLevel;
this.repetitionLevel = repetitionLevel;
this.keyReader = keyReader;
@@ -535,8 +535,8 @@ public class ParquetValueReaders {
private Iterator<Map.Entry<K, V>> pairs = null;
public MapReader(int definitionLevel, int repetitionLevel,
- ParquetValueReader<K> keyReader,
- ParquetValueReader<V> valueReader) {
+ ParquetValueReader<K> keyReader,
+ ParquetValueReader<V> valueReader) {
super(definitionLevel, repetitionLevel, keyReader, valueReader);
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
index d097c67..633f9f8 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java
@@ -26,12 +26,12 @@ import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
-public class ParquetWriteSupport<T> extends WriteSupport<T> {
+class ParquetWriteSupport<T> extends WriteSupport<T> {
private final MessageType type;
private final Map<String, String> keyValueMetadata;
private final WriteSupport<T> wrapped;
- public ParquetWriteSupport(MessageType type, Map<String, String> keyValueMetadata, WriteSupport<T> writeSupport) {
+ ParquetWriteSupport(MessageType type, Map<String, String> keyValueMetadata, WriteSupport<T> writeSupport) {
this.type = type;
this.keyValueMetadata = keyValueMetadata;
this.wrapped = writeSupport;
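
ParquetIO, ParquetIterable, ParquetReadSupport, and ParquetWriteSupport are now package-private, so callers are expected to go through the public Parquet.read(...) and Parquet.write(...) builders, as the benchmarks below do. A minimal write-side sketch under that assumption, reusing the RandomData helper referenced by those benchmarks (class and file names here are illustrative):

    import java.io.File;
    import java.io.IOException;
    import java.util.List;
    import org.apache.avro.generic.GenericData;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.spark.data.RandomData;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.required;

    public class ParquetWriteExample {
      public static void main(String[] args) throws IOException {
        Schema schema = new Schema(required(1, "id", Types.LongType.get()));
        File dataFile = File.createTempFile("write-example", ".parquet");
        List<GenericData.Record> records = RandomData.generateList(schema, 10, 0L);

        // no direct use of the support classes; the builder handles them internally
        try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(dataFile))
            .schema(schema)
            .named("example")
            .build()) {
          writer.addAll(records);
        }
      }
    }
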
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersFlatDataBenchmark.java
new file mode 100644
index 0000000..7ad24a7
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.parquet;
+
+import com.google.common.collect.Iterables;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkBenchmarkUtil;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of reading Parquet data with a flat schema using
+ * Iceberg and Spark Parquet readers.
+ *
+ * To run this benchmark:
+ * <code>
+ * ./gradlew :iceberg-spark:jmh
+ * -PjmhIncludeRegex=SparkParquetReadersFlatDataBenchmark
+ * -PjmhOutputPath=benchmark/spark-parquet-readers-flat-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetReadersFlatDataBenchmark {
+
+ private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
+ .impl(UnsafeProjection.class, InternalRow.class)
+ .build();
+ private static final Schema SCHEMA = new Schema(
+ required(1, "longCol", Types.LongType.get()),
+ required(2, "intCol", Types.IntegerType.get()),
+ required(3, "floatCol", Types.FloatType.get()),
+ optional(4, "doubleCol", Types.DoubleType.get()),
+ optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+ optional(6, "dateCol", Types.DateType.get()),
+ optional(7, "timestampCol", Types.TimestampType.withZone()),
+ optional(8, "stringCol", Types.StringType.get()));
+ private static final Schema PROJECTED_SCHEMA = new Schema(
+ required(1, "longCol", Types.LongType.get()),
+ optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+ optional(8, "stringCol", Types.StringType.get()));
+ private static final int NUM_RECORDS = 10000000;
+ private File dataFile;
+
+ @Setup
+ public void setupBenchmark() throws IOException {
+ dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet");
+ List<GenericData.Record> records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L);
+ try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(dataFile))
+ .schema(SCHEMA)
+ .named("benchmark")
+ .build()) {
+ writer.addAll(records);
+ }
+ }
+
+ @TearDown
+ public void tearDownBenchmark() {
+ if (dataFile != null) {
+ dataFile.delete();
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+ .build()) {
+
+ for (InternalRow row : rows) {
+ blackHole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+ .build()) {
+
+ Iterable<InternalRow> unsafeRows = Iterables.transform(
+ rows,
+ APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke);
+
+ for (InternalRow row : unsafeRows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
+ StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(SCHEMA)
+ .readSupport(new ParquetReadSupport())
+ .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+ .set("spark.sql.parquet.binaryAsString", "false")
+ .set("spark.sql.parquet.int96AsTimestamp", "false")
+ .callInit()
+ .build()) {
+
+ for (InternalRow row : rows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(PROJECTED_SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+ .build()) {
+
+ for (InternalRow row : rows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(PROJECTED_SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+ .build()) {
+
+ Iterable<InternalRow> unsafeRows = Iterables.transform(
+ rows,
+ APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke);
+
+ for (InternalRow row : unsafeRows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException {
+ StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA);
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(PROJECTED_SCHEMA)
+ .readSupport(new ParquetReadSupport())
+ .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+ .set("spark.sql.parquet.binaryAsString", "false")
+ .set("spark.sql.parquet.int96AsTimestamp", "false")
+ .callInit()
+ .build()) {
+
+ for (InternalRow row : rows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersNestedDataBenchmark.java
new file mode 100644
index 0000000..11cf45c
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.parquet;
+
+import com.google.common.collect.Iterables;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkBenchmarkUtil;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of reading nested Parquet data using
+ * Iceberg and Spark Parquet readers.
+ *
+ * To run this benchmark:
+ * <code>
+ * ./gradlew :iceberg-spark:jmh
+ * -PjmhIncludeRegex=SparkParquetReadersNestedDataBenchmark
+ * -PjmhOutputPath=benchmark/spark-parquet-readers-nested-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetReadersNestedDataBenchmark {
+
+ private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
+ .impl(UnsafeProjection.class, InternalRow.class)
+ .build();
+ private static final Schema SCHEMA = new Schema(
+ required(0, "id", Types.LongType.get()),
+ optional(4, "nested", Types.StructType.of(
+ required(1, "col1", Types.StringType.get()),
+ required(2, "col2", Types.DoubleType.get()),
+ required(3, "col3", Types.LongType.get())
+ ))
+ );
+ private static final Schema PROJECTED_SCHEMA = new Schema(
+ optional(4, "nested", Types.StructType.of(
+ required(1, "col1", Types.StringType.get())
+ ))
+ );
+ private static final int NUM_RECORDS = 10000000;
+ private File dataFile;
+
+ @Setup
+ public void setupBenchmark() throws IOException {
+ dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet");
+ List<GenericData.Record> records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L);
+ try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(dataFile))
+ .schema(SCHEMA)
+ .named("benchmark")
+ .build()) {
+ writer.addAll(records);
+ }
+ }
+
+ @TearDown
+ public void tearDownBenchmark() {
+ if (dataFile != null) {
+ dataFile.delete();
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+ .build()) {
+
+ for (InternalRow row : rows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+ .build()) {
+
+ Iterable<InternalRow> unsafeRows = Iterables.transform(
+ rows,
+ APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke);
+
+ for (InternalRow row : unsafeRows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
+ StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(SCHEMA)
+ .readSupport(new ParquetReadSupport())
+ .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+ .set("spark.sql.parquet.binaryAsString", "false")
+ .set("spark.sql.parquet.int96AsTimestamp", "false")
+ .callInit()
+ .build()) {
+
+ for (InternalRow row : rows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(PROJECTED_SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+ .build()) {
+
+ for (InternalRow row : rows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException {
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(PROJECTED_SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+ .build()) {
+
+ Iterable<InternalRow> unsafeRows = Iterables.transform(
+ rows,
+ APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke);
+
+ for (InternalRow row : unsafeRows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException {
+ StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA);
+ try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+ .project(PROJECTED_SCHEMA)
+ .readSupport(new ParquetReadSupport())
+ .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+ .set("spark.sql.parquet.binaryAsString", "false")
+ .set("spark.sql.parquet.int96AsTimestamp", "false")
+ .callInit()
+ .build()) {
+
+ for (InternalRow row : rows) {
+ blackhole.consume(row);
+ }
+ }
+ }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersFlatDataBenchmark.java
new file mode 100644
index 0000000..599aa06
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of writing Parquet data with a flat schema using
+ * Iceberg and Spark Parquet writers.
+ *
+ * To run this benchmark:
+ * <code>
+ * ./gradlew :iceberg-spark:jmh
+ * -PjmhIncludeRegex=SparkParquetWritersFlatDataBenchmark
+ * -PjmhOutputPath=benchmark/spark-parquet-writers-flat-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetWritersFlatDataBenchmark {
+
+ private static final Schema SCHEMA = new Schema(
+ required(1, "longCol", Types.LongType.get()),
+ required(2, "intCol", Types.IntegerType.get()),
+ required(3, "floatCol", Types.FloatType.get()),
+ optional(4, "doubleCol", Types.DoubleType.get()),
+ optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+ optional(6, "dateCol", Types.DateType.get()),
+ optional(7, "timestampCol", Types.TimestampType.withZone()),
+ optional(8, "stringCol", Types.StringType.get()));
+ private static final int NUM_RECORDS = 1000000;
+ private Iterable<InternalRow> rows;
+ private File dataFile;
+
+ @Setup
+ public void setupBenchmark() throws IOException {
+ rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L);
+ dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet");
+ }
+
+ @TearDown
+ public void tearDownBenchmark() {
+ if (dataFile != null) {
+ dataFile.delete();
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUsingIcebergWriter() throws IOException {
+ try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+ .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, msgType))
+ .schema(SCHEMA)
+ .build()) {
+
+ writer.addAll(rows);
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUsingSparkWriter() throws IOException {
+ StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+ try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+ .writeSupport(new ParquetWriteSupport())
+ .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+ .set("spark.sql.parquet.writeLegacyFormat", "false")
+ .set("spark.sql.parquet.binaryAsString", "false")
+ .set("spark.sql.parquet.int96AsTimestamp", "false")
+ .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+ .schema(SCHEMA)
+ .build()) {
+
+ writer.addAll(rows);
+ }
+ }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersNestedDataBenchmark.java
new file mode 100644
index 0000000..c43a885
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of writing nested Parquet data using
+ * Iceberg and Spark Parquet writers.
+ *
+ * To run this benchmark:
+ * <code>
+ * ./gradlew :iceberg-spark:jmh
+ * -PjmhIncludeRegex=SparkParquetWritersNestedDataBenchmark
+ * -PjmhOutputPath=benchmark/spark-parquet-writers-nested-data-benchmark-result.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetWritersNestedDataBenchmark {
+
+ private static final Schema SCHEMA = new Schema(
+ required(0, "id", Types.LongType.get()),
+ optional(4, "nested", Types.StructType.of(
+ required(1, "col1", Types.StringType.get()),
+ required(2, "col2", Types.DoubleType.get()),
+ required(3, "col3", Types.LongType.get())
+ ))
+ );
+ private static final int NUM_RECORDS = 1000000;
+ private Iterable<InternalRow> rows;
+ private File dataFile;
+
+ @Setup
+ public void setupBenchmark() throws IOException {
+ rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L);
+ dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet");
+ }
+
+ @TearDown
+ public void tearDownBenchmark() {
+ if (dataFile != null) {
+ dataFile.delete();
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUsingIcebergWriter() throws IOException {
+ try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+ .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, msgType))
+ .schema(SCHEMA)
+ .build()) {
+
+ writer.addAll(rows);
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUsingSparkWriter() throws IOException {
+ StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+ try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+ .writeSupport(new ParquetWriteSupport())
+ .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+ .set("spark.sql.parquet.writeLegacyFormat", "false")
+ .set("spark.sql.parquet.binaryAsString", "false")
+ .set("spark.sql.parquet.int96AsTimestamp", "false")
+ .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+ .schema(SCHEMA)
+ .build()) {
+
+ writer.addAll(rows);
+ }
+ }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
index a1927d5..97cd591 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -48,8 +48,8 @@ import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;
@Fork(1)
@State(Scope.Benchmark)
-@Warmup(iterations = 2)
-@Measurement(iterations = 3)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
@BenchmarkMode(Mode.SingleShotTime)
public abstract class IcebergSourceBenchmark {
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
index 1f81236..9f9a070 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark {
@@ -41,19 +42,18 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
@Override
protected final Table initTable() {
Schema schema = new Schema(
- optional(1, "longCol", Types.LongType.get()),
- optional(2, "intCol", Types.LongType.get()),
- optional(3, "floatCol", Types.LongType.get()),
- optional(4, "doubleCol", Types.LongType.get()),
- optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
- optional(6, "dateCol", Types.DateType.get()),
- optional(7, "timestampCol", Types.TimestampType.withZone()),
- optional(8, "stringCol", Types.StringType.get()));
+ required(1, "longCol", Types.LongType.get()),
+ required(2, "intCol", Types.IntegerType.get()),
+ required(3, "floatCol", Types.FloatType.get()),
+ optional(4, "doubleCol", Types.DoubleType.get()),
+ optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+ optional(6, "dateCol", Types.DateType.get()),
+ optional(7, "timestampCol", Types.TimestampType.withZone()),
+ optional(8, "stringCol", Types.StringType.get()));
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
HadoopTables tables = new HadoopTables(hadoopConf());
Map<String, String> properties = Maps.newHashMap();
properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
- properties.put(TableProperties.PARQUET_DICT_SIZE_BYTES, "1");
return tables.create(schema, partitionSpec, properties, newTableLocation());
}
}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
index 273d6b1..1c91645 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
@@ -76,52 +76,7 @@ public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFl
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
- Dataset<Row> df = spark().read().format("iceberg")
- .option("iceberg.read.numrecordsperbatch", "10000")
- .load(tableLocation).filter(FILTER_COND);
- materialize(df);
- });
- }
-
-
- @Benchmark
- @Threads(1)
- public void readWithFilterIceberg100k() {
- Map<String, String> tableProperties = Maps.newHashMap();
- tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
- withTableProperties(tableProperties, () -> {
- String tableLocation = table().location();
- Dataset<Row> df = spark().read().format("iceberg")
- .option("iceberg.read.numrecordsperbatch", "100000")
- .load(tableLocation).filter(FILTER_COND);
- materialize(df);
- });
- }
-
- @Benchmark
- @Threads(1)
- public void readWithFilterIceberg10k() {
- Map<String, String> tableProperties = Maps.newHashMap();
- tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
- withTableProperties(tableProperties, () -> {
- String tableLocation = table().location();
- Dataset<Row> df = spark().read().format("iceberg")
- .option("iceberg.read.numrecordsperbatch", "10000")
- .load(tableLocation).filter(FILTER_COND);
- materialize(df);
- });
- }
-
- @Benchmark
- @Threads(1)
- public void readWithFilterIceberg5k() {
- Map<String, String> tableProperties = Maps.newHashMap();
- tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
- withTableProperties(tableProperties, () -> {
- String tableLocation = table().location();
- Dataset<Row> df = spark().read().format("iceberg")
- .option("iceberg.read.numrecordsperbatch", "5000")
- .load(tableLocation).filter(FILTER_COND);
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND);
materialize(df);
});
}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
similarity index 62%
copy from spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
copy to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
index 273d6b1..37a6336 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
@@ -37,25 +37,20 @@ import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.expr;
/**
- * A benchmark that evaluates the file skipping capabilities in the Spark data source for Iceberg.
- *
- * This class uses a dataset with a flat schema, where the records are clustered according to the
- * column used in the filter predicate.
- *
- * The performance is compared to the built-in file source in Spark.
+ * A benchmark that evaluates the performance of reading Parquet data with a flat schema
+ * using Iceberg and the built-in file source in Spark.
*
* To run this benchmark:
* <code>
* ./gradlew :iceberg-spark:jmh
- * -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
- * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
+ * -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
* </code>
*/
-public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFlatDataBenchmark {
+public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlatDataBenchmark {
- private static final String FILTER_COND = "dateCol == date_add(current_date(), 1)";
- private static final int NUM_FILES = 500;
- private static final int NUM_ROWS = 10000;
+ private static final int NUM_FILES = 10;
+ private static final int NUM_ROWS = 1000000;
@Setup
public void setupBenchmark() {
@@ -71,87 +66,78 @@ public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFl
@Benchmark
@Threads(1)
- public void readWithFilterIceberg() {
+ public void readIceberg() {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
- Dataset<Row> df = spark().read().format("iceberg")
- .option("iceberg.read.numrecordsperbatch", "10000")
- .load(tableLocation).filter(FILTER_COND);
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
materialize(df);
});
}
-
@Benchmark
@Threads(1)
- public void readWithFilterIceberg100k() {
- Map<String, String> tableProperties = Maps.newHashMap();
- tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
- withTableProperties(tableProperties, () -> {
- String tableLocation = table().location();
- Dataset<Row> df = spark().read().format("iceberg")
- .option("iceberg.read.numrecordsperbatch", "100000")
- .load(tableLocation).filter(FILTER_COND);
+ public void readFileSourceVectorized() {
+ Map<String, String> conf = Maps.newHashMap();
+ conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+ conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+ withSQLConf(conf, () -> {
+ Dataset<Row> df = spark().read().parquet(dataLocation());
materialize(df);
});
}
@Benchmark
@Threads(1)
- public void readWithFilterIceberg10k() {
- Map<String, String> tableProperties = Maps.newHashMap();
- tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
- withTableProperties(tableProperties, () -> {
- String tableLocation = table().location();
- Dataset<Row> df = spark().read().format("iceberg")
- .option("iceberg.read.numrecordsperbatch", "10000")
- .load(tableLocation).filter(FILTER_COND);
+ public void readFileSourceNonVectorized() {
+ Map<String, String> conf = Maps.newHashMap();
+ conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+ conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+ withSQLConf(conf, () -> {
+ Dataset<Row> df = spark().read().parquet(dataLocation());
materialize(df);
});
}
@Benchmark
@Threads(1)
- public void readWithFilterIceberg5k() {
+ public void readWithProjectionIceberg() {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
- Dataset<Row> df = spark().read().format("iceberg")
- .option("iceberg.read.numrecordsperbatch", "5000")
- .load(tableLocation).filter(FILTER_COND);
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).select("longCol");
materialize(df);
});
}
@Benchmark
@Threads(1)
- public void readWithFilterFileSourceVectorized() {
+ public void readWithProjectionFileSourceVectorized() {
Map<String, String> conf = Maps.newHashMap();
conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
withSQLConf(conf, () -> {
- Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND);
+ Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
materialize(df);
});
}
@Benchmark
@Threads(1)
- public void readWithFilterFileSourceNonVectorized() {
+ public void readWithProjectionFileSourceNonVectorized() {
Map<String, String> conf = Maps.newHashMap();
conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
withSQLConf(conf, () -> {
- Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND);
+ Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol");
materialize(df);
});
}
private void appendData() {
- for (int fileNum = 1; fileNum < NUM_FILES; fileNum++) {
+ for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
Dataset<Row> df = spark().range(NUM_ROWS)
.withColumnRenamed("id", "longCol")
.withColumn("intCol", expr("CAST(longCol AS INT)"))
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
index 3a886c1..ab62f53 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
@@ -32,6 +32,8 @@ import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
+import static org.apache.spark.sql.functions.expr;
+
/**
* A benchmark that evaluates the performance of writing Parquet data with a flat schema
* using Iceberg and the built-in file source in Spark.
@@ -76,13 +78,13 @@ public class IcebergSourceFlatParquetDataWriteBenchmark extends IcebergSourceFla
private Dataset<Row> benchmarkData() {
return spark().range(NUM_ROWS)
.withColumnRenamed("id", "longCol")
- // .withColumn("intCol", expr("CAST(longCol AS INT)"))
- // .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
- // .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
- // .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
- // .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 20))"))
- // .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
- // .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
+ .withColumn("intCol", expr("CAST(longCol AS INT)"))
+ .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+ .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+ .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+ .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 20))"))
+ .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+ .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
.coalesce(1);
}
}
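With the commented-out columns restored, benchmarkData() again produces the full flat schema consumed by the write benchmarks in this class. As an illustration only (the helper name and variables below are assumptions, not part of the class), writing that DataFrame through the two paths being compared amounts to:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    static void writeBothPaths(Dataset<Row> df, String tableLocation, String dataLocation) {
      // Iceberg DataSource V2 write path.
      df.write().format("iceberg").mode("append").save(tableLocation);

      // Built-in Spark Parquet file source write path.
      df.write().mode(SaveMode.Append).parquet(dataLocation);
    }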
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index e8cc083..9a36266 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -118,10 +118,6 @@ public class SparkParquetReaders {
this.type = type;
}
- protected MessageType getType() {
- return type;
- }
-
@Override
public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
List<ParquetValueReader<?>> fieldReaders) {
@@ -364,7 +360,7 @@ public class SparkParquetReaders {
}
}
- protected static class StringReader extends PrimitiveReader<UTF8String> {
+ private static class StringReader extends PrimitiveReader<UTF8String> {
StringReader(ColumnDescriptor desc) {
super(desc);
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index e8b91a8..f97de81 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -85,7 +85,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
@Override
public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode,
- DataSourceOptions options) {
+ DataSourceOptions options) {
Preconditions.checkArgument(mode == SaveMode.Append || mode == SaveMode.Overwrite,
"Save mode %s is not supported", mode);
Configuration conf = new Configuration(lazyBaseConf());
@@ -100,7 +100,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
@Override
public StreamWriter createStreamWriter(String runId, StructType dsStruct,
- OutputMode mode, DataSourceOptions options) {
+ OutputMode mode, DataSourceOptions options) {
Preconditions.checkArgument(
mode == OutputMode.Append() || mode == OutputMode.Complete(),
"Output mode %s is not supported", mode);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index fe7811e..ec51e9c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -101,10 +101,10 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters,
SupportsPushDownRequiredColumns,
SupportsReportStatistics {
+ private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
private static final Filter[] NO_FILTERS = new Filter[0];
- private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
private final Table table;
private final Long snapshotId;
private final Long asOfTimestamp;
@@ -395,7 +395,7 @@ class Reader implements DataSourceReader,
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo,
- encryptionManager, caseSensitive);
+ encryptionManager, caseSensitive);
}
private Schema lazyTableSchema() {
@@ -431,7 +431,7 @@ class Reader implements DataSourceReader,
private InternalRow current = null;
TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
- EncryptionManager encryptionManager, boolean caseSensitive) {
+ EncryptionManager encryptionManager, boolean caseSensitive) {
this.fileIo = fileIo;
this.tasks = task.files().iterator();
this.tableSchema = tableSchema;
@@ -585,8 +585,8 @@ class Reader implements DataSourceReader,
}
private CloseableIterable<InternalRow> newAvroIterable(InputFile location,
- FileScanTask task,
- Schema readSchema) {
+ FileScanTask task,
+ Schema readSchema) {
return Avro.read(location)
.reuseContainers()
.project(readSchema)
@@ -596,8 +596,8 @@ class Reader implements DataSourceReader,
}
private CloseableIterable<InternalRow> newParquetIterable(InputFile location,
- FileScanTask task,
- Schema readSchema) {
+ FileScanTask task,
+ Schema readSchema) {
return Parquet.read(location)
.project(readSchema)
.split(task.start(), task.length())
@@ -608,8 +608,8 @@ class Reader implements DataSourceReader,
}
private CloseableIterable<InternalRow> newOrcIterable(InputFile location,
- FileScanTask task,
- Schema readSchema) {
+ FileScanTask task,
+ Schema readSchema) {
return ORC.read(location)
.schema(readSchema)
.split(task.start(), task.length())
@@ -627,7 +627,7 @@ class Reader implements DataSourceReader,
}
}
- public static class PartitionRowConverter implements Function<StructLike, InternalRow> {
+ private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
private final DataType[] types;
private final int[] positions;
private final Class<?>[] javaTypes;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 111cbd3..f20bffc 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -257,8 +257,8 @@ class Writer implements DataSourceWriter {
private final Schema dsSchema;
WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
- Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
- long targetFileSize, Schema dsSchema) {
+ Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
+ long targetFileSize, Schema dsSchema) {
this.spec = spec;
this.format = format;
this.locations = locations;
@@ -372,7 +372,7 @@ class Writer implements DataSourceWriter {
private long currentRows = 0;
BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
- WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
+ WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
this.spec = spec;
this.format = format;
this.appenderFactory = appenderFactory;
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index 2ffc81f..5458c1e 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -59,13 +59,13 @@ object SparkTableUtil {
}
/**
- * Returns a DataFrame with a row for each partition that matches the specified 'expression'.
- *
- * @param spark a Spark session.
- * @param table name of the table.
- * @param expression The expression whose matching partitions are returned.
- * @return a DataFrame of the table partitions.
- */
+ * Returns a DataFrame with a row for each partition that matches the specified 'expression'.
+ *
+ * @param spark a Spark session.
+ * @param table name of the table.
+ * @param expression The expression whose matching partitions are returned.
+ * @return a DataFrame of the table partitions.
+ */
def partitionDFByFilter(spark: SparkSession, table: String, expression: String): DataFrame = {
import spark.implicits._
@@ -125,9 +125,9 @@ object SparkTableUtil {
* @return matching table's partitions
*/
def getPartitionsByFilter(
- spark: SparkSession,
- tableIdent: TableIdentifier,
- predicateExpr: Expression): Seq[SparkPartition] = {
+ spark: SparkSession,
+ tableIdent: TableIdentifier,
+ predicateExpr: Expression): Seq[SparkPartition] = {
val catalog = spark.sessionState.catalog
val catalogTable = catalog.getTableMetadata(tableIdent)
@@ -155,9 +155,9 @@ object SparkTableUtil {
* @return a Seq of [[SparkDataFile]]
*/
def listPartition(
- partition: SparkPartition,
- conf: SerializableConfiguration,
- metricsConfig: MetricsConfig): Seq[SparkDataFile] = {
+ partition: SparkPartition,
+ conf: SerializableConfiguration,
+ metricsConfig: MetricsConfig): Seq[SparkDataFile] = {
listPartition(partition.values, partition.uri, partition.format, conf.get(), metricsConfig)
}
@@ -176,11 +176,11 @@ object SparkTableUtil {
* @return a seq of [[SparkDataFile]]
*/
def listPartition(
- partition: Map[String, String],
- uri: String,
- format: String,
- conf: Configuration = new Configuration(),
- metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[SparkDataFile] = {
+ partition: Map[String, String],
+ uri: String,
+ format: String,
+ conf: Configuration = new Configuration(),
+ metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[SparkDataFile] = {
if (format.contains("avro")) {
listAvroPartition(partition, uri, conf)
@@ -203,18 +203,18 @@ object SparkTableUtil {
* Case class representing a data file.
*/
case class SparkDataFile(
- path: String,
- partition: collection.Map[String, String],
- format: String,
- fileSize: Long,
- rowGroupSize: Long,
- rowCount: Long,
- columnSizes: Array[Long],
- valueCounts: Array[Long],
- nullValueCounts: Array[Long],
- lowerBounds: Seq[Array[Byte]],
- upperBounds: Seq[Array[Byte]]
- ) {
+ path: String,
+ partition: collection.Map[String, String],
+ format: String,
+ fileSize: Long,
+ rowGroupSize: Long,
+ rowCount: Long,
+ columnSizes: Array[Long],
+ valueCounts: Array[Long],
+ nullValueCounts: Array[Long],
+ lowerBounds: Seq[Array[Byte]],
+ upperBounds: Seq[Array[Byte]]
+ ) {
/**
* Convert this to a [[DataFile]] that can be added to a [[org.apache.iceberg.Table]].
@@ -255,7 +255,7 @@ object SparkTableUtil {
val copy = if (buffer.hasArray) {
val bytes = buffer.array()
if (buffer.arrayOffset() == 0 && buffer.position() == 0 &&
- bytes.length == buffer.remaining()) {
+ bytes.length == buffer.remaining()) {
bytes
} else {
val start = buffer.arrayOffset() + buffer.position()
@@ -326,9 +326,9 @@ object SparkTableUtil {
}
private def listAvroPartition(
- partitionPath: Map[String, String],
- partitionUri: String,
- conf: Configuration): Seq[SparkDataFile] = {
+ partitionPath: Map[String, String],
+ partitionUri: String,
+ conf: Configuration): Seq[SparkDataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)
@@ -348,10 +348,10 @@ object SparkTableUtil {
//noinspection ScalaDeprecation
private def listParquetPartition(
- partitionPath: Map[String, String],
- partitionUri: String,
- conf: Configuration,
- metricsSpec: MetricsConfig): Seq[SparkDataFile] = {
+ partitionPath: Map[String, String],
+ partitionUri: String,
+ conf: Configuration,
+ metricsSpec: MetricsConfig): Seq[SparkDataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)
@@ -372,9 +372,9 @@ object SparkTableUtil {
}
private def listOrcPartition(
- partitionPath: Map[String, String],
- partitionUri: String,
- conf: Configuration): Seq[SparkDataFile] = {
+ partitionPath: Map[String, String],
+ partitionUri: String,
+ conf: Configuration): Seq[SparkDataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)
@@ -418,9 +418,9 @@ object SparkTableUtil {
}
private def buildManifest(
- conf: SerializableConfiguration,
- spec: PartitionSpec,
- basePath: String): Iterator[SparkDataFile] => Iterator[Manifest] = { files =>
+ conf: SerializableConfiguration,
+ spec: PartitionSpec,
+ basePath: String): Iterator[SparkDataFile] => Iterator[Manifest] = { files =>
if (files.hasNext) {
val io = new HadoopFileIO(conf.get())
val ctx = TaskContext.get()
@@ -477,10 +477,10 @@ object SparkTableUtil {
* @param stagingDir a staging directory to store temporary manifest files
*/
def importSparkTable(
- spark: SparkSession,
- sourceTableIdent: TableIdentifier,
- targetTable: Table,
- stagingDir: String): Unit = {
+ spark: SparkSession,
+ sourceTableIdent: TableIdentifier,
+ targetTable: Table,
+ stagingDir: String): Unit = {
val catalog = spark.sessionState.catalog
@@ -502,9 +502,9 @@ object SparkTableUtil {
}
private def importUnpartitionedSparkTable(
- spark: SparkSession,
- sourceTableIdent: TableIdentifier,
- targetTable: Table): Unit = {
+ spark: SparkSession,
+ sourceTableIdent: TableIdentifier,
+ targetTable: Table): Unit = {
val sourceTable = spark.sessionState.catalog.getTableMetadata(sourceTableIdent)
val format = sourceTable.storage.serde.orElse(sourceTable.provider)
@@ -530,11 +530,11 @@ object SparkTableUtil {
* @param stagingDir a staging directory to store temporary manifest files
*/
def importSparkPartitions(
- spark: SparkSession,
- partitions: Seq[SparkPartition],
- targetTable: Table,
- spec: PartitionSpec,
- stagingDir: String): Unit = {
+ spark: SparkSession,
+ partitions: Seq[SparkPartition],
+ targetTable: Table,
+ spec: PartitionSpec,
+ stagingDir: String): Unit = {
import spark.implicits._
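The SparkTableUtil changes above are indentation-only, but the methods touched here (listPartition, importSparkTable, importSparkPartitions) are the import entry points exercised by TestSparkTableUtil later in this diff. A condensed Java usage sketch of the HadoopTables-based import, with the helper name and the warehouse/staging directories assumed, looks like:

    import java.io.File;
    import com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.iceberg.spark.SparkTableUtil;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.catalyst.TableIdentifier;

    static void importIntoIceberg(SparkSession spark, String sourceTableName,
                                  File warehouseDir, File stagingDir) throws Exception {
      TableIdentifier source = spark.sessionState().sqlParser()
          .parseTableIdentifier(sourceTableName);
      HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
      Table table = tables.create(
          SparkSchemaUtil.schemaForTable(spark, sourceTableName),
          SparkSchemaUtil.specForTable(spark, sourceTableName),
          ImmutableMap.of(),
          warehouseDir.getAbsolutePath());
      // Lists the source table's data files and appends them to the new Iceberg table,
      // writing temporary manifests under stagingDir.
      SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
    }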
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index 3c9d7b6..7235807 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -55,9 +55,7 @@ public class RandomData {
RandomDataGenerator generator = new RandomDataGenerator(schema, seed);
List<Record> records = Lists.newArrayListWithExpectedSize(numRecords);
for (int i = 0; i < numRecords; i += 1) {
- Record rec = (Record) TypeUtil.visit(schema, generator);
- //System.out.println("Add record "+rec);
- records.add(rec);
+ records.add((Record) TypeUtil.visit(schema, generator));
}
return records;
@@ -469,6 +467,7 @@ public class RandomData {
for (int i = 0; i < length; i += 1) {
buffer[i] = (byte) CHARS.charAt(random.nextInt(CHARS.length()));
}
+
return UTF8String.fromBytes(buffer);
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 03d093c..da182b9 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -68,8 +68,7 @@ import static scala.collection.JavaConverters.seqAsJavaListConverter;
@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
public class TestHelpers {
- private TestHelpers() {
- }
+ private TestHelpers() {}
public static void assertEqualsSafe(Types.StructType struct, Record rec, Row row) {
List<Types.NestedField> fields = struct.fields();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
index bafbec4..1466dea 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
@@ -104,7 +104,7 @@ public class TestParquetAvroReader {
.project(structSchema)
.createReaderFunc(
fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema))
- .build()) {
+ .build()) {
long start = System.currentTimeMillis();
long val = 0;
long count = 0;
@@ -164,10 +164,10 @@ public class TestParquetAvroReader {
System.gc();
try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
- .project(COMPLEX_SCHEMA)
- .createReaderFunc(
- fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
- .build()) {
+ .project(COMPLEX_SCHEMA)
+ .createReaderFunc(
+ fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+ .build()) {
long start = System.currentTimeMillis();
long val = 0;
long count = 0;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
index ba4f53a..0e97c37 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
@@ -19,9 +19,21 @@
package org.apache.iceberg.spark.data;
+import java.io.File;
import java.io.IOException;
+import java.util.Iterator;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetAvroValueReaders;
+import org.apache.iceberg.parquet.ParquetAvroWriter;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -62,34 +74,34 @@ public class TestParquetAvroWriter {
@Test
public void testCorrectness() throws IOException {
- // Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34139);
- //
- // File testFile = temp.newFile();
- // Assert.assertTrue("Delete should succeed", testFile.delete());
- //
- // try (FileAppender<Record> writer = Parquet.write(Files.localOutput(testFile))
- // .schema(COMPLEX_SCHEMA)
- // .createWriterFunc(ParquetAvroWriter::buildWriter)
- // .build()) {
- // writer.addAll(records);
- // }
- //
- // // RandomData uses the root record name "test", which must match for records to be equal
- // MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
- //
- // // verify that the new read path is correct
- // try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
- // .project(COMPLEX_SCHEMA)
- // .createReaderFunc(
- // fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
- // .build()) {
- // int recordNum = 0;
- // Iterator<Record> iter = records.iterator();
- // for (Record actual : reader) {
- // Record expected = iter.next();
- // Assert.assertEquals("Record " + recordNum + " should match expected", expected, actual);
- // recordNum += 1;
- // }
- // }
+ Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34139);
+
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<Record> writer = Parquet.write(Files.localOutput(testFile))
+ .schema(COMPLEX_SCHEMA)
+ .createWriterFunc(ParquetAvroWriter::buildWriter)
+ .build()) {
+ writer.addAll(records);
+ }
+
+ // RandomData uses the root record name "test", which must match for records to be equal
+ MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
+
+ // verify that the new read path is correct
+ try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+ .project(COMPLEX_SCHEMA)
+ .createReaderFunc(
+ fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+ .build()) {
+ int recordNum = 0;
+ Iterator<Record> iter = records.iterator();
+ for (Record actual : reader) {
+ Record expected = iter.next();
+ Assert.assertEquals("Record " + recordNum + " should match expected", expected, actual);
+ recordNum += 1;
+ }
+ }
}
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index ace908e..5e22cce 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -19,37 +19,52 @@
package org.apache.iceberg.spark.data;
+import java.io.File;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Assume;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
public class TestSparkParquetReader extends AvroDataTest {
@Override
protected void writeAndValidate(Schema schema) throws IOException {
- // Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
- // type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
- //
- // List<GenericData.Record> expected = RandomData.generateList(schema, 100, 0L);
- //
- // File testFile = temp.newFile();
- // Assert.assertTrue("Delete should succeed", testFile.delete());
- //
- // try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(testFile))
- // .schema(schema)
- // .named("test")
- // .build()) {
- // writer.addAll(expected);
- // }
- //
- // try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
- // .project(schema)
- // .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
- // .build()) {
- // Iterator<InternalRow> rows = reader.iterator();
- // for (int i = 0; i < expected.size(); i += 1) {
- // Assert.assertTrue("Should have expected number of rows", rows.hasNext());
- // assertEqualsUnsafe(schema.asStruct(), expected.get(i), rows.next());
- // }
- // Assert.assertFalse("Should not have extra rows", rows.hasNext());
- // }
+ Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema,
+ type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()));
+
+ List<GenericData.Record> expected = RandomData.generateList(schema, 100, 0L);
+
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(testFile))
+ .schema(schema)
+ .named("test")
+ .build()) {
+ writer.addAll(expected);
+ }
+
+ try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
+ .project(schema)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+ .build()) {
+ Iterator<InternalRow> rows = reader.iterator();
+ for (int i = 0; i < expected.size(); i += 1) {
+ Assert.assertTrue("Should have expected number of rows", rows.hasNext());
+ assertEqualsUnsafe(schema.asStruct(), expected.get(i), rows.next());
+ }
+ Assert.assertFalse("Should not have extra rows", rows.hasNext());
+ }
}
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
index 5688596..00f95f3 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
@@ -19,9 +19,19 @@
package org.apache.iceberg.spark.data;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -58,32 +68,32 @@ public class TestSparkParquetWriter {
optional(2, "slide", Types.StringType.get())
);
- // @Test
- // public void testCorrectness() throws IOException {
- // int numRows = 250_000;
- // Iterable<InternalRow> records = RandomData.generateSpark(COMPLEX_SCHEMA, numRows, 19981);
- //
- // File testFile = temp.newFile();
- // Assert.assertTrue("Delete should succeed", testFile.delete());
- //
- // try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(testFile))
- // .schema(COMPLEX_SCHEMA)
- // .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(COMPLEX_SCHEMA, msgType))
- // .build()) {
- // writer.addAll(records);
- // }
- //
- // try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
- // .project(COMPLEX_SCHEMA)
- // .createReaderFunc(type -> SparkParquetReaders.buildReader(COMPLEX_SCHEMA, type))
- // .build()) {
- // Iterator<InternalRow> expected = records.iterator();
- // Iterator<InternalRow> rows = reader.iterator();
- // for (int i = 0; i < numRows; i += 1) {
- // Assert.assertTrue("Should have expected number of rows", rows.hasNext());
- // TestHelpers.assertEquals(COMPLEX_SCHEMA, expected.next(), rows.next());
- // }
- // Assert.assertFalse("Should not have extra rows", rows.hasNext());
- // }
- // }
+ @Test
+ public void testCorrectness() throws IOException {
+ int numRows = 250_000;
+ Iterable<InternalRow> records = RandomData.generateSpark(COMPLEX_SCHEMA, numRows, 19981);
+
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(testFile))
+ .schema(COMPLEX_SCHEMA)
+ .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(COMPLEX_SCHEMA, msgType))
+ .build()) {
+ writer.addAll(records);
+ }
+
+ try (CloseableIterable<InternalRow> reader = Parquet.read(Files.localInput(testFile))
+ .project(COMPLEX_SCHEMA)
+ .createReaderFunc(type -> SparkParquetReaders.buildReader(COMPLEX_SCHEMA, type))
+ .build()) {
+ Iterator<InternalRow> expected = records.iterator();
+ Iterator<InternalRow> rows = reader.iterator();
+ for (int i = 0; i < numRows; i += 1) {
+ Assert.assertTrue("Should have expected number of rows", rows.hasNext());
+ TestHelpers.assertEquals(COMPLEX_SCHEMA, expected.next(), rows.next());
+ }
+ Assert.assertFalse("Should not have extra rows", rows.hasNext());
+ }
+ }
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
index 13d5a21..c935bfc 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -67,11 +67,11 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
// Create a spark session.
TestSparkTableUtil.spark = SparkSession.builder().master("local[2]")
- .enableHiveSupport()
- .config("spark.hadoop.hive.metastore.uris", metastoreURI)
- .config("hive.exec.dynamic.partition", "true")
- .config("hive.exec.dynamic.partition.mode", "nonstrict")
- .getOrCreate();
+ .enableHiveSupport()
+ .config("spark.hadoop.hive.metastore.uris", metastoreURI)
+ .config("hive.exec.dynamic.partition", "true")
+ .config("hive.exec.dynamic.partition.mode", "nonstrict")
+ .getOrCreate();
}
@AfterClass
@@ -89,24 +89,24 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
SQLContext sc = new SQLContext(TestSparkTableUtil.spark);
sc.sql(String.format(
- "CREATE TABLE %s (\n" +
- " id int COMMENT 'unique id'\n" +
- ")\n" +
- " PARTITIONED BY (data string)\n" +
- " LOCATION '%s'", qualifiedTableName, tableLocationStr)
+ "CREATE TABLE %s (\n" +
+ " id int COMMENT 'unique id'\n" +
+ ")\n" +
+ " PARTITIONED BY (data string)\n" +
+ " LOCATION '%s'", qualifiedTableName, tableLocationStr)
);
List<SimpleRecord> expected = Lists.newArrayList(
- new SimpleRecord(1, "a"),
- new SimpleRecord(2, "b"),
- new SimpleRecord(3, "c")
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
);
Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
df.select("id", "data").orderBy("data").write()
- .mode("append")
- .insertInto(qualifiedTableName);
+ .mode("append")
+ .insertInto(qualifiedTableName);
}
@After
@@ -141,14 +141,14 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
public void testImportPartitionedTable() throws Exception {
File location = temp.newFolder("partitioned_table");
spark.table(qualifiedTableName).write().mode("overwrite").partitionBy("data").format("parquet")
- .saveAsTable("test_partitioned_table");
+ .saveAsTable("test_partitioned_table");
TableIdentifier source = spark.sessionState().sqlParser()
- .parseTableIdentifier("test_partitioned_table");
+ .parseTableIdentifier("test_partitioned_table");
HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, qualifiedTableName),
- SparkSchemaUtil.specForTable(spark, qualifiedTableName),
- ImmutableMap.of(),
- location.getCanonicalPath());
+ SparkSchemaUtil.specForTable(spark, qualifiedTableName),
+ ImmutableMap.of(),
+ location.getCanonicalPath());
File stagingDir = temp.newFolder("staging-dir");
SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
long count = spark.read().format("iceberg").load(location.toString()).count();
@@ -159,14 +159,14 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
public void testImportUnpartitionedTable() throws Exception {
File location = temp.newFolder("unpartitioned_table");
spark.table(qualifiedTableName).write().mode("overwrite").format("parquet")
- .saveAsTable("test_unpartitioned_table");
+ .saveAsTable("test_unpartitioned_table");
TableIdentifier source = spark.sessionState().sqlParser()
- .parseTableIdentifier("test_unpartitioned_table");
+ .parseTableIdentifier("test_unpartitioned_table");
HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, qualifiedTableName),
- SparkSchemaUtil.specForTable(spark, qualifiedTableName),
- ImmutableMap.of(),
- location.getCanonicalPath());
+ SparkSchemaUtil.specForTable(spark, qualifiedTableName),
+ ImmutableMap.of(),
+ location.getCanonicalPath());
File stagingDir = temp.newFolder("staging-dir");
SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
long count = spark.read().format("iceberg").load(location.toString()).count();
@@ -176,24 +176,24 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
@Test
public void testImportAsHiveTable() throws Exception {
spark.table(qualifiedTableName).write().mode("overwrite").format("parquet")
- .saveAsTable("unpartitioned_table");
+ .saveAsTable("unpartitioned_table");
TableIdentifier source = new TableIdentifier("unpartitioned_table");
Table table = catalog.createTable(
- org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "test_unpartitioned_table"),
- SparkSchemaUtil.schemaForTable(spark, "unpartitioned_table"),
- SparkSchemaUtil.specForTable(spark, "unpartitioned_table"));
+ org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "test_unpartitioned_table"),
+ SparkSchemaUtil.schemaForTable(spark, "unpartitioned_table"),
+ SparkSchemaUtil.specForTable(spark, "unpartitioned_table"));
File stagingDir = temp.newFolder("staging-dir");
SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
long count1 = spark.read().format("iceberg").load(DB_NAME + ".test_unpartitioned_table").count();
Assert.assertEquals("three values ", 3, count1);
spark.table(qualifiedTableName).write().mode("overwrite").partitionBy("data").format("parquet")
- .saveAsTable("partitioned_table");
+ .saveAsTable("partitioned_table");
source = new TableIdentifier("partitioned_table");
table = catalog.createTable(
- org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "test_partitioned_table"),
- SparkSchemaUtil.schemaForTable(spark, "partitioned_table"),
- SparkSchemaUtil.specForTable(spark, "partitioned_table"));
+ org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "test_partitioned_table"),
+ SparkSchemaUtil.schemaForTable(spark, "partitioned_table"),
+ SparkSchemaUtil.specForTable(spark, "partitioned_table"));
SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
long count2 = spark.read().format("iceberg").load(DB_NAME + ".test_partitioned_table").count();