Posted to user@spark.apache.org by shaiw75 <sh...@intentiq.com> on 2014/06/25 16:08:20 UTC

Spark and Cassandra - NotSerializableException

Hi,

I am writing a standalone Spark program that gets its data from Cassandra.
I followed the examples and created the RDD via the newAPIHadoopRDD() and
the ColumnFamilyInputFormat class.
The RDD is created, but I get a NotSerializableException when I call the
RDD's .groupByKey() method:

public static void main(String[] args) {
	SparkConf sparkConf = new SparkConf();
	sparkConf.setMaster("local").setAppName("Test");
	JavaSparkContext ctx = new JavaSparkContext(sparkConf);

	Job job = new Job();
	Configuration jobConf = job.getConfiguration();
	job.setInputFormatClass(ColumnFamilyInputFormat.class);

	ConfigHelper.setInputInitialAddress(jobConf, host);
	ConfigHelper.setInputRpcPort(jobConf, port);
	ConfigHelper.setOutputInitialAddress(jobConf, host);
	ConfigHelper.setOutputRpcPort(jobConf, port);
	ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
	ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner");
	ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner");

	SlicePredicate predicate = new SlicePredicate();
	SliceRange sliceRange = new SliceRange();
	sliceRange.setFinish(new byte[0]);
	sliceRange.setStart(new byte[0]);
	predicate.setSlice_range(sliceRange);
	ConfigHelper.setInputSlicePredicate(jobConf, predicate);

	JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
		ctx.newAPIHadoopRDD(jobConf,
			ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
			ByteBuffer.class, SortedMap.class);

	JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
	System.out.println(groupRdd.count());
}

The exception:

java.io.NotSerializableException: java.nio.HeapByteBuffer
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
	at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
	at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.Task.run(Task.scala:51)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)
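
Since the trace points at java.nio.HeapByteBuffer, the only workaround I could think of so far (untested, just a sketch, and the variable names are made up) is to copy the key bytes into something that is serializable, e.g. a hex-encoded String, before the shuffle. I assume the IColumn values would need similar treatment:

	// Untested sketch: re-key the RDD with a hex String so the shuffle only has
	// to serialize Strings instead of ByteBuffers.
	JavaPairRDD<String, SortedMap<ByteBuffer, IColumn>> keyedByString = rdd.mapToPair(
		new PairFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>,
				String, SortedMap<ByteBuffer, IColumn>>() {
			public Tuple2<String, SortedMap<ByteBuffer, IColumn>> call(
					Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> t) throws Exception {
				// Copy the key bytes out of the (non-serializable) ByteBuffer...
				ByteBuffer buf = t._1().duplicate();
				byte[] keyBytes = new byte[buf.remaining()];
				buf.get(keyBytes);
				// ...and hex-encode them so the new key has proper equals()/hashCode().
				return new Tuple2<String, SortedMap<ByteBuffer, IColumn>>(
						javax.xml.bind.DatatypeConverter.printHexBinary(keyBytes), t._2());
			}
		});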
	
What I am trying to do is to merge all of a row key's columns into a single entry.
I also get the same exception when I try to use the reduceByKey() method
like so:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
	new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>,
			SortedMap<ByteBuffer, IColumn>>() {
		public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
				SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
			SortedMap<ByteBuffer, IColumn> sortedMap =
				new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
			sortedMap.putAll(arg0);
			sortedMap.putAll(arg1);
			return sortedMap;
		}
	}
);
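
I also wondered whether switching the shuffle serializer to Kryo would sidestep this, since Kryo does not require classes to implement java.io.Serializable. This is just an untested idea on my side, a minimal sketch of the configuration I had in mind (ByteBuffer/IColumn might still need custom Kryo registrations):

	// Untested idea: use Kryo instead of Java serialization.
	SparkConf sparkConf = new SparkConf()
		.setMaster("local")
		.setAppName("Test")
		.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
	JavaSparkContext ctx = new JavaSparkContext(sparkConf);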

I am using:
1. spark-1.0.0-bin-hadoop1
2. Cassandra 1.2.12
3. Java 1.6

Does anyone know what the problem is?
What exactly is failing serialization?

Thanks,
Shai


