Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/09/23 04:58:32 UTC

[GitHub] [iceberg] HeartSaVioR opened a new pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

HeartSaVioR opened a new pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487


   This PR proposes to add E2E tests on partition pruning for both Spark 2.4 and Spark 3.
   
   The tests cover queries that filter on the following partition transforms:
   
   * identity with String type
   * bucket with Integer type
   * truncate with String type
   * hour with Timestamp type
   
   The test table carries all of these transforms in a single partition spec; a sketch follows below.
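
   For reference, a rough sketch of the schema and partition spec the new test builds (drawn from the TestPartitionPruning file added in this PR; an illustration, not the full test):
   
   ```
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.types.Types;
   
   // Identity, bucket, truncate and hour transforms all live in one spec,
   // so a single table exercises every transform listed above.
   Schema logSchema = new Schema(
       Types.NestedField.optional(1, "id", Types.IntegerType.get()),
       Types.NestedField.optional(2, "date", Types.StringType.get()),
       Types.NestedField.optional(3, "level", Types.StringType.get()),
       Types.NestedField.optional(4, "message", Types.StringType.get()),
       Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone()));
   
   PartitionSpec spec = PartitionSpec.builderFor(logSchema)
       .identity("date")        // identity with String type
       .identity("level")
       .bucket("id", 3)         // bucket with Integer type
       .truncate("message", 5)  // truncate with String type
       .hour("timestamp")       // hour with Timestamp type
       .build();
   ```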


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#discussion_r494576585



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+  private static JavaSparkContext sparkContext = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withoutZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    TestPartitionPruning.sparkContext = new JavaSparkContext(spark.sparkContext());
+
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set("spark.sql.session.timeZone", "UTC");
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", getInstant("2020-02-02T00:00:00")),
+      LogMessage.info("2020-02-02", "info event 1", getInstant("2020-02-02T01:00:00")),
+      LogMessage.debug("2020-02-02", "debug event 2", getInstant("2020-02-02T02:00:00")),
+      LogMessage.info("2020-02-03", "info event 2", getInstant("2020-02-03T00:00:00")),
+      LogMessage.debug("2020-02-03", "debug event 3", getInstant("2020-02-03T01:00:00")),
+      LogMessage.info("2020-02-03", "info event 3", getInstant("2020-02-03T02:00:00")),
+      LogMessage.error("2020-02-03", "error event 1", getInstant("2020-02-03T03:00:00")),
+      LogMessage.debug("2020-02-04", "debug event 4", getInstant("2020-02-04T01:00:00")),
+      LogMessage.warn("2020-02-04", "warn event 1", getInstant("2020-02-04T02:00:00")),
+      LogMessage.debug("2020-02-04", "debug event 5", getInstant("2020-02-04T03:00:00"))
+  );
+
+  private static Instant getInstant(String timestampWithoutZone) {
+    Long epochMicros = (Long) Literal.of(timestampWithoutZone).to(Types.TimestampType.withoutZone()).value();
+    return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(epochMicros));
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private PartitionSpec spec = PartitionSpec.builderFor(LOG_SCHEMA)
+      .identity("date")
+      .identity("level")
+      .bucket("id", 3)
+      .truncate("message", 5)
+      .hour("timestamp")
+      .build();
+
+  private Random random = new Random();
+
+  @Test
+  public void testPartitionPruningIdentityString() {
+    String filterCond = "date >= '2020-02-03' AND level = 'DEBUG'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String date = r.getString(0);
+      String level = r.getString(1);
+      return date.compareTo("2020-02-03") >= 0 && level.equals("DEBUG");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningBucketingInteger() {
+    final int[] ids = new int[]{
+        LOGS.get(3).getId(),
+        LOGS.get(7).getId()
+    };
+    String condForIds = Arrays.stream(ids).mapToObj(String::valueOf)
+        .collect(Collectors.joining(",", "(", ")"));
+    String filterCond = "id in " + condForIds;
+    Predicate<Row> partCondition = (Row r) -> {
+      int bucketId = r.getInt(2);
+      Set<Integer> buckets = Arrays.stream(ids).map(bucketTransform::apply)
+          .boxed().collect(Collectors.toSet());
+      return buckets.contains(bucketId);
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningTruncatedString() {
+    String filterCond = "message like 'info event%'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String truncatedMessage = r.getString(3);
+      return truncatedMessage.equals("info ");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningTruncatedStringComparingValueShorterThanPartitionValue() {
+    String filterCond = "message like 'inf%'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String truncatedMessage = r.getString(3);
+      return truncatedMessage.startsWith("inf");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningHourlyPartition() {
+    String filterCond;
+    if (spark.version().startsWith("2")) {
+      // Looks like from Spark 2 we need to compare timestamp with timestamp to push down the filter.
+      filterCond = "timestamp >= to_timestamp('2020-02-03T01:00:00')";
+    } else {
+      filterCond = "timestamp >= '2020-02-03T01:00:00'";
+    }
+    Predicate<Row> partCondition = (Row r) -> {
+      int hourValue = r.getInt(4);
+      Instant instant = getInstant("2020-02-03T01:00:00");
+      Integer hourValueToFilter = hourTransform.apply(TimeUnit.MILLISECONDS.toMicros(instant.toEpochMilli()));
+      return hourValue >= hourValueToFilter;
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  private void runTest(String filterCond, Predicate<Row> partCondition) {
+    File originTableLocation = createTempDir();
+    Assert.assertTrue("Temp folder should exist", originTableLocation.exists());
+
+    Table table = createTable(originTableLocation);
+    Dataset<Row> logs = createTestDataset();
+    saveTestDatasetToTable(logs, table);
+
+    List<Row> expected = logs
+        .select("id", "date", "level", "message", "timestamp")
+        .filter(filterCond)
+        .orderBy("id")
+        .collectAsList();
+    Assert.assertFalse("Expected rows should be not empty", expected.isEmpty());
+
+    // remove records which may be recorded during storing to table
+    CountOpenLocalFileSystem.resetRecordsInPathPrefix(originTableLocation.getAbsolutePath());
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .load(table.location())
+        .select("id", "date", "level", "message", "timestamp")
+        .filter(filterCond)
+        .orderBy("id")
+        .collectAsList();
+    Assert.assertFalse("Actual rows should not be empty", actual.isEmpty());
+
+    Assert.assertEquals("Rows should match", expected, actual);
+
+    assertAccessOnDataFiles(originTableLocation, table, partCondition);
+  }
+
+  private File createTempDir() {
+    try {
+      int rand = random.nextInt(1000000);
+      return temp.newFolder(String.format("logs-%d", rand));

Review comment:
       Why does this create its own random number instead of just getting a new temp folder? I don't see much benefit to doing it this way.
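
       For reference, a minimal sketch of the suggested simplification (the surrounding try/catch is assumed from the test code shown above; JUnit's no-argument `TemporaryFolder.newFolder()` already returns a fresh, uniquely named directory):
   
   ```
   private File createTempDir() {
     try {
       // Let the TemporaryFolder rule pick a unique directory name instead of
       // appending a random suffix ourselves.
       return temp.newFolder();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
   ```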





[GitHub] [iceberg] HeartSaVioR commented on a change in pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#discussion_r494117437



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        (Long) org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", Timestamp.valueOf("2020-02-02 00:00:00")),

Review comment:
       Just dealt with this in 7429dc0 - not sure whether I followed your suggestion properly.





[GitHub] [iceberg] rdblue commented on pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#issuecomment-704412750


   Thanks for the explanation. My general preference is for tests with a smaller scope to help identify the part that is going wrong, rather than larger end-to-end tests that could fail in lots of places. But `TestFilteredScan` is already exercising more than just the filter pushdown during planning, so both are similar in scope.
   
   I'll merge this and defer to you about how we should maintain these two. Thanks for taking the time to write these.



[GitHub] [iceberg] HeartSaVioR commented on pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#issuecomment-698148650


   One of the new tests is failing on Spark 2.4, although it didn't fail in IDEA. It only seems to fail when run with Gradle.
   
   `gradle :iceberg-spark2:test`
   
   ```
   org.apache.iceberg.spark.source.TestPartitionPruning24 > testPartitionPruningBucketingInteger[0] FAILED
       java.lang.AssertionError: Some of data files in partition range should be read. 
   
   Read files in query: [/tmp/junit2992201386166743058/logs-742071/metadata/snap-6042077398117050700-1-c705c4d6-eee7-4155-8959-c9c87c8ee790.avro, /tmp/junit2992201386166743058/logs-742071/metadata/c705c4d6-eee7-4155-8959-c9c87c8ee790-m0.avro, /tmp/junit2992201386166743058/logs-742071/metadata/version-hint.text, /tmp/junit2992201386166743058/logs-742071/metadata/v2.metadata.json] / 
   
   data files in partition range: [/tmp/junit2992201386166743058/logs-742071/data/date=2020-02-03/level=INFO/id_bucket=0/message_trunc=info+/timestamp_hour=2020-02-03-00/00005-104-9228c157-fa0d-4a0c-a7f0-0aa69acee184-00001.parquet, /tmp/junit2992201386166743058/logs-742071/data/date=2020-02-02/level=INFO/id_bucket=0/message_trunc=info+/timestamp_hour=2020-02-02-01/00002-101-0fd6786b-fae7-400b-a696-bcbed73dfcde-00001.parquet, /tmp/junit2992201386166743058/logs-742071/data/date=2020-02-04/level=DEBUG/id_bucket=0/message_trunc=debug/timestamp_hour=2020-02-04-01/00007-106-500c178d-079a-4afa-b0d3-4a396c4fb630-00001.parquet]
   
   org.apache.iceberg.spark.source.TestPartitionPruning24 > testPartitionPruningBucketingInteger[1] FAILED
       java.lang.AssertionError: Some of data files in partition range should be read. 
   
   Read files in query: [/tmp/junit6453382097752467142/logs-387757/metadata/v2.metadata.json, /tmp/junit6453382097752467142/logs-387757/metadata/snap-3808976267395158188-1-411c0913-6e93-43a5-a375-a7e6f7b0a5a8.avro, /tmp/junit6453382097752467142/logs-387757/metadata/version-hint.text, /tmp/junit6453382097752467142/logs-387757/metadata/411c0913-6e93-43a5-a375-a7e6f7b0a5a8-m0.avro] / 
   
   data files in partition range: [/tmp/junit6453382097752467142/logs-387757/data/date=2020-02-04/level=DEBUG/id_bucket=0/message_trunc=debug/timestamp_hour=2020-02-04-01/00007-217-b5ecf7bf-5207-4470-adf9-58b1571ff7b5-00001.parquet, /tmp/junit6453382097752467142/logs-387757/data/date=2020-02-03/level=INFO/id_bucket=0/message_trunc=info+/timestamp_hour=2020-02-03-00/00005-215-8accdc56-06d0-4fec-9a18-ad10d350ccc8-00001.parquet, /tmp/junit6453382097752467142/logs-387757/data/date=2020-02-02/level=INFO/id_bucket=0/message_trunc=info+/timestamp_hour=2020-02-02-01/00002-212-44a0d2fb-48d3-439a-8818-e800e98b5c7c-00001.parquet]
   
   org.apache.iceberg.spark.source.TestPartitionPruning24 > testPartitionPruningBucketingInteger[3] FAILED
       java.lang.AssertionError: Some of data files in partition range should be read. 
   
   Read files in query: [/tmp/junit5114414994557432305/logs-606638/metadata/version-hint.text, /tmp/junit5114414994557432305/logs-606638/metadata/snap-7702056315490708928-1-aa5dc06e-b173-4df3-9af0-e6f907ea3c37.avro, /tmp/junit5114414994557432305/logs-606638/metadata/v2.metadata.json, /tmp/junit5114414994557432305/logs-606638/metadata/aa5dc06e-b173-4df3-9af0-e6f907ea3c37-m0.avro] / 
   
   data files in partition range: [/tmp/junit5114414994557432305/logs-606638/data/date=2020-02-02/level=INFO/id_bucket=0/message_trunc=info+/timestamp_hour=2020-02-02-01/00002-434-39ad40a9-3b76-47ec-820b-5a305edabd79-00001.orc, /tmp/junit5114414994557432305/logs-606638/data/date=2020-02-03/level=INFO/id_bucket=0/message_trunc=info+/timestamp_hour=2020-02-03-00/00005-437-e7545e12-e5f8-40cd-98e7-177969365c5a-00001.orc, /tmp/junit5114414994557432305/logs-606638/data/date=2020-02-04/level=DEBUG/id_bucket=0/message_trunc=debug/timestamp_hour=2020-02-04-01/00007-439-d424d5d2-336f-484c-a499-6478f11f1602-00001.orc]
   
   org.apache.iceberg.spark.source.TestPartitionPruning24 > testPartitionPruningBucketingInteger[4] FAILED
       java.lang.AssertionError: Some of data files in partition range should be read. 
   
   Read files in query: [/tmp/junit8082590320046964751/logs-144508/metadata/v2.metadata.json, /tmp/junit8082590320046964751/logs-144508/metadata/361a3e56-58b7-4c4c-827b-cc83326d918a-m0.avro, /tmp/junit8082590320046964751/logs-144508/metadata/snap-1068076646929597670-1-361a3e56-58b7-4c4c-827b-cc83326d918a.avro, /tmp/junit8082590320046964751/logs-144508/metadata/version-hint.text] / 
   
   data files in partition range: [/tmp/junit8082590320046964751/logs-144508/data/date=2020-02-03/level=INFO/id_bucket=0/message_trunc=info+/timestamp_hour=2020-02-03-00/00005-548-4b531da9-09c9-4d16-a851-ed6b5cd35379-00001.orc, /tmp/junit8082590320046964751/logs-144508/data/date=2020-02-04/level=DEBUG/id_bucket=0/message_trunc=debug/timestamp_hour=2020-02-04-01/00007-550-6e838e99-33b0-43b7-b2af-41585e12b4ea-00001.orc, /tmp/junit8082590320046964751/logs-144508/data/date=2020-02-02/level=INFO/id_bucket=0/message_trunc=info+/timestamp_hour=2020-02-02-01/00002-545-8c2a7444-646f-45ea-b082-f4db470db1c3-00001.orc]
   ```
   
   That's not related to the timestamp issue - the tracker doesn't see any "data" files being read, and the query result appears (surprisingly) to be empty. (I modified the test a bit to check whether the query result is empty, and it failed on that check.) It's not reproducible in IDEA, which is very odd.
   
   Digging into this.


[GitHub] [iceberg] HeartSaVioR commented on pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#issuecomment-698158371


   Never mind. The auto-incrementing ID was the issue, and I fixed the test so it no longer relies on static id values.
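
   For reference, the fix amounts to deriving the ids from the generated log data rather than hard-coding them, mirroring what the current diff shows:
   
   ```
   // Pick the filter ids from the generated log rows so the test no longer
   // depends on what values the auto-incrementing id counter produced.
   final int[] ids = new int[] {
       LOGS.get(3).getId(),
       LOGS.get(7).getId()
   };
   String condForIds = Arrays.stream(ids)
       .mapToObj(String::valueOf)
       .collect(Collectors.joining(",", "(", ")"));
   String filterCond = "id in " + condForIds;
   ```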



[GitHub] [iceberg] HeartSaVioR commented on pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#issuecomment-697135213


   I tried to use a `date` filter against the hour partition, but had no luck getting the filter pushed down (on either Spark 2.4 or Spark 3). If someone has an idea of how to do that (or how to test `date` against an hour partition), that would be awesome.
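
   (For illustration only - the exact expression is not shown here, but the attempt presumably looked something like the sketch below; wrapping the partitioned timestamp column in a date function generally leaves Spark without a simple column predicate to push down to the source.)
   
   ```
   // Hypothetical date-based filter over the hour-partitioned timestamp column.
   // Because the column is wrapped in to_date(), the scan typically receives no
   // pushed-down filter on "timestamp" and cannot prune hour partitions.
   String filterCond = "to_date(timestamp) >= to_date('2020-02-03')";
   List<Row> rows = spark.read()
       .format("iceberg")
       .load(table.location())
       .filter(filterCond)
       .collectAsList();
   ```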



[GitHub] [iceberg] rdblue merged pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487


   



[GitHub] [iceberg] rdblue commented on pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#issuecomment-698558967


   @HeartSaVioR, it looks like the purpose of these tests is to validate that predicate pushdown filters partitions correctly. Have you looked at `TestFilteredScan`? That is very similar. Maybe we don't need both?



[GitHub] [iceberg] HeartSaVioR edited a comment on pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#issuecomment-698617667


   @rdblue 
   
   Yes, but the rationale looks to be a bit different. As I wrote in the PR title, the new test suite is intended to run an E2E test and verify that Iceberg and Spark work together as expected. It only uses the Iceberg API for creating the table and partition spec (because Spark 2.4 doesn't support DDL - going through DDL on Spark 3 might also be worth doing), and for the remaining parts it doesn't rely on any Iceberg internals.
   
   Due to the difference in rationale there are some differences in coverage:
   
   * TestFilteredScan only applies a single partition criterion per test, while the new test suite applies all partition criteria to the table. The new suite can additionally check one of Iceberg's benefits: partition pruning isn't affected by the order of the partition criteria.
   * TestFilteredScan checks the tasks after planning input partitions, which is good for counting how many data files will be read (if I understand correctly, and assuming planInputPartitions works as expected), but it doesn't expose which data files are actually read. The new test suite uses a hack to track the files being read (see the sketch at the end of this comment), which is a bit more complicated, but it can verify that the query doesn't read data files outside the partition range.
   * TestFilteredScan has separate implementations for Spark 2.4 and 3 (I'm not sure whether that's intentional or just an oversight), whereas the new test suite works across both versions (with a slight adjustment of the filter condition for timestamp filter pushdown).
   
   So while the two share common parts, they also differ in parts (and in rationale). I'm not sure which side to consolidate on - I'm not sure whether we want to guarantee on the Iceberg project side that Spark-Iceberg E2E queries work perfectly - that's up to the community's decision.
   
   If we simply want to ensure Iceberg works as expected, we probably just need to tweak TestFilteredScan a bit to cover `truncate`, and probably change the table partition spec to statically include all partitions for all tests instead of one partition per test. (My understanding is that the tests should still pass after that change. Do I understand correctly?) If the decision goes in this direction I might move the new test to our side, as I still need to ensure the E2E query works nicely.
   
   Otherwise, we probably just need to add a date partition to the new test suite along with a new test for it, and probably add tests for `between` on timestamp partitions to cover the partitioned-table tests in TestFilteredScan. I'm not sure we'd want to remove TestFilteredScan after enriching the new test suite, as it still has tests on unpartitioned tables, and it's useful for checking whether a problem comes from Iceberg or from Spark - the E2E test would fail on problems from either side.
   
   Please let me know what you think. Thanks!
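
   For reference, a minimal sketch of the file-open tracking hack mentioned in the second bullet above (modeled on the CountOpenLocalFileSystem used in this PR; the `scheme` field and `resetRecordsInPathPrefix` match the test, while the internal map is an assumption):
   
   ```
   // A local FileSystem registered under a custom scheme that records every
   // path it opens, so the test can assert which data files a query read.
   public static class CountOpenLocalFileSystem extends RawLocalFileSystem {
     public static final String scheme = "countopen";
     private static final Map<String, Long> PATH_TO_OPEN_COUNT = new ConcurrentHashMap<>();
   
     @Override
     public FSDataInputStream open(Path f, int bufferSize) throws IOException {
       PATH_TO_OPEN_COUNT.merge(f.toUri().getPath(), 1L, Long::sum);
       return super.open(f, bufferSize);
     }
   
     // Drop records under a prefix, e.g. opens that happened while writing the
     // table, so only reads from the query under test remain.
     public static void resetRecordsInPathPrefix(String prefix) {
       PATH_TO_OPEN_COUNT.keySet().removeIf(path -> path.startsWith(prefix));
     }
   }
   ```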



[GitHub] [iceberg] HeartSaVioR commented on a change in pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#discussion_r494049778



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        (Long) org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", Timestamp.valueOf("2020-02-02 00:00:00")),

Review comment:
       It looks like, at least in Spark 2.4, having a Java 8 `Instant` field directly doesn't work with Java type inference on Java beans.
   
    So if I understand correctly, we should pass either `long` or `Timestamp` to Spark for the timestamp column, and TestSparkDataFile appears to use `long`. Would it be OK to change this to store a long (using your suggestion to derive microseconds, probably converted to milliseconds because of Spark 2.4) and pass that long value to Spark?
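   To make the conversion concrete, here is a rough sketch of what I have in mind (class and variable names are just for illustration): derive the microsecond value from an Iceberg literal, then downcast to milliseconds so the value can be handed to Spark 2.4 as a `Timestamp` (or as a plain long).
   
    import java.sql.Timestamp;
    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.expressions.Literal;
    import org.apache.iceberg.types.Types;
    
    public class TimestampConversionSketch {
      public static void main(String[] args) {
        // Iceberg literals produce a timezone-independent epoch value in microseconds
        Long epochMicros = (Long) Literal.of("2020-02-02T01:00:00")
            .to(Types.TimestampType.withoutZone())
            .value();
        // Spark 2.4's bean encoder handles java.sql.Timestamp (or long), not Instant,
        // so downcast to milliseconds before passing the value to Spark
        long epochMillis = TimeUnit.MICROSECONDS.toMillis(epochMicros);
        Timestamp ts = new Timestamp(epochMillis);
        System.out.println(epochMicros + " us -> " + ts);
      }
    }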

##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        (Long) org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", Timestamp.valueOf("2020-02-02 00:00:00")),

Review comment:
       Just addressed this in 7429dc0 - not sure whether I followed your suggestion properly.

##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+  private static JavaSparkContext sparkContext = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withoutZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    TestPartitionPruning.sparkContext = new JavaSparkContext(spark.sparkContext());
+
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set("spark.sql.session.timeZone", "UTC");
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", getInstant("2020-02-02T00:00:00")),
+      LogMessage.info("2020-02-02", "info event 1", getInstant("2020-02-02T01:00:00")),
+      LogMessage.debug("2020-02-02", "debug event 2", getInstant("2020-02-02T02:00:00")),
+      LogMessage.info("2020-02-03", "info event 2", getInstant("2020-02-03T00:00:00")),
+      LogMessage.debug("2020-02-03", "debug event 3", getInstant("2020-02-03T01:00:00")),
+      LogMessage.info("2020-02-03", "info event 3", getInstant("2020-02-03T02:00:00")),
+      LogMessage.error("2020-02-03", "error event 1", getInstant("2020-02-03T03:00:00")),
+      LogMessage.debug("2020-02-04", "debug event 4", getInstant("2020-02-04T01:00:00")),
+      LogMessage.warn("2020-02-04", "warn event 1", getInstant("2020-02-04T02:00:00")),
+      LogMessage.debug("2020-02-04", "debug event 5", getInstant("2020-02-04T03:00:00"))
+  );
+
+  private static Instant getInstant(String timestampWithoutZone) {
+    Long epochMicros = (Long) Literal.of(timestampWithoutZone).to(Types.TimestampType.withoutZone()).value();
+    return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(epochMicros));
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private PartitionSpec spec = PartitionSpec.builderFor(LOG_SCHEMA)
+      .identity("date")
+      .identity("level")
+      .bucket("id", 3)
+      .truncate("message", 5)
+      .hour("timestamp")
+      .build();
+
+  private Random random = new Random();
+
+  @Test
+  public void testPartitionPruningIdentityString() {
+    String filterCond = "date >= '2020-02-03' AND level = 'DEBUG'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String date = r.getString(0);
+      String level = r.getString(1);
+      return date.compareTo("2020-02-03") >= 0 && level.equals("DEBUG");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningBucketingInteger() {
+    final int[] ids = new int[]{
+        LOGS.get(3).getId(),
+        LOGS.get(7).getId()
+    };
+    String condForIds = Arrays.stream(ids).mapToObj(String::valueOf)
+        .collect(Collectors.joining(",", "(", ")"));
+    String filterCond = "id in " + condForIds;
+    Predicate<Row> partCondition = (Row r) -> {
+      int bucketId = r.getInt(2);
+      Set<Integer> buckets = Arrays.stream(ids).map(bucketTransform::apply)
+          .boxed().collect(Collectors.toSet());
+      return buckets.contains(bucketId);
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningTruncatedString() {
+    String filterCond = "message like 'info event%'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String truncatedMessage = r.getString(3);
+      return truncatedMessage.equals("info ");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningTruncatedStringComparingValueShorterThanPartitionValue() {
+    String filterCond = "message like 'inf%'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String truncatedMessage = r.getString(3);
+      return truncatedMessage.startsWith("inf");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningHourlyPartition() {
+    String filterCond;
+    if (spark.version().startsWith("2")) {
+      // Looks like from Spark 2 we need to compare timestamp with timestamp to push down the filter.
+      filterCond = "timestamp >= to_timestamp('2020-02-03T01:00:00')";
+    } else {
+      filterCond = "timestamp >= '2020-02-03T01:00:00'";
+    }
+    Predicate<Row> partCondition = (Row r) -> {
+      int hourValue = r.getInt(4);
+      Instant instant = getInstant("2020-02-03T01:00:00");
+      Integer hourValueToFilter = hourTransform.apply(TimeUnit.MILLISECONDS.toMicros(instant.toEpochMilli()));
+      return hourValue >= hourValueToFilter;
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  private void runTest(String filterCond, Predicate<Row> partCondition) {
+    File originTableLocation = createTempDir();
+    Assert.assertTrue("Temp folder should exist", originTableLocation.exists());
+
+    Table table = createTable(originTableLocation);
+    Dataset<Row> logs = createTestDataset();
+    saveTestDatasetToTable(logs, table);
+
+    List<Row> expected = logs
+        .select("id", "date", "level", "message", "timestamp")
+        .filter(filterCond)
+        .orderBy("id")
+        .collectAsList();
+    Assert.assertFalse("Expected rows should be not empty", expected.isEmpty());
+
+    // remove records which may be recorded during storing to table
+    CountOpenLocalFileSystem.resetRecordsInPathPrefix(originTableLocation.getAbsolutePath());
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .load(table.location())
+        .select("id", "date", "level", "message", "timestamp")
+        .filter(filterCond)
+        .orderBy("id")
+        .collectAsList();
+    Assert.assertFalse("Actual rows should not be empty", actual.isEmpty());
+
+    Assert.assertEquals("Rows should match", expected, actual);
+
+    assertAccessOnDataFiles(originTableLocation, table, partCondition);
+  }
+
+  private File createTempDir() {
+    try {
+      int rand = random.nextInt(1000000);
+      return temp.newFolder(String.format("logs-%d", rand));

Review comment:
       I didn't realize `temp.newFolder()` already creates a unique directory on its own. My bad.
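   For the record, a minimal usage sketch of what you mean (just illustrative):
   
    import java.io.File;
    import java.io.IOException;
    import org.junit.Rule;
    import org.junit.rules.TemporaryFolder;
    
    public class TempDirSketch {
      @Rule
      public TemporaryFolder temp = new TemporaryFolder();
    
      File createTableDir() throws IOException {
        // each call already creates a distinct folder under the rule's root,
        // so no random suffix is needed
        return temp.newFolder();
      }
    }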






[GitHub] [iceberg] HeartSaVioR commented on pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#issuecomment-704600505


   Thanks for reviewing and merging! I'll think about the approach for maintaining these tests.




[GitHub] [iceberg] rdblue commented on a change in pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#discussion_r493983892



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        (Long) org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", Timestamp.valueOf("2020-02-02 00:00:00")),

Review comment:
       I don't think it is a good idea for tests to create `Timestamp` or `Date` objects because those representations are tied to a time zone in the JVM implementation. If you have to use them, then pass an instant in milliseconds or microseconds that is produced by Iceberg's literals: `Literal.of("2020-02-02 01:00:00").to(TimestampType.withoutZone()).value()`
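       To illustrate the difference, a quick sketch (purely illustrative, not part of the PR) comparing the two approaches:
    
        import java.sql.Timestamp;
        import java.util.TimeZone;
        import org.apache.iceberg.expressions.Literal;
        import org.apache.iceberg.types.Types;
    
        public class TimezoneDependenceSketch {
          public static void main(String[] args) {
            // Timestamp.valueOf interprets the string in the JVM default time zone,
            // so the resulting epoch value changes with the zone the tests run in
            TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
            long utcMillis = Timestamp.valueOf("2020-02-02 01:00:00").getTime();
            TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));
            long laMillis = Timestamp.valueOf("2020-02-02 01:00:00").getTime();
            System.out.println(utcMillis == laMillis);  // false
    
            // an Iceberg literal yields the same microsecond value regardless of the JVM zone
            Long micros = (Long) Literal.of("2020-02-02T01:00:00")
                .to(Types.TimestampType.withoutZone())
                .value();
            System.out.println(micros);
          }
        }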






[GitHub] [iceberg] HeartSaVioR commented on a change in pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#discussion_r494049778



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        (Long) org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", Timestamp.valueOf("2020-02-02 00:00:00")),

Review comment:
       It looks like, at least in Spark 2.4, having a Java 8 `Instant` field directly doesn't work with Java type inference on Java beans.
   
   So if I understand correctly, we should pass either `long` or `Timestamp` to Spark for the timestamp column, and TestSparkDataFile appears to use `long`. Would it be OK to change this to store a long (using your suggestion to derive microseconds, probably converted to milliseconds because of Spark 2.4) and pass that long value to Spark?






[GitHub] [iceberg] HeartSaVioR commented on pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#issuecomment-698617667


   @rdblue 
   
   Yes, but the rationale is a bit different. As I wrote in the PR title, the new test suite is intended to run an E2E test and verify that Iceberg and Spark work together as expected. It only uses the Iceberg API to create the table and partition spec (because Spark 2.4 doesn't support DDL - probably also a good opportunity to do it for Spark 3?), and for the rest it doesn't rely on Iceberg internals at all.
   
   Because of that difference in rationale, the coverage differs as well:
   
   * TestFilteredScan applies only a single partition criterion per test, while the new test suite applies all partition criteria to the table. The new suite can therefore also verify one of Iceberg's benefits: partition pruning is not affected by the order of partition criteria.
   * TestFilteredScan checks the tasks after planning input partitions, which is good for counting how many data files would be read (if I understand correctly, and assuming planInputPartitions works as expected), but it doesn't show which data files are actually read. The new test suite uses a hack to track the files being read (a rough sketch of the idea is included at the end of this comment), which is a bit more complicated, but it can verify that the query doesn't read data files outside the partition range.
   * TestFilteredScan has separate implementations for Spark 2.4 and 3 (I'm not sure whether that is intentional or just an oversight), whereas the new test suite works across both versions (with a slight adjustment of the filter condition for timestamp filter pushdown).
   
   So while the two have parts in common, they also differ in coverage and rationale. I'm not sure which side to consolidate into - and I'm not sure the Iceberg project wants to guarantee that Spark-Iceberg E2E queries work perfectly - that's up to the community's decision.
   
   If we simply want to ensure Iceberg works as expected, we probably just need to tweak TestFilteredScan to cover `truncate`, and change the table partition spec to statically include all partitions for all tests instead of one partition per test. (My understanding is that the tests should still pass after that change - do I understand correctly?) If the decision goes in that direction, I might move the new test to our side, since I still need to ensure the E2E query works nicely.
   
   Otherwise, we probably just need to add a date partition to the new test suite with a new test for it, and probably add tests for `between` on timestamp partitions, to cover the partitioned-table tests in TestFilteredScan. I'm not sure we'd want to remove TestFilteredScan after enriching the new test suite, as it still has tests for unpartitioned tables, and it helps tell whether a problem comes from Iceberg or from Spark - an E2E test would fail on problems from either side.
   
   Please let me know what you think. Thanks!
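   For reference, a rough, illustrative sketch of the file-access tracking hack mentioned above - the class, scheme and field names here are placeholders rather than the exact implementation in this PR:
   
    import java.io.IOException;
    import java.net.URI;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RawLocalFileSystem;
    
    public class RecordingLocalFileSystem extends RawLocalFileSystem {
      // placeholder scheme; the test registers this class via "fs.<scheme>.impl"
      public static final String SCHEME = "recordinglocal";
    
      // every path opened through this FileSystem is recorded here
      public static final Set<String> OPENED_PATHS = ConcurrentHashMap.newKeySet();
    
      @Override
      public URI getUri() {
        return URI.create(SCHEME + ":///");
      }
    
      @Override
      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
        OPENED_PATHS.add(f.toUri().getPath());
        return super.open(f, bufferSize);
      }
    }
   
   In this sketch, the test would point the table location at that scheme, clear the recorded paths after writing the test data, run the filtered read, and then assert that only files under the expected partition directories were opened.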




[GitHub] [iceberg] HeartSaVioR commented on a change in pull request #1487: Spark: add E2E test on partition pruning for filter pushdown

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#discussion_r494616890



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+  private static JavaSparkContext sparkContext = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withoutZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    TestPartitionPruning.sparkContext = new JavaSparkContext(spark.sparkContext());
+
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set("spark.sql.session.timeZone", "UTC");
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", getInstant("2020-02-02T00:00:00")),
+      LogMessage.info("2020-02-02", "info event 1", getInstant("2020-02-02T01:00:00")),
+      LogMessage.debug("2020-02-02", "debug event 2", getInstant("2020-02-02T02:00:00")),
+      LogMessage.info("2020-02-03", "info event 2", getInstant("2020-02-03T00:00:00")),
+      LogMessage.debug("2020-02-03", "debug event 3", getInstant("2020-02-03T01:00:00")),
+      LogMessage.info("2020-02-03", "info event 3", getInstant("2020-02-03T02:00:00")),
+      LogMessage.error("2020-02-03", "error event 1", getInstant("2020-02-03T03:00:00")),
+      LogMessage.debug("2020-02-04", "debug event 4", getInstant("2020-02-04T01:00:00")),
+      LogMessage.warn("2020-02-04", "warn event 1", getInstant("2020-02-04T02:00:00")),
+      LogMessage.debug("2020-02-04", "debug event 5", getInstant("2020-02-04T03:00:00"))
+  );
+
+  private static Instant getInstant(String timestampWithoutZone) {
+    Long epochMicros = (Long) Literal.of(timestampWithoutZone).to(Types.TimestampType.withoutZone()).value();
+    return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(epochMicros));
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private PartitionSpec spec = PartitionSpec.builderFor(LOG_SCHEMA)
+      .identity("date")
+      .identity("level")
+      .bucket("id", 3)
+      .truncate("message", 5)
+      .hour("timestamp")
+      .build();
+
+  private Random random = new Random();
+
+  @Test
+  public void testPartitionPruningIdentityString() {
+    String filterCond = "date >= '2020-02-03' AND level = 'DEBUG'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String date = r.getString(0);
+      String level = r.getString(1);
+      return date.compareTo("2020-02-03") >= 0 && level.equals("DEBUG");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningBucketingInteger() {
+    final int[] ids = new int[]{
+        LOGS.get(3).getId(),
+        LOGS.get(7).getId()
+    };
+    String condForIds = Arrays.stream(ids).mapToObj(String::valueOf)
+        .collect(Collectors.joining(",", "(", ")"));
+    String filterCond = "id in " + condForIds;
+    Predicate<Row> partCondition = (Row r) -> {
+      int bucketId = r.getInt(2);
+      Set<Integer> buckets = Arrays.stream(ids).map(bucketTransform::apply)
+          .boxed().collect(Collectors.toSet());
+      return buckets.contains(bucketId);
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningTruncatedString() {
+    String filterCond = "message like 'info event%'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String truncatedMessage = r.getString(3);
+      return truncatedMessage.equals("info ");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningTruncatedStringComparingValueShorterThanPartitionValue() {
+    String filterCond = "message like 'inf%'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String truncatedMessage = r.getString(3);
+      return truncatedMessage.startsWith("inf");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningHourlyPartition() {
+    String filterCond;
+    if (spark.version().startsWith("2")) {
+      // Looks like from Spark 2 we need to compare timestamp with timestamp to push down the filter.
+      filterCond = "timestamp >= to_timestamp('2020-02-03T01:00:00')";
+    } else {
+      filterCond = "timestamp >= '2020-02-03T01:00:00'";
+    }
+    Predicate<Row> partCondition = (Row r) -> {
+      int hourValue = r.getInt(4);
+      Instant instant = getInstant("2020-02-03T01:00:00");
+      Integer hourValueToFilter = hourTransform.apply(TimeUnit.MILLISECONDS.toMicros(instant.toEpochMilli()));
+      return hourValue >= hourValueToFilter;
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  private void runTest(String filterCond, Predicate<Row> partCondition) {
+    File originTableLocation = createTempDir();
+    Assert.assertTrue("Temp folder should exist", originTableLocation.exists());
+
+    Table table = createTable(originTableLocation);
+    Dataset<Row> logs = createTestDataset();
+    saveTestDatasetToTable(logs, table);
+
+    List<Row> expected = logs
+        .select("id", "date", "level", "message", "timestamp")
+        .filter(filterCond)
+        .orderBy("id")
+        .collectAsList();
+    Assert.assertFalse("Expected rows should be not empty", expected.isEmpty());
+
+    // remove records which may be recorded during storing to table
+    CountOpenLocalFileSystem.resetRecordsInPathPrefix(originTableLocation.getAbsolutePath());
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .load(table.location())
+        .select("id", "date", "level", "message", "timestamp")
+        .filter(filterCond)
+        .orderBy("id")
+        .collectAsList();
+    Assert.assertFalse("Actual rows should not be empty", actual.isEmpty());
+
+    Assert.assertEquals("Rows should match", expected, actual);
+
+    assertAccessOnDataFiles(originTableLocation, table, partCondition);
+  }
+
+  private File createTempDir() {
+    try {
+      int rand = random.nextInt(1000000);
+      return temp.newFolder(String.format("logs-%d", rand));

Review comment:
       I didn't realize `temp.newFolder()` already creates a unique directory on its own. My bad.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org