Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/05 20:50:51 UTC

[GitHub] [iceberg] thesquelched opened a new issue #2553: Spark filters do not work on int96 timestamp columns

thesquelched opened a new issue #2553:
URL: https://github.com/apache/iceberg/issues/2553


   Spark filter operations on int96 timestamp columns fail with a `ClassCastException`. I've reproduced this live with both Spark 2 and Spark 3, and it can also be reproduced in the unit tests by applying the following diff on top of commit 9edb1fabc0a4ad4c3f10625c4a15918bf5790f8b:
   
   ```diff
   diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java b/spark2/src/test/java/org/apache/iceberg/spark/
   index 72fea444..5d1902b9 100644
   --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
   +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
   @@ -22,6 +22,7 @@ package org.apache.iceberg.spark.source;
    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;
   +import java.sql.Timestamp;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
   @@ -468,6 +469,17 @@ public class TestSparkTableUtilWithInMemoryCatalog {
              .collectAsList();
          Assert.assertEquals("Rows must match", expected, actual);
   
   +      // validate we can filter on int96 timestamp column
   +      List<Row> expectedTs = spark.table("parquet_table")
   +              .filter(functions.col("tmp_col").equalTo(Timestamp.valueOf("2010-03-20 10:40:30.1234")))
   +              .select("id", "tmp_col")
   +              .collectAsList();
   +      List<Row> actualTs = spark.read().format("iceberg").load(tableLocation)
   +              .filter(functions.col("tmp_col").equalTo(Timestamp.valueOf("2010-03-20 10:40:30.1234")))
   +              .select("id", "tmp_col")
   +              .collectAsList();
   +      Assert.assertEquals("Rows must match", expectedTs, actualTs);
   +
          // validate we did not persist metrics for INT96
          Dataset<Row> fileDF = spark.read().format("iceberg").load(tableLocation + "#files");
   ```
   
   Relevant log output from the failing test:
   
   ```
   [Executor task launch worker for task 4] ERROR org.apache.iceberg.spark.source.BaseDataReader - Error reading file: file:/var/folders/ny/gq0jmnjn2y76y52m5mj_z0343kg170/T/junit8823410885203046367/parquet_table/part-00000-77f7712c-ce0c-42f1-be53-cef1f4621267-c000.snappy.parquet
   java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.parquet.io.api.Binary
   	at org.apache.parquet.io.api.Binary$ByteArrayBackedBinary.compareTo(Binary.java:263)
   	at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
   	at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
   	at org.apache.iceberg.types.Comparators$NullSafeChainedComparator.compare(Comparators.java:245)
   	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:352)
   	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:79)
   	at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:139)
   	at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:346)
   	at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:361)
   	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eval(ParquetMetricsRowGroupFilter.java:103)
   	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.access$100(ParquetMetricsRowGroupFilter.java:79)
   	at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter.shouldRead(ParquetMetricsRowGroupFilter.java:73)
   	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:108)
   	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
   	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
   	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:38)
   	at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:35)
   	at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:73)
   	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:91)
   	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:93)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
   	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
   	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
   	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
   	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
   	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:123)
   	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] thesquelched commented on issue #2553: Spark filters do not work on int96 timestamp columns

Posted by GitBox <gi...@apache.org>.
thesquelched commented on issue #2553:
URL: https://github.com/apache/iceberg/issues/2553#issuecomment-833752923


   > Would you like to open a PR with the fix, @thesquelched?
   
   I'm hoping to have one up today or tomorrow.




[GitHub] [iceberg] thesquelched commented on issue #2553: Spark filters do not work on int96 timestamp columns

Posted by GitBox <gi...@apache.org>.
thesquelched commented on issue #2553:
URL: https://github.com/apache/iceberg/issues/2553#issuecomment-832999203


   Note that this happens regardless of whether metrics for this column are set to `none` or not.




[GitHub] [iceberg] rdblue commented on issue #2553: Spark filters do not work on int96 timestamp columns

Posted by GitBox <gi...@apache.org>.
rdblue commented on issue #2553:
URL: https://github.com/apache/iceberg/issues/2553#issuecomment-833724970


   The metrics setting controls Iceberg metrics, but this failure happens when filtering row groups in Parquet, which is why changing the metrics setting doesn't help. We just need to update Iceberg's [ParquetMetricsRowGroupFilter](https://github.com/apache/iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java#L97) to ignore INT96 stats. I had assumed that Parquet would not return stats for INT96 columns in the first place, which is probably why we overlooked this. Would you like to open a PR with the fix, @thesquelched?
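
   As a rough illustration of the idea (not the actual Iceberg change), the sketch below models a row group filter that compares a column's min/max stats against a `long` literal. Everything in it is a hypothetical stand-in: `PhysicalType`, `ColumnStats`, `unguardedEq` and `guardedEq` are made-up names, and a `String` plays the role of Parquet's `Binary` for the INT96 bounds. It only shows why the comparison fails with a `ClassCastException` and how skipping INT96 stats avoids it.

   ```java
   public class Int96FilterSketch {
     // Hypothetical stand-in for Parquet physical types; not the real Parquet API.
     public enum PhysicalType { INT64, INT96 }

     // Hypothetical holder for one column's min/max stats in a row group.
     public static final class ColumnStats {
       final PhysicalType type;
       final Comparable<?> min;
       final Comparable<?> max;

       public ColumnStats(PhysicalType type, Comparable<?> min, Comparable<?> max) {
         this.type = type;
         this.min = min;
         this.max = max;
       }
     }

     static final boolean ROWS_MIGHT_MATCH = true;
     static final boolean ROWS_CANNOT_MATCH = false;

     // Natural-order comparison through the raw Comparable interface.
     // Throws ClassCastException when the bound and the literal have different types.
     @SuppressWarnings({"unchecked", "rawtypes"})
     private static int compare(Comparable bound, Object literal) {
       return bound.compareTo(literal);
     }

     // Equality check that trusts the stats blindly, as in the failing code path.
     public static boolean unguardedEq(ColumnStats stats, long literal) {
       if (compare(stats.min, literal) > 0) {
         return ROWS_CANNOT_MATCH; // literal is below the lower bound
       }
       if (compare(stats.max, literal) < 0) {
         return ROWS_CANNOT_MATCH; // literal is above the upper bound
       }
       return ROWS_MIGHT_MATCH;
     }

     // Same check, but INT96 stats are ignored: the row group is always read.
     public static boolean guardedEq(ColumnStats stats, long literal) {
       if (stats.type == PhysicalType.INT96) {
         return ROWS_MIGHT_MATCH;
       }
       return unguardedEq(stats, literal);
     }

     public static void main(String[] args) {
       // A String stands in for the binary-encoded INT96 bounds (assumption for this sketch).
       ColumnStats int96 = new ColumnStats(PhysicalType.INT96, "raw-min", "raw-max");
       System.out.println("guarded: " + guardedEq(int96, 15L));
       try {
         unguardedEq(int96, 15L);
       } catch (ClassCastException e) {
         System.out.println("unguarded failed: " + e.getMessage());
       }
     }
   }
   ```

   Returning "rows might match" for INT96 columns gives up row group pruning on those columns, but it is always safe: the rows are still filtered after they are read.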

