Posted to issues@spark.apache.org by "Bjørnar Jensen (JIRA)" <ji...@apache.org> on 2018/11/30 10:29:00 UTC

[jira] [Reopened] (SPARK-25145) Buffer size too small on spark.sql query with filterPushdown predicate=True

     [ https://issues.apache.org/jira/browse/SPARK-25145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bjørnar Jensen reopened SPARK-25145:
------------------------------------

New information that I believe will make it reproducible:

The "zerocopy" option is what triggers this crash in our system. With "zerocopy" set to "false", the reads produce results, and spark.sql().explain() shows that the filters are pushed down.

Turning on "zerocopy" causes the query with filter pushdown to crash with "Buffer size too small". Furthermore, orc.compress.size and orc.buffer.size.enforce do not seem to have any effect (or to stick) when set.
{code:java}
pyspark --conf 'spark.hadoop.hive.exec.orc.zerocopy=true'   # => Crashes

pyspark --conf 'spark.hadoop.hive.exec.orc.zerocopy=false'  # => Succeeds
{code}
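
For anyone trying to reproduce this, the two invocations above differ in a single property. Below is a minimal sketch of a helper that assembles both command lines. The name `pyspark_command` and the `extra_conf` parameter are hypothetical and not part of Spark; the helper only builds the argument list and does not launch anything.

```python
import shlex

# Property named in this report. Spark copies spark.hadoop.* entries
# into the Hadoop configuration used by the readers.
ZEROCOPY_KEY = "spark.hadoop.hive.exec.orc.zerocopy"


def pyspark_command(zerocopy, extra_conf=None):
    """Return the argv list for a pyspark shell with zerocopy on or off.

    Hypothetical helper for illustration; it only builds the command.
    """
    conf = {ZEROCOPY_KEY: "true" if zerocopy else "false"}
    conf.update(extra_conf or {})
    argv = ["pyspark"]
    for key, value in conf.items():
        argv += ["--conf", f"{key}={value}"]
    return argv


if __name__ == "__main__":
    # Print the crashing and the succeeding variant, shell-quoted.
    for flag in (True, False):
        print(shlex.join(pyspark_command(flag)))
```

Passing `extra_conf={"spark.sql.orc.filterPushdown": "true"}` would pin the pushdown setting on the command line as well, so that both runs differ only in the zerocopy flag.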
 

I do not know why, though.

 

Best regards,

Bjørnar.

> Buffer size too small on spark.sql query with filterPushdown predicate=True
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-25145
>                 URL: https://issues.apache.org/jira/browse/SPARK-25145
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.3
>         Environment:  
> {noformat}
> # Generated by Apache Ambari. Wed Mar 21 15:37:53 2018
> spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
> spark.eventLog.dir hdfs:///spark2-history/
> spark.eventLog.enabled true
> spark.executor.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
> spark.hadoop.hive.vectorized.execution.enabled true
> spark.history.fs.logDirectory hdfs:///spark2-history/
> spark.history.kerberos.keytab none
> spark.history.kerberos.principal none
> spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
> spark.history.retainedApplications 50
> spark.history.ui.port 18081
> spark.io.compression.lz4.blockSize 128k
> spark.locality.wait 2s
> spark.network.timeout 600s
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.shuffle.consolidateFiles true
> spark.shuffle.io.numConnectionsPerPeer 10
> spark.sql.autoBroadcastJoinTreshold 26214400
> spark.sql.shuffle.partitions 300
> spark.sql.statistics.fallBack.toHdfs true
> spark.sql.tungsten.enabled true
> spark.driver.memoryOverhead 2048
> spark.executor.memoryOverhead 4096
> spark.yarn.historyServer.address service-10-4.local:18081
> spark.yarn.queue default
> spark.sql.warehouse.dir hdfs:///apps/hive/warehouse
> spark.sql.execution.arrow.enabled true
> spark.sql.hive.convertMetastoreOrc true
> spark.sql.orc.char.enabled true
> spark.sql.orc.enabled true
> spark.sql.orc.filterPushdown true
> spark.sql.orc.impl native
> spark.sql.orc.enableVectorizedReader true
> spark.yarn.jars hdfs:///apps/spark-jars/231/jars/*
> {noformat}
>  
>            Reporter: Bjørnar Jensen
>            Priority: Minor
>         Attachments: create_bug.py, report.txt
>
>
> java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991
> {code:java}
> # Python: run inside the pyspark shell, where `spark` is already defined
> import numpy as np
> import pandas as pd
> # Create a spark dataframe
> df = pd.DataFrame({'a': np.arange(10), 'b': np.arange(10) / 2.0})
> sdf = spark.createDataFrame(df)
> print('Created spark dataframe:')
> sdf.show()
> # Save table as orc
> sdf.write.saveAsTable(format='orc', mode='overwrite', name='bjornj.spark_buffer_size_too_small_on_filter_pushdown', compression='zlib')
> # Ensure filterPushdown is enabled
> spark.conf.set('spark.sql.orc.filterPushdown', True)
> # Fetch entire table (works)
> print('Read entire table with "filterPushdown"=True')
> spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown').show()
> # Ensure filterPushdown is disabled
> spark.conf.set('spark.sql.orc.filterPushdown', False)
> # Query without filterPushdown (works)
> print('Read a selection from table with "filterPushdown"=False')
> spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown WHERE a > 5').show()
> # Ensure filterPushdown is enabled
> spark.conf.set('spark.sql.orc.filterPushdown', True)
> # Query with filterPushDown (fails)
> print('Read a selection from table with "filterPushdown"=True')
> spark.sql('SELECT * FROM bjornj.spark_buffer_size_too_small_on_filter_pushdown WHERE a > 5').show()
> {code}
> {noformat}
> ~/bug_report $ pyspark
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 2018-08-17 13:44:31,365 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
> Jupyter console 5.1.0
> Python 3.6.3 |Intel Corporation| (default, May 4 2018, 04:22:28)
> Type 'copyright', 'credits' or 'license' for more information
> IPython 6.3.1 -- An enhanced Interactive Python. Type '?' for help.
> In [1]: %run -i create_bug.py
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
>       /_/
> Using Python version 3.6.3 (default, May 4 2018 04:22:28)
> SparkSession available as 'spark'.
> Created spark dataframe:
> +---+---+
> |  a|  b|
> +---+---+
> |  0|0.0|
> |  1|0.5|
> |  2|1.0|
> |  3|1.5|
> |  4|2.0|
> |  5|2.5|
> |  6|3.0|
> |  7|3.5|
> |  8|4.0|
> |  9|4.5|
> +---+---+
> Read entire table with "filterPushdown"=True
> +---+---+
> |  a|  b|
> +---+---+
> |  1|0.5|
> |  2|1.0|
> |  3|1.5|
> |  5|2.5|
> |  6|3.0|
> |  7|3.5|
> |  8|4.0|
> |  9|4.5|
> |  4|2.0|
> |  0|0.0|
> +---+---+
> Read a selection from table with "filterPushdown"=False
> +---+---+
> |  a|  b|
> +---+---+
> |  6|3.0|
> |  7|3.5|
> |  8|4.0|
> |  9|4.5|
> +---+---+
> Read a selection from table with "filterPushdown"=True
> 2018-08-17 13:44:48,685 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 40)
> java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991
> at org.apache.orc.impl.InStream$CompressedStream.readHeader(InStream.java:212)
> at org.apache.orc.impl.InStream$CompressedStream.ensureUncompressed(InStream.java:263)
> at org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:250)
> at java.io.InputStream.read(InputStream.java:101)
> at com.google.protobuf25.CodedInputStream.refillBuffer(CodedInputStream.java:737)
> at com.google.protobuf25.CodedInputStream.isAtEnd(CodedInputStream.java:701)
> at com.google.protobuf25.CodedInputStream.readTag(CodedInputStream.java:99)
> at org.apache.orc.OrcProto$RowIndex.<init>(OrcProto.java:7609)
> at org.apache.orc.OrcProto$RowIndex.<init>(OrcProto.java:7573)
> at org.apache.orc.OrcProto$RowIndex$1.parsePartialFrom(OrcProto.java:7662)
> at org.apache.orc.OrcProto$RowIndex$1.parsePartialFrom(OrcProto.java:7657)
> at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:89)
> at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:95)
> at com.google.protobuf25.AbstractParser.parseFrom(AbstractParser.java:49)
> at org.apache.orc.OrcProto$RowIndex.parseFrom(OrcProto.java:7794)
> at org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readRowIndex(RecordReaderUtils.java:231)
> at org.apache.orc.impl.RecordReaderImpl.readRowIndex(RecordReaderImpl.java:1281)
> at org.apache.orc.impl.RecordReaderImpl.readRowIndex(RecordReaderImpl.java:1264)
> at org.apache.orc.impl.RecordReaderImpl.pickRowGroups(RecordReaderImpl.java:918)
> at org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:949)
> at org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1116)
> at org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1151)
> at org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:271)
> at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:627)
> at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initialize(OrcColumnarBatchReader.java:138)
> at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2.apply(OrcFileFormat.scala:196)
> at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2.apply(OrcFileFormat.scala:160)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2018-08-17 13:44:48,708 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 40, localhost, executor driver): java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 2205991
> {noformat}
> Metadata for test table (orc-tools/orc-metadata):
> {noformat}
> { "name": "/apps/hive/warehouse/spark_buffer_size_too_small_on_filter_pushdown/part-00000-358856bc-f771-43d1-bd83-024a288df787-c000.zlib.orc",
> "type": "struct<a:bigint,b:double>",
> "rows": 1,
> "stripe count": 1,
> "format": "0.12", "writer version": "ORC-135",
> "compression": "zlib", "compression block": 262144,
> "file length": 269,
> "content": 121, "stripe stats": 42, "footer": 82, "postscript": 23,
> "row index stride": 10000,
> "user metadata": {
> },
> "stripes": [
> { "stripe": 0, "rows": 1,
> "offset": 3, "length": 118,
> "index": 63, "data": 14, "footer": 41
> }
> ]
> }
> {noformat}
> Workaround: set spark.sql.orc.filterPushdown = false
>  
>  
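
One observation on the numbers in the trace, offered as a hint rather than a diagnosis. The ORC specification describes each compressed chunk as starting with a 3-byte little-endian header whose value is (chunk length * 2 + isOriginal). Decoding the three ASCII bytes of the file magic "ORC" as such a header yields exactly the "needed" value in the exception. Since the metadata shows the only stripe starting at offset 3, right after the magic, this may mean the reader parsed the start of the file instead of the row index stream. The function name below is illustrative only and is not part of any ORC library.

```python
def decode_chunk_header(header: bytes):
    """Decode a 3-byte ORC compression chunk header.

    Per the ORC specification the header is a little-endian 24-bit value
    equal to chunk_length * 2 + is_original.
    Returns (chunk_length, is_original).
    """
    if len(header) != 3:
        raise ValueError("an ORC chunk header is exactly 3 bytes")
    value = int.from_bytes(header, "little")
    return value >> 1, bool(value & 1)


if __name__ == "__main__":
    buffer_size = 262144  # "compression block" reported by orc-metadata
    # Treat the file magic as if it were a chunk header.
    length, is_original = decode_chunk_header(b"ORC")
    print(length, is_original)   # 2205991 True
    print(length > buffer_size)  # True: triggers "Buffer size too small"
```

The decoded length (2205991) is also far larger than the whole file (269 bytes), which is consistent with a misread header rather than a genuinely oversized chunk.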


