Posted to issues@spark.apache.org by "Bjørnar Jensen (JIRA)" <ji...@apache.org> on 2018/08/17 13:09:00 UTC

[jira] [Updated] (SPARK-25145) Buffer size too small on spark.sql query with filterPushdown predicate=True

     [ https://issues.apache.org/jira/browse/SPARK-25145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bjørnar Jensen updated SPARK-25145:
-----------------------------------
    Attachment: create_bug.py

> Buffer size too small on spark.sql query with filterPushdown predicate=True
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-25145
>                 URL: https://issues.apache.org/jira/browse/SPARK-25145
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.3
>         Environment:  
> {noformat}
> # Generated by Apache Ambari. Wed Mar 21 15:37:53 2018
> spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
> spark.eventLog.dir hdfs:///spark2-history/
> spark.eventLog.enabled true
> spark.executor.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
> spark.hadoop.hive.vectorized.execution.enabled true
> spark.history.fs.logDirectory hdfs:///spark2-history/
> spark.history.kerberos.keytab none
> spark.history.kerberos.principal none
> spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
> spark.history.retainedApplications 50
> spark.history.ui.port 18081
> spark.io.compression.lz4.blockSize 128k
> spark.locality.wait 2s
> spark.network.timeout 600s
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.shuffle.consolidateFiles true
> spark.shuffle.io.numConnectionsPerPeer 10
> spark.sql.autoBroadcastJoinTreshold 26214400
> spark.sql.shuffle.partitions 300
> spark.sql.statistics.fallBack.toHdfs true
> spark.sql.tungsten.enabled true
> spark.driver.memoryOverhead 2048
> spark.executor.memoryOverhead 4096
> spark.yarn.historyServer.address service-10-4.local:18081
> spark.yarn.queue default
> spark.sql.warehouse.dir hdfs:///apps/hive/warehouse
> spark.sql.execution.arrow.enabled true
> spark.sql.hive.convertMetastoreOrc true
> spark.sql.orc.char.enabled true
> spark.sql.orc.enabled true
> spark.sql.orc.filterPushdown true
> spark.sql.orc.impl native
> spark.sql.orc.enableVectorizedReader true
> spark.yarn.jars hdfs:///apps/spark-jars/231/jars/*
> {noformat}
>  
>            Reporter: Bjørnar Jensen
>            Priority: Minor
>         Attachments: create_bug.py
>
>
> java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991
> Minimal reproduction in Python (run inside a pyspark shell, where spark is the active SparkSession):
> {code:python}
> import numpy as np
> import pandas as pd
> # Create a spark dataframe
> df = pd.DataFrame({'a': np.arange(10), 'b': np.arange(10) / 2.0})
> sdf = spark.createDataFrame(df)
> print('Created spark dataframe:')
> sdf.show()
> # Save table as orc
> sdf.write.saveAsTable(format='orc', mode='overwrite', name='bjornj.spark_buffer_size_too_small_on_filter_pushdown', compression='zlib')
> # Ensure filterPushdown is enabled
> spark.conf.set('spark.sql.orc.filterPushdown', True)
> # Fetch entire table (works)
> print('Read entire table with "filterPushdown"=True')
> spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown').show()
> # Ensure filterPushdown is disabled
> spark.conf.set('spark.sql.orc.filterPushdown', False)
> # Query without filterPushdown (works)
> print('Read a selection from table with "filterPushdown"=False')
> spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown WHERE a > 5').show()
> # Ensure filterPushdown is enabled
> spark.conf.set('spark.sql.orc.filterPushdown', True)
> # Query with filterPushdown (fails)
> print('Read a selection from table with "filterPushdown"=True')
> spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown WHERE a > 5').show()
> {code}
> {noformat}
> ~/bug_report $ pyspark
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 2018-08-17 13:44:31,365 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
> Jupyter console 5.1.0
> Python 3.6.3 |Intel Corporation| (default, May 4 2018, 04:22:28)
> Type 'copyright', 'credits' or 'license' for more information
> IPython 6.3.1 -- An enhanced Interactive Python. Type '?' for help.
> In [1]: %run -i create_bug.py
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
>       /_/
> Using Python version 3.6.3 (default, May 4 2018 04:22:28)
> SparkSession available as 'spark'.
> Created spark dataframe:
> +---+---+
> |  a|  b|
> +---+---+
> |  0|0.0|
> |  1|0.5|
> |  2|1.0|
> |  3|1.5|
> |  4|2.0|
> |  5|2.5|
> |  6|3.0|
> |  7|3.5|
> |  8|4.0|
> |  9|4.5|
> +---+---+
> Read entire table with "filterPushdown"=True
> +---+---+
> |  a|  b|
> +---+---+
> |  1|0.5|
> |  2|1.0|
> |  3|1.5|
> |  5|2.5|
> |  6|3.0|
> |  7|3.5|
> |  8|4.0|
> |  9|4.5|
> |  4|2.0|
> |  0|0.0|
> +---+---+
> Read a selection from table with "filterPushdown"=False
> +---+---+
> |  a|  b|
> +---+---+
> |  6|3.0|
> |  7|3.5|
> |  8|4.0|
> |  9|4.5|
> +---+---+
> Read a selection from table with "filterPushdown"=True
> 2018-08-17 13:44:48,685 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 40)
> java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991
> at org.apache.orc.impl.InStream$CompressedStream.readHeader(InStream.java:212)
> at org.apache.orc.impl.InStream$CompressedStream.ensureUncompressed(InStream.java:263)
> at org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:250)
> at java.io.InputStream.read(InputStream.java:101)
> at com.google.protobuf25.CodedInputStream.refillBuffer(CodedInputStream.java:737)
> at com.google.protobuf25.CodedInputStream.isAtEnd(CodedInputStream.java:701)
> at com.google.protobuf25.CodedInputStream.readTag(CodedInputStream.java:99)
> at org.apache.orc.OrcProto$RowIndex.<init>(OrcProto.java:7609)
> at org.apache.orc.OrcProto$RowIndex.<init>(OrcProto.java:7573)
> at org.apache.orc.OrcProto$RowIndex$1.parsePartialFrom(OrcProto.java:7662)
> at org.apache.orc.OrcProto$RowIndex$1.parsePartialFrom(OrcProto.java:7657)
> at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:89)
> at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:95)
> at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:49)
> at org.apache.orc.OrcProto$RowIndex.parseFrom(OrcProto.java:7794)
> at org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readRowIndex(RecordReaderUtils.java:231)
> at org.apache.orc.impl.RecordReaderImpl.readRowIndex(RecordReaderImpl.java:1281)
> at org.apache.orc.impl.RecordReaderImpl.readRowIndex(RecordReaderImpl.java:1264)
> at org.apache.orc.impl.RecordReaderImpl.pickRowGroups(RecordReaderImpl.java:918)
> at org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:949)
> at org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1116)
> at org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1151)
> at org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:271)
> at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:627)
> at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initialize(OrcColumnarBatchReader.java:138)
> at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2.apply(OrcFileFormat.scala:196)
> at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2.apply(OrcFileFormat.scala:160)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2018-08-17 13:44:48,708 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 40, localhost, executor driver): java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991
> {noformat}
> Metadata for the test table (output of orc-tools / orc-metadata):
> {noformat}
> { "name": "/apps/hive/warehouse/spark_buffer_size_too_small_on_filter_pushdown/part-00000-358856bc-f771-43d1-bd83-024a288df787-c000.zlib.orc",
> "type": "struct<a:bigint,b:double>",
> "rows": 1,
> "stripe count": 1,
> "format": "0.12", "writer version": "ORC-135",
> "compression": "zlib", "compression block": 262144,
> "file length": 269,
> "content": 121, "stripe stats": 42, "footer": 82, "postscript": 23,
> "row index stride": 10000,
> "user metadata": {
> },
> "stripes": [
> { "stripe": 0, "rows": 1,
> "offset": 3, "length": 118,
> "index": 63, "data": 14, "footer": 41
> }
> ]
> }
> {noformat}
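A cross-check of the numbers above, offered as a sketch rather than a diagnosis: the ORC specification describes each compression chunk header as 3 bytes holding compressedLength * 2 + isOriginal in little-endian order. Assuming that layout (the helper name decode_chunk_header is hypothetical, not an ORC or Spark API), decoding the three bytes of the file magic "ORC" as if they were a chunk header yields exactly the "needed" value from the exception. Together with the stripe offset of 3 in the metadata, this suggests, but does not prove, that the row index was read starting at file offset 0 rather than at the stripe.

```python
def decode_chunk_header(header: bytes):
    """Decode a 3-byte ORC compression chunk header.

    Assumed layout (per the ORC specification): a little-endian integer
    equal to compressedLength * 2 + isOriginal.
    Returns (chunk_length, is_original).
    """
    if len(header) != 3:
        raise ValueError("an ORC chunk header is exactly 3 bytes")
    value = int.from_bytes(header, byteorder="little")
    return value >> 1, bool(value & 1)


# Values copied from the exception and the orc-metadata dump in this report.
COMPRESSION_BLOCK = 262144
FILE_LENGTH = 269
NEEDED = 2205991

# Sanity check with the example from the ORC specification:
# a 100,000-byte compressed chunk has the header [0x40, 0x0d, 0x03].
print(decode_chunk_header(bytes([0x40, 0x0D, 0x03])))  # (100000, False)

# Treat the file magic as if it were a chunk header.
length, is_original = decode_chunk_header(b"ORC")
print(length, is_original)          # 2205991 True
print(length == NEEDED)             # True
print(length > COMPRESSION_BLOCK)   # True: triggers "Buffer size too small"
print(length > FILE_LENGTH)         # True: no such chunk can exist in this file
```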
> Workaround: set spark.sql.orc.filterPushdown = false
>  
>  
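The workaround can be scoped to the affected queries instead of being set globally. The following is a minimal sketch, assuming a PySpark session object that exposes spark.conf.get and spark.conf.set; the helper name orc_pushdown_disabled is hypothetical. It turns ORC filter pushdown off for the duration of a block and restores the previous value afterwards.

```python
from contextlib import contextmanager

ORC_PUSHDOWN_KEY = "spark.sql.orc.filterPushdown"


@contextmanager
def orc_pushdown_disabled(spark):
    """Temporarily disable ORC filter pushdown on the given session.

    `spark` is expected to be a SparkSession (or any object whose `conf`
    offers get(key) and set(key, value)).
    """
    previous = spark.conf.get(ORC_PUSHDOWN_KEY)
    spark.conf.set(ORC_PUSHDOWN_KEY, "false")
    try:
        yield spark
    finally:
        # Restore whatever was configured before, even if the query failed.
        spark.conf.set(ORC_PUSHDOWN_KEY, previous)


# Usage inside a pyspark shell (table name taken from the report):
#
# with orc_pushdown_disabled(spark) as s:
#     s.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown '
#           'WHERE a > 5').show()
```

Restoring the earlier value in a finally block keeps the session's configuration unchanged for unrelated queries, which still benefit from pushdown where it works.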


