You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by rd...@apache.org on 2020/05/22 14:54:24 UTC

[incubator-iceberg] 05/05: Resolve conflicts

This is an automated email from the ASF dual-hosted git repository.

rdsr pushed a commit to branch orc_nested_partition
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git

commit 743cab88b7e11c53aaca500640c7ca96ec1e5ee9
Author: Ratandeep Ratt <rr...@linkedin.com>
AuthorDate: Fri May 22 07:52:49 2020 -0700

    Resolve conflicts
---
 .../apache/iceberg/spark/source/RowDataReader.java | 27 ++--------------------
 1 file changed, 2 insertions(+), 25 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 05802ff..a025e34 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.spark.source;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -33,7 +32,6 @@ import org.apache.avro.util.Utf8;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -59,7 +57,6 @@ import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
-import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.StructType;
@@ -67,8 +64,6 @@ import org.apache.spark.unsafe.types.UTF8String;
 import scala.collection.JavaConverters;
 
 class RowDataReader extends BaseDataReader<InternalRow> {
-  private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(
-      FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC);
   // for some reason, the apply method can't be called from Java without reflection
   private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
       .impl(UnsafeProjection.class, InternalRow.class)
@@ -101,27 +96,9 @@ class RowDataReader extends BaseDataReader<InternalRow> {
     boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
 
     if (projectsIdentityPartitionColumns) {
-      if (SUPPORTS_CONSTANTS.contains(file.format())) {
-        return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant))
-            .iterator();
-      }
-
-      // schema used to read data files
-      Schema readSchema = TypeUtil.selectNot(expectedSchema, idColumns);
-      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-      JoinedRow joined = new JoinedRow();
-
-      // create joined rows and project from the joined schema to the final schema
-      Schema joinedSchema = TypeUtil.join(readSchema, partitionSchema);
-      InternalRow partition = convertToRow.apply(file.partition());
-      joined.withRight(partition);
-
-      CloseableIterable<InternalRow> transformedIterable = CloseableIterable.transform(
-          CloseableIterable.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft),
-          APPLY_PROJECTION.bind(projection(expectedSchema, joinedSchema))::invoke);
-      return transformedIterable.iterator();
+      return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant))
+          .iterator();
     }
-
     // return the base iterator
     return open(task, expectedSchema, ImmutableMap.of()).iterator();
   }