Posted to user@spark.apache.org by shaiw75 <sh...@intentiq.com> on 2014/06/25 16:08:20 UTC
Spark and Cassandra - NotSerializableException
Hi,
I am writing a standalone Spark program that gets its data from Cassandra.
I followed the examples and created the RDD via the newAPIHadoopRDD() and
the ColumnFamilyInputFormat class.
The RDD is created, but I get a NotSerializableException when I call the
RDD's .groupByKey() method:
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local").setAppName("Test");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // host, port, keySpace and columnFamily are defined elsewhere (not shown)
        Job job = new Job();
        Configuration jobConf = job.getConfiguration();
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        ConfigHelper.setInputInitialAddress(jobConf, host);
        ConfigHelper.setInputRpcPort(jobConf, port);
        ConfigHelper.setOutputInitialAddress(jobConf, host);
        ConfigHelper.setOutputRpcPort(jobConf, port);
        ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
        ConfigHelper.setInputPartitioner(jobConf, "Murmur3Partitioner");
        ConfigHelper.setOutputPartitioner(jobConf, "Murmur3Partitioner");

        // Empty start and finish select every column of each row
        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);
        predicate.setSlice_range(sliceRange);
        ConfigHelper.setInputSlicePredicate(jobConf, predicate);

        JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
            ctx.newAPIHadoopRDD(jobConf,
                ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
                ByteBuffer.class, SortedMap.class);

        // The exception is thrown when this shuffle runs
        JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd =
            rdd.groupByKey();
        System.out.println(groupRdd.count());
    }
The exception:
java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
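The first line of the trace names java.nio.HeapByteBuffer, and the frames below it show Spark's JavaSerializationStream writing shuffle data through a plain ObjectOutputStream. As a minimal sketch outside Spark and Cassandra, the same failure can be reproduced with the JDK alone, since java.nio.ByteBuffer does not implement java.io.Serializable. The class and method names here (ByteBufferSerializationCheck, serializationFailure) are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;

public class ByteBufferSerializationCheck {

    // Hypothetical helper: returns null if Java serialization succeeds,
    // otherwise the message of the NotSerializableException, which is the
    // name of the class that could not be written.
    public static String serializationFailure(Object value) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        try {
            out.writeObject(value);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } finally {
            out.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // ByteBuffer.wrap() returns a heap buffer, like the row keys in the trace
        System.out.println(serializationFailure(ByteBuffer.wrap(new byte[] {1, 2, 3})));
        // A String is Serializable, so no failure is reported
        System.out.println(serializationFailure("row-key"));
    }
}
```

This only shows that a ByteBuffer cannot go through Java serialization; it does not say whether the IColumn values in the RDD would serialize.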
What I am trying to do is merge all of the columns for each row key into a
single entry. I get the same exception when I use the reduceByKey() method
instead:
    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd =
        rdd.reduceByKey(
            new Function2<SortedMap<ByteBuffer, IColumn>,
                          SortedMap<ByteBuffer, IColumn>,
                          SortedMap<ByteBuffer, IColumn>>() {
                public SortedMap<ByteBuffer, IColumn> call(
                        SortedMap<ByteBuffer, IColumn> arg0,
                        SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
                    SortedMap<ByteBuffer, IColumn> sortedMap =
                        new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
                    sortedMap.putAll(arg0);
                    sortedMap.putAll(arg1);
                    return sortedMap;
                }
            }
        );
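Both groupByKey() and reduceByKey() shuffle the pair RDD, so the ByteBuffer row keys have to be serialized in either case. One direction that might be worth trying is to copy each key into a serializable type before the shuffle. The sketch below is only an illustration: the RowKeys class and toStringKey method are hypothetical, and it assumes the row keys are UTF-8 text, which the post does not state. It covers the key only; the SortedMap<ByteBuffer, IColumn> values would probably need a similar conversion.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class RowKeys {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    // Hypothetical helper: decodes the remaining bytes of a row key as UTF-8.
    // duplicate() shares the underlying bytes but has its own position and
    // limit, so the caller's buffer is left untouched.
    public static String toStringKey(ByteBuffer key) {
        return UTF8.decode(key.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.wrap("row-1".getBytes(UTF8));
        System.out.println(toStringKey(key));
        // The original buffer can still be read in full afterwards
        System.out.println(key.remaining());
    }
}
```

A String is used rather than a byte[] because Java arrays hash and compare by identity, which makes them a poor fit for keys that are grouped by value.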
I am using:
1. spark-1.0.0-bin-hadoop1
2. Cassandra 1.2.12
3. Java 1.6
Does anyone know what the problem is?
Which object is failing serialization?
Thanks,
Shai
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Cassandra-NotSerializableException-tp8260.html