Posted to dev@spark.apache.org by StanZhai <ma...@zhaishidan.cn> on 2017/02/04 07:15:19 UTC
[SQL] A confusing NullPointerException when creating table using Spark 2.1.0
Hi all,
After upgrading our Spark from 1.6.2 to 2.1.0, I hit a confusing
NullPointerException when creating a table under Spark 2.1.0. The problem
does not exist in Spark 1.6.1.
Environment: Hive 1.2.1, Hadoop 2.6.4
==================== Code ====================
// spark is an instance of HiveContext
// merge is a Hive UDF
// Trailing spaces keep the concatenated SQL fragments from running together.
val df = spark.sql(
  "SELECT merge(field_a, null) AS new_a, field_b AS new_b " +
  "FROM tb_1 group by field_a, field_b")
df.createTempView("tb_temp")
spark.sql("create table tb_result stored as parquet as " +
  "SELECT new_a " +
  "FROM tb_temp " +
  "LEFT JOIN `tb_2` ON " +
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), " +
  "concat('GrLSRwZE_', cast((rand() * 200) AS int)), (`tb_temp`.`new_b`)) = " +
  "`tb_2`.`fka6862f17`")
==================== Physical Plan ====================
*Project [new_a]
+- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, cast(cast((_nondeterministic * 200.0) as int) as string)) else new_b], [fka6862f17], LeftOuter, BuildRight
   :- HashAggregate(keys=[field_a, field_b], functions=[], output=[new_a, new_b, _nondeterministic])
   :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), coordinator[target post-shuffle partition size: 1024880]
   :     +- *HashAggregate(keys=[field_a, field_b], functions=[], output=[field_a, field_b])
   :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [fka6862f17]
         +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
What does '*' mean before HashAggregate?
==================== Exception ====================
org.apache.spark.SparkException: Task failed while writing rows
...
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:260)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:259)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:392)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:252)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:199)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:197)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:202)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:138)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:137)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I also found that changing my code in either of the following ways:
spark.sql("create table tb_result stored as parquet as " +
  "SELECT new_b " +
  "FROM tb_temp " +
  "LEFT JOIN `tb_2` ON " +
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), " +
  "concat('GrLSRwZE_', cast((rand() * 200) AS int)), (`tb_temp`.`new_b`)) = " +
  "`tb_2`.`fka6862f17`")
or
spark.sql("create table tb_result stored as parquet as " +
  "SELECT new_a " +
  "FROM tb_temp " +
  "LEFT JOIN `tb_2` ON " +
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), " +
  "concat('GrLSRwZE_', cast((200) AS int)), (`tb_temp`.`new_b`)) = " +
  "`tb_2`.`fka6862f17`")
avoids the problem.
== Physical Plan of select new_b ... ==
*Project [new_b]
+- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, cast(cast((_nondeterministic * 200.0) as int) as string)) else new_b], [fka6862f17], LeftOuter, BuildRight
   :- *HashAggregate(keys=[field_a, field_b], functions=[], output=[new_b, _nondeterministic])
   :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), coordinator[target post-shuffle partition size: 1024880]
   :     +- *HashAggregate(keys=[field_a, field_b], functions=[], output=[field_a, field_b])
   :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [fka6862f17]
         +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
The difference is that in this plan `HashAggregate(keys=[field_a, field_b],
functions=[], output=[new_b, _nondeterministic])` has a '*' before it, while
the corresponding HashAggregate in the failing plan does not.
It looks like something goes wrong with WholeStageCodegen when a Hive UDF is
combined with rand().
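One way to check the '*' markers is to print the physical plan for the same
query with whole-stage code generation switched on and off. The sketch below
is a minimal, hypothetical example: the object name CodegenPlanDemo, the
helper groupByPlan, the view name tb_1_demo, and the sample rows are all made
up for illustration, and it uses a local SparkSession rather than the
Hive-backed setup described above. It does not reproduce the
NullPointerException and is not a fix. It only compares plans under the two
settings of spark.sql.codegen.wholeStage.

```scala
import org.apache.spark.sql.SparkSession

object CodegenPlanDemo {
  // Hypothetical helper: returns the physical plan text of a simple GROUP BY
  // with whole-stage code generation switched on or off.
  def groupByPlan(spark: SparkSession, wholeStage: Boolean): String = {
    import spark.implicits._
    spark.conf.set("spark.sql.codegen.wholeStage", wholeStage.toString)

    // Made-up rows standing in for tb_1; only the column names follow the post.
    Seq(("a", "x"), ("a", "x"), ("b", ""))
      .toDF("field_a", "field_b")
      .createOrReplaceTempView("tb_1_demo")

    spark.sql("SELECT field_a, field_b FROM tb_1_demo GROUP BY field_a, field_b")
      .queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption, not the original cluster).
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("codegen-plan-demo")
      .getOrCreate()

    println(groupByPlan(spark, wholeStage = true))
    println(groupByPlan(spark, wholeStage = false))
    spark.stop()
  }
}
```

In Spark 2.x plan output, operators prefixed with '*' are the ones compiled
by whole-stage code generation, so with the setting off no operator should
carry the marker.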
How can I solve this problem?
Any help is greatly appreciated!
Best,
Stan
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-A-confusing-NullPointerException-when-creating-table-using-Spark2-1-0-tp20851.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
Re: [SQL] A confusing NullPointerException when creating table using Spark 2.1.0
Posted by StanZhai <ma...@zhaishidan.cn>.
This issue has been fixed by https://github.com/apache/spark/pull/16820.
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-A-confusing-NullPointerException-when-creating-table-using-Spark2-1-0-tp20851p20866.html