Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/15 00:16:20 UTC

[GitHub] [iceberg] kbendick commented on a change in pull request #3745: Push down partition filter to Spark when Importing File Based Tables

kbendick commented on a change in pull request #3745:
URL: https://github.com/apache/iceberg/pull/3745#discussion_r769142116



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -766,9 +774,22 @@ public static TableIdentifier identifierToTableIdentifier(Identifier identifier)
 
     org.apache.spark.sql.execution.datasources.PartitionSpec spec = fileIndex.partitionSpec();
     StructType schema = spec.partitionColumns();
+    if (schema.isEmpty()) {
+      return new ArrayList<>();
+    }
+
+    List<org.apache.spark.sql.catalyst.expressions.Expression> filterExpressions =
+        getPartitionFilterExpressions(schema, partitionFilter);
+
+    List<org.apache.spark.sql.catalyst.expressions.Expression> dataFilters = new java.util.ArrayList<>();

Review comment:
       Nit: Same note as in my comment on the `schema.isEmpty()` early return: please avoid `new ArrayList<>()` here in favor of one of the alternatives mentioned there 👍

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -779,10 +800,54 @@ public static TableIdentifier identifierToTableIdentifier(Identifier identifier)
             Object value = CatalystTypeConverters.convertToScala(catalystValue, field.dataType());
             values.put(field.name(), value.toString());
           });
-          return new SparkPartition(values, partition.path().toString(), format);
+          FileStatus fileStatus =
+              scala.collection.JavaConverters.seqAsJavaListConverter(partition.files()).asJava().get(0);
+
+          return new SparkPartition(values, fileStatus.getPath().getParent().toString(), format);
         }).collect(Collectors.toList());
   }
 
+  private static List getPartitionFilterExpressions(StructType schema,
+                                                    Map<String, String> partitionFilter) {
+    List<org.apache.spark.sql.catalyst.expressions.Expression> filterExpressions = new java.util.ArrayList<>();
+    for (Map.Entry<String, String> entry : partitionFilter.entrySet()) {
+      try {
+        // IllegalArgumentException is thrown if schema doesn't contain this entry,
+        // which means partition filter is not on partition columns.
+        int index = schema.fieldIndex(entry.getKey());
+        org.apache.spark.sql.types.DataType dataType = schema.fields()[index].dataType();
+        BoundReference ref = new BoundReference(index, dataType, true);
+        if (dataType.sameType(DataTypes.IntegerType) || dataType.sameType(DataTypes.ShortType) ||
+            dataType.sameType(DataTypes.ByteType)) {
+          filterExpressions.add(new EqualTo(ref,
+              org.apache.spark.sql.catalyst.expressions.Literal.create(Integer.parseInt(entry.getValue()),
+              DataTypes.IntegerType)));
+        } else if (schema.fields()[index].dataType().sameType(DataTypes.StringType)) {
+          filterExpressions.add(new EqualTo(ref,
+              org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(), DataTypes.StringType)));
+        } else if (schema.fields()[index].dataType().sameType(DataTypes.LongType)) {
+          filterExpressions.add(new EqualTo(ref,
+              org.apache.spark.sql.catalyst.expressions.Literal.create(Long.parseLong(entry.getValue()),
+              DataTypes.LongType)));
+        } else if (schema.fields()[index].dataType().sameType(DataTypes.DateType)) {
+          filterExpressions.add(new EqualTo(ref,
+              org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(), DataTypes.DateType)));
+        } else if (schema.fields()[index].dataType().sameType(DataTypes.TimestampType)) {
+          filterExpressions.add(new EqualTo(ref,
+              org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(), DataTypes.TimestampType)));
+        } else if (schema.fields()[index].dataType()
+            .sameType(DataTypes.CalendarIntervalType)) {
+          filterExpressions.add(new EqualTo(ref,
+              org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(),
+              DataTypes.CalendarIntervalType)));

Review comment:
       For this large if-else-if chain, you might want to look into the lookup-map pattern used in `SparkFilters`: https://github.com/apache/iceberg/blob/466073b7d8c23ebeae045822ee6e1a1104a5ed5a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java#L80-L220
   
   I'm not sure whether a lookup map can be used here, since the chain compares types with `sameType`, but it might be worth looking into 😄
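
   To illustrate the idea, here is a minimal sketch of a lookup map that replaces a type-based if-else-if chain. It does not use Spark: `ColumnType` is a hypothetical enum standing in for the `DataType` values in the PR, and `PartitionLiteralParsers` and `parse` are made-up names. Whether a map keyed on Spark's `DataType` objects behaves the same as the `sameType` comparisons is exactly the open question above, so treat this as a starting point rather than a drop-in replacement.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

final class PartitionLiteralParsers {
  // Hypothetical stand-in for the Spark DataType values checked in the PR.
  enum ColumnType { BYTE, SHORT, INTEGER, LONG, STRING, DATE }

  // One entry per supported type replaces one branch of the if-else-if chain.
  private static final Map<ColumnType, Function<String, Object>> PARSERS =
      new EnumMap<>(ColumnType.class);

  static {
    // Like the PR, byte, short and int partition values are all parsed as Integer.
    PARSERS.put(ColumnType.BYTE, Integer::valueOf);
    PARSERS.put(ColumnType.SHORT, Integer::valueOf);
    PARSERS.put(ColumnType.INTEGER, Integer::valueOf);
    PARSERS.put(ColumnType.LONG, Long::valueOf);
    PARSERS.put(ColumnType.STRING, value -> value);
    // DATE is deliberately left out to show the unsupported-type path.
  }

  private PartitionLiteralParsers() {
  }

  // Converts the raw string from the partition filter into a typed value.
  static Object parse(ColumnType type, String rawValue) {
    Function<String, Object> parser = PARSERS.get(type);
    if (parser == null) {
      throw new UnsupportedOperationException("Unsupported partition column type: " + type);
    }
    return parser.apply(rawValue);
  }
}
```

   With a table like this, supporting another type means adding one `put` call instead of another `else if` branch, and unsupported types fail in a single place.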

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -766,9 +774,22 @@ public static TableIdentifier identifierToTableIdentifier(Identifier identifier)
 
     org.apache.spark.sql.execution.datasources.PartitionSpec spec = fileIndex.partitionSpec();
     StructType schema = spec.partitionColumns();
+    if (schema.isEmpty()) {
+      return new ArrayList<>();

Review comment:
       Nit: Can you use `Lists.newArrayList()` or `ImmutableList.of()` here?
   
   For `Lists`, we use the relocated Guava classes bundled with Iceberg, i.e. `org.apache.iceberg.relocated.com.google.common.collect.Lists`. The same is true for `ImmutableList`, which is already imported.
   
   You could also use `Collections.emptyList()`, to be consistent with the `emptyMap` above.
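
   One thing to keep in mind when picking between these: `Lists.newArrayList()` returns a mutable `ArrayList`, while `ImmutableList.of()` and `Collections.emptyList()` return immutable lists. The sketch below shows the difference using only the JDK. `EmptyPartitionResults` and its methods are hypothetical names for illustration, and I have not checked whether any caller of this method modifies the returned list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class EmptyPartitionResults {
  private EmptyPartitionResults() {
  }

  // Shared immutable instance: no allocation, but callers cannot add to it.
  static List<String> immutableEmpty() {
    return Collections.emptyList();
  }

  // Fresh mutable list: safe if a caller appends to the result later.
  static List<String> mutableEmpty() {
    return new ArrayList<>();
  }

  // Returns true if the list accepts an element, false if it is immutable.
  static boolean acceptsAdd(List<String> list) {
    try {
      list.add("dt=2021-12-15");
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }
}
```

   If nothing mutates the result, an immutable empty list is the cheaper choice and makes the intent explicit.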




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


