Posted to dev@zeppelin.apache.org by "Robbie Strickland (JIRA)" <ji...@apache.org> on 2015/10/01 16:21:26 UTC

[jira] [Created] (ZEPPELIN-332) CNFE when running SQL query against Cassandra temp table

Robbie Strickland created ZEPPELIN-332:
------------------------------------------

             Summary: CNFE when running SQL query against Cassandra temp table
                 Key: ZEPPELIN-332
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-332
             Project: Zeppelin
          Issue Type: Bug
          Components: Interpreters
    Affects Versions: 0.6.0
         Environment: Ubuntu 14.04, Spark 1.4.1, Zeppelin 0.6.0, Hadoop 2.6.0, Cassandra 2.1.8, Cassandra-Spark Connector 1.4.0
            Reporter: Robbie Strickland


When running a SQL statement against a Cassandra temp table whose records have not previously been materialized through the {{SQLContext}}, a {{ClassNotFoundException}} is thrown.

For example, we run the following code to register the table:

{code:scala}
import com.datastax.spark.connector._
case class Stats(queue: String, time: Long, host: String, successes: Long)
val stats2 = sc.cassandraTable[Stats]("prod_analytics_events", "stats").select("queue", "time", "host", "successes").where("time >= 1442707200000 and time < 1442793600000")
stats2.toDF.registerTempTable("stats2")
{code}

If we immediately try to run a {{%sql}} query, such as:

{code:sql}
%sql
select * from stats2 limit 10
{code}

we will get the following stack trace:

{code}
java.lang.ClassNotFoundException: $line551.$read
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:500)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1167)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1255)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
	at com.datastax.spark.connector.rdd.reader.AnyObjectFactory.<init>(AnyObjectFactory.scala:30)
	at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.<init>(GettableDataToMappedTypeConverter.scala:45)
	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:22)
	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:47)
	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:42)
	at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:48)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:59)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:59)
	at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:151)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)
	at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
	at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
	at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
	at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
	at sun.reflect.GeneratedMethodAccessor106.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.zeppelin.spark.ZeppelinContext.showDF(ZeppelinContext.java:300)
	at org.apache.zeppelin.spark.SparkSqlInterpreter.interpret(SparkSqlInterpreter.java:142)
	at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
	at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
	at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}
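
The missing class {{$line551.$read}} matches the naming pattern of the wrapper classes the Scala REPL generates for each interpreted line, which suggests the {{%sql}} interpreter's classloader cannot resolve the REPL-defined {{Stats}} case class until the RDD has been materialized at least once.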

However, running the same query directly through the {{SQLContext}} works without issue:

{code:scala}
sqlContext.sql("select * from stats2 limit 10").collect
{code}

This returns the expected results:

{code}
stats2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Stats] = CassandraTableScanRDD[637] at RDD at CassandraRDD.scala:15
res155: Array[org.apache.spark.sql.Row] = Array([events_ANDROID_LocationUpdate,1442707206499,sink4x056,1821024], [events_ANDROID_LocationUpdate,1442707207062,sink4x019,1480357], [events_ANDROID_LocationUpdate,1442707266854,sink4x056,1821394], [events_ANDROID_LocationUpdate,1442707268281,sink4x019,1480675], [events_ANDROID_LocationUpdate,1442707329595,sink4x056,1821771], [events_ANDROID_LocationUpdate,1442707332608,sink4x019,1480979], [events_ANDROID_LocationUpdate,1442707389853,sink4x056,1822088], [events_ANDROID_LocationUpdate,1442707393107,sink4x019,1481257], [events_ANDROID_LocationUpdate,1442707451639,sink4x056,1822413], [events_ANDROID_LocationUpdate,1442707457504,sink4x019,1481591])
{code}

Additionally, if we first materialize some rows using the {{SQLContext}} (such as in the above example), further queries using {{%sql}} work fine.
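
For reference, a minimal sketch of the workaround sequence (assuming the registration code above has already run in a Scala paragraph):

{code:scala}
// Force one materialization through the SQLContext so that the
// REPL-generated classes backing Stats are resolved first.
sqlContext.sql("select * from stats2 limit 1").collect()

// Subsequent %sql paragraphs against stats2 then run without the CNFE.
{code}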

Relevant config from {{zeppelin-env.sh}}:

{code}
export ZEPPELIN_JAVA_OPTS="-Dspark.jars=/opt/spark/lib/spark-cassandra-connector-assembly.jar:/opt/hadoop/share/hadoop/tools/lib/*:/opt/jars/*:/opt/spark/lib/pyspark-cassandra.jar -Dspark.cassandra.connection.host=x.x.x.x -Dspark.cassandra.read.timeout_ms=300000 -Dspark.cassandra.auth.username=zeppelin -Dspark.cassandra.auth.password=[password]"
{code}
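
(As an aside, Spark documents {{spark.jars}} as a comma-separated list; the colon-separated, wildcard-bearing value above resembles a Java classpath, so whether it is parsed as intended may be worth double-checking.)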


