You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Haopu Wang <HW...@qilinsoft.com> on 2014/09/22 10:34:49 UTC
Spark SQL 1.1.0: NPE when join two cached table
I have two data sets and want to join them on each first field. Sample
data are below:
data set 1:
id2,name1,2,300.0
data set 2:
id1,aaaaaaaaaaaa
The code is something like below:
val sparkConf = new SparkConf().setAppName("JoinInScala")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed",
"true")
import org.apache.spark.sql._
val testdata = sc.textFile(args(0) + "data.txt").map(_.split(","))
.map(p => Row(p(0), p(1).trim, p(2).trim.toLong,
p(3).trim.toDouble))
val fields = new Array[StructField](4)
fields(0) = StructField("id", StringType, false);
fields(1) = StructField("name", StringType, false);
fields(2) = StructField("agg1", LongType, false);
fields(3) = StructField("agg2", DoubleType, false);
val schema = StructType(fields);
val data = sqlContext.applySchema(testdata, schema)
data.registerTempTable("datatable")
sqlContext.cacheTable("datatable")
val refdata = sc.textFile(args(0) + "ref.txt").map(_.split(","))
.map(p => Row(p(0), p(1).trim))
val reffields = new Array[StructField](2)
reffields(0) = StructField("id", StringType, false);
reffields(1) = StructField("data", StringType, true);
val refschema = StructType(reffields);
val refschemardd = sqlContext.applySchema(refdata, refschema)
refschemardd.registerTempTable("ref")
sqlContext.cacheTable("ref")
val results = sqlContext.sql("SELECT
d.id,d.name,d.agg1,d.agg2,ref.data FROM datatable as d join ref on
d.id=ref.id")
results.foreach(T => Unit);
But I got the NullPointerException below. If I comment out the two
"cacheTable()" calls, the program runs well. Please shed some light,
thank you!
Exception in thread "main" java.lang.NullPointerException
at
org.apache.spark.sql.columnar.InMemoryRelation.statistics$lzycompute(InM
emoryColumnarTableScan.scala:43)
at
org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumn
arTableScan.scala:42)
at
org.apache.spark.sql.execution.SparkStrategies$HashJoin$.apply(SparkStra
tegies.scala:83)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(Que
ryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(Que
ryPlanner.scala:58)
at
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.s
cala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlann
er.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(Spa
rkStrategies.scala:268)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(Que
ryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(Que
ryPlanner.scala:58)
at
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.s
cala:59)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLC
ontext.scala:402)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scal
a:400)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(S
QLContext.scala:406)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.s
cala:406)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLConte
xt.scala:409)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:40
9)
at
org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
at
org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
at
org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:189)
at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1233)
at
org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:117)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:759)
at Join$$anonfun$main$1.apply$mcVI$sp(Join.scala:44)
at
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at Join$.main(Join.scala:42)
at Join.main(Join.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
a:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
Impl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Re: Spark SQL 1.1.0: NPE when join two cached table
Posted by Yin Huai <hu...@gmail.com>.
It is a bug. I have created https://issues.apache.org/jira/browse/SPARK-3641
to track it.
Thanks for reporting it.
Yin
On Mon, Sep 22, 2014 at 4:34 AM, Haopu Wang <HW...@qilinsoft.com> wrote:
> I have two data sets and want to join them on each first field. Sample
> data are below:
>
>
>
> data set 1:
>
> id2,name1,2,300.0
>
>
>
> data set 2:
>
> id1,aaaaaaaaaaaa
>
>
>
> The code is something like below:
>
>
>
> *val* sparkConf = *new* SparkConf().setAppName("JoinInScala")
>
> *val* sc = *new* SparkContext(sparkConf)
>
> *val* sqlContext = *new* SQLContext(sc)
>
> sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed",
> "true")
>
> *import* org.apache.spark.sql._
>
>
>
> *val* testdata = sc.textFile(args(0) + "data.txt").map(_.split(","))
>
> .map(p => Row(p(0), p(1).trim, *p(**2**).trim*.toLong, *p(**3*
> *).trim*.toDouble))
>
>
>
> *val* fields = *new* Array[StructField](4)
>
> fields(0) = StructField("id", StringType, *false*);
>
> fields(1) = StructField("name", StringType, *false*);
>
> fields(2) = StructField("agg1", LongType, *false*);
>
> fields(3) = StructField("agg2", DoubleType, *false*);
>
> *val* schema = StructType(*fields*);
>
>
>
> *val* data = sqlContext.applySchema(testdata, schema)
>
>
>
> data.registerTempTable("datatable")
>
> sqlContext.cacheTable("datatable")
>
>
>
> *val* refdata = sc.textFile(args(0) + "ref.txt").map(_.split(","))
>
> .map(p => Row(p(0), p(1).trim))
>
>
>
> *val* reffields = *new* Array[StructField](2)
>
> reffields(0) = StructField("id", StringType, *false*);
>
> reffields(1) = StructField("data", StringType, *true*);
>
> *val* refschema = StructType(*reffields*);
>
>
>
> *val* refschemardd = sqlContext.applySchema(refdata, refschema)
>
> refschemardd.registerTempTable("ref")
>
> sqlContext.cacheTable("ref")
>
>
>
> *val* results = sqlContext.sql("SELECT d.id,d.name,d.agg1,d.agg2,ref.data
> FROM datatable as d join ref on d.id=ref.id")
>
> results.foreach(T => Unit);
>
>
>
> But I got the NullPointerException below. If I comment out the two
> "cacheTable()" calls, the program runs well. Please shed some light, thank
> you!
>
>
>
> Exception in thread "main" java.lang.NullPointerException
>
> at
> org.apache.spark.sql.columnar.InMemoryRelation.statistics$lzycompute(InMemoryColumnarTableScan.scala:43)
>
> at
> org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:42)
>
> at
> org.apache.spark.sql.execution.SparkStrategies$HashJoin$.apply(SparkStrategies.scala:83)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:268)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
>
> at
> org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.dependencies(RDD.scala:189)
>
> at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1233)
>
> at
> org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:117)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
>
> at org.apache.spark.rdd.RDD.foreach(RDD.scala:759)
>
> at Join$$anonfun$main$1.apply$mcVI$sp(Join.scala:44)
>
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>
> at Join$.main(Join.scala:42)
>
> at Join.main(Join.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
>
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
>
>
>
>
>
>
>
>