You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@iceberg.apache.org by rd...@apache.org on 2020/05/22 14:54:24 UTC
[incubator-iceberg] 05/05: Resolve conflicts
This is an automated email from the ASF dual-hosted git repository.
rdsr pushed a commit to branch orc_nested_partition
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
commit 743cab88b7e11c53aaca500640c7ca96ec1e5ee9
Author: Ratandeep Ratt <rr...@linkedin.com>
AuthorDate: Fri May 22 07:52:49 2020 -0700
Resolve conflicts
---
.../apache/iceberg/spark/source/RowDataReader.java | 27 ++--------------------
1 file changed, 2 insertions(+), 25 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 05802ff..a025e34 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.spark.source;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
@@ -33,7 +32,6 @@ import org.apache.avro.util.Utf8;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -59,7 +57,6 @@ import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
-import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.StructType;
@@ -67,8 +64,6 @@ import org.apache.spark.unsafe.types.UTF8String;
import scala.collection.JavaConverters;
class RowDataReader extends BaseDataReader<InternalRow> {
- private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(
- FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC);
// for some reason, the apply method can't be called from Java without reflection
private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
.impl(UnsafeProjection.class, InternalRow.class)
@@ -101,27 +96,9 @@ class RowDataReader extends BaseDataReader<InternalRow> {
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
if (projectsIdentityPartitionColumns) {
- if (SUPPORTS_CONSTANTS.contains(file.format())) {
- return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant))
- .iterator();
- }
-
- // schema used to read data files
- Schema readSchema = TypeUtil.selectNot(expectedSchema, idColumns);
- PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
- JoinedRow joined = new JoinedRow();
-
- // create joined rows and project from the joined schema to the final schema
- Schema joinedSchema = TypeUtil.join(readSchema, partitionSchema);
- InternalRow partition = convertToRow.apply(file.partition());
- joined.withRight(partition);
-
- CloseableIterable<InternalRow> transformedIterable = CloseableIterable.transform(
- CloseableIterable.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft),
- APPLY_PROJECTION.bind(projection(expectedSchema, joinedSchema))::invoke);
- return transformedIterable.iterator();
+ return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant))
+ .iterator();
}
-
// return the base iterator
return open(task, expectedSchema, ImmutableMap.of()).iterator();
}