Posted to issues@spark.apache.org by "Brad Willard (JIRA)" <ji...@apache.org> on 2015/04/17 17:44:59 UTC

[jira] [Created] (SPARK-6982) Data Frame and Spark SQL should allow filtering on key portion of incremental parquet files

Brad Willard created SPARK-6982:
-----------------------------------

             Summary: Data Frame and Spark SQL should allow filtering on key portion of incremental parquet files
                 Key: SPARK-6982
                 URL: https://issues.apache.org/jira/browse/SPARK-6982
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, SQL
    Affects Versions: 1.3.0
            Reporter: Brad Willard


I'm working with a 2.4 billion row dataset. I just converted it to the partitioned Parquet layout added in 1.3.0, where incremental files are saved under key=X directories and partition discovery exposes key as a column.
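
To illustrate the layout (a rough sketch, not my exact job; daily_frames and the base path are placeholders): each day's data gets written into its own key=<date> subdirectory, and reading the base path back picks the partitions up.

# Sketch only: one key=<date> subdirectory per day so that partition
# discovery exposes `key` as a column on read. daily_frames is a
# placeholder list of (date string, DataFrame) pairs.
for day, daily_df in daily_frames:   # e.g. ('2015-01-01', <DataFrame>)
    daily_df.save('/super_large_dataset_over_time/key=%s' % day, source='parquet')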

I'm currently saving files where the key is a timestamp, like key=2015-01-01. If I run a query, the key comes back as an attribute in the row. It would be amazing to be able to do comparisons and filters on the key attribute, to run efficient queries between points in time and simply skip the partitions outside the key range.

df = sql_context.parquetFile('/super_large_dataset_over_time')
df.filter(df.key >= '2015-03-24').filter(df.key < '2015-04-01').count()

That job could then skip large portions of the dataset very quickly, even if the whole Parquet dataset spans years of data.

Currently that will throw an error because key is not part of the Parquet file schema, even though it's returned in the rows: the comparison filter gets pushed down to the Parquet reader, which validates predicates against the file schema and rejects the partition-only key column.
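
For what it's worth, key does show up in the DataFrame's logical schema (the other column below is just a placeholder), which is what makes the rejection surprising:

df.printSchema()
# root
#  |-- value: string (nullable = true)   <- placeholder data column
#  |-- key: string (nullable = true)     <- injected by partition discovery
#
# The key column only exists in the directory names, not inside the Parquet
# files themselves, so a pushed-down comparison filter on it fails schema
# validation.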

However, it does strangely work with the IN clause, which is my current workaround:
df.where('key in ("2015-04-02", "2015-04-03")').count()
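
A rough sketch of how I generalize that workaround to a date range (the keys_between helper is just something I wrote for this, and the IN list still has to enumerate every key explicitly):

from datetime import date, timedelta

def keys_between(start, end):
    # Enumerate 'YYYY-MM-DD' partition keys in [start, end).
    day = start
    while day < end:
        yield day.isoformat()
        day += timedelta(days=1)

in_list = ', '.join('"%s"' % k for k in keys_between(date(2015, 3, 24), date(2015, 4, 1)))
df.where('key in (%s)' % in_list).count()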

Here is the error thrown by the range filter:

Job aborted due to stage failure: Task 122 in stage 6.0 failed 100 times, most recent failure: Lost task 122.99 in stage 6.0 (TID 39498, ip-XXXXXXXXXXXX): java.lang.IllegalArgumentException: Column [key] was not found in schema!
	at parquet.Preconditions.checkArgument(Preconditions.java:47)
	at parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172)
	at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160)
	at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142)
	at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76)
	at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41)
	at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162)
	at parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46)
	at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41)
	at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22)
	at parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108)
	at parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28)
	at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158)
	at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:244)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:





