Posted to issues@spark.apache.org by "Zhanpeng Wu (JIRA)" <ji...@apache.org> on 2015/12/25 09:40:49 UTC

[jira] [Resolved] (SPARK-12518) Problem in Spark deserialization with htsjdk BAMRecordCodec

     [ https://issues.apache.org/jira/browse/SPARK-12518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhanpeng Wu resolved SPARK-12518.
---------------------------------
    Resolution: Fixed

> Problem in Spark deserialization with htsjdk BAMRecordCodec
> -----------------------------------------------------------
>
>                 Key: SPARK-12518
>                 URL: https://issues.apache.org/jira/browse/SPARK-12518
>             Project: Spark
>          Issue Type: Question
>          Components: Java API
>    Affects Versions: 1.5.2
>         Environment: Linux Red Hat 4.8.2-16, Java 8, htsjdk-1.130
>            Reporter: Zhanpeng Wu
>
> When I used [htsjdk|https://github.com/samtools/htsjdk] in my Spark application, I ran into a problem with record deserialization. The *SAMRecord* object could not be deserialized, and the following exception was thrown: 
> {quote}
> WARN ThrowableSerializationWrapper: Task exception could not be deserialized
> java.lang.ClassNotFoundException: htsjdk.samtools.util.RuntimeIOException
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:340)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
>         at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> It seems that the application encountered a premature EOF when deserializing.
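> One concrete thing the trace does show: the warning comes from the driver-side TaskResultGetter, which cannot load htsjdk.samtools.util.RuntimeIOException, so whatever exception the task originally threw is masked by this ClassNotFoundException. It may therefore be worth double-checking (a guess, not a confirmed fix) that the htsjdk jar is visible to the driver as well as to the executors, e.g.:
> {code:borderStyle=solid}
> # Hypothetical paths/jar names; --jars places the listed jars on both the
> # driver and executor classpaths, so htsjdk's exception classes can be
> # resolved when task failures are deserialized on the driver.
> spark-submit --class Test \
>     --jars /path/to/htsjdk-1.130.jar \
>     test-app.jar /path/to/input.bam /path/to/output
> {code}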
> Here is my test code: 
> {code:title=Test.java|borderStyle=solid}
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.Function;
> import scala.Tuple2;
> // BAMInputFormat is assumed to come from Hadoop-BAM; SAMRecordWritable is
> // the custom wrapper shown below.
> 
> public class Test {
>     public static void main(String[] args) {
>         SparkConf sparkConf = new SparkConf().setAppName("Spark htsjdk - Test ver");
>         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
>         // args[0]: input BAM file; args[1]: output directory
>         jsc.newAPIHadoopFile(args[0], BAMInputFormat.class, LongWritable.class,
>                 SAMRecordWritable.class, jsc.hadoopConfiguration())
>                 .map(new Function<Tuple2<LongWritable, SAMRecordWritable>, SAMRecordWritable>() {
>                     private static final long serialVersionUID = 1791992620460009575L;
> 
>                     @Override
>                     public SAMRecordWritable call(Tuple2<LongWritable, SAMRecordWritable> tuple2)
>                             throws Exception {
>                         return tuple2._2;  // keep only the record, drop the key
>                     }
>                 }).repartition(18).saveAsTextFile(args[1]);
>         jsc.stop();  // close() just delegates to stop(), so one call suffices
>     }
> }
> {code}
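> As an aside, {{saveAsTextFile}} writes each element using its {{toString()}} method, so a {{SAMRecordWritable}} that does not override {{toString()}} would be written out as the default class-name-plus-hash string rather than as SAM text. A minimal sketch of such an override, assuming htsjdk's {{SAMRecord.getSAMString()}} and that it includes a line terminator:
> {code:title=SAMRecordWritable.java (hypothetical addition)|borderStyle=solid}
> // Sketch: render the wrapped record as a single tab-delimited SAM line so
> // that saveAsTextFile() produces readable output.
> @Override
> public String toString() {
>     // trim() drops the trailing newline that getSAMString() is assumed to append
>     return record == null ? "null" : record.getSAMString().trim();
> }
> {code}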
> My custom JavaSerializer wrapper class: 
> {code:title=SAMRecordWritable.java|borderStyle=solid}
> public class SAMRecordWritable extends JavaSerializer {
>     private static final long serialVersionUID = 8212888201641460871L;
> 
>     // Shared codec for decoding, built with a null header and a lazy record
>     // factory so records can be reconstructed without a header up front.
>     private static final BAMRecordCodec lazyCodec =
>             new BAMRecordCodec(null, new LazyBAMRecordFactory());
> 
>     private transient SAMRecord record;
> 
>     public SAMRecord get()            { return record; }
>     public void      set(SAMRecord r) { record = r; }
> 
>     /* Java serialization hooks (the java.io.Externalizable contract) */
>     public void writeExternal(java.io.ObjectOutput out) {
>         // Encode the record in BAM binary form onto the object stream
>         final BAMRecordCodec codec = new BAMRecordCodec(record.getHeader());
>         codec.setOutputStream(new DataOutputWrapper(out));
>         codec.encode(record);
>     }
> 
>     public void readExternal(java.io.ObjectInput in) {
>         // Decode the record back from the object stream
>         lazyCodec.setInputStream(new DataInputWrapper(in));
>         record = lazyCodec.decode();
>     }
> }
> {code}
> But when I serialize the record to a local file outside of Spark (see the sketch below), it works. This confuses me a lot. Can anybody help?
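> For reference, the local round trip I mean is essentially the following minimal sketch. It assumes SAMRecordWritable implements java.io.Externalizable (which the writeExternal/readExternal pair suggests, and which also requires a public no-arg constructor), and it uses htsjdk's SamReaderFactory to pull one record out of a BAM file:
> {code:title=LocalRoundTrip.java|borderStyle=solid}
> import htsjdk.samtools.SAMRecord;
> import htsjdk.samtools.SamReader;
> import htsjdk.samtools.SamReaderFactory;
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.File;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> 
> public class LocalRoundTrip {
>     public static void main(String[] args) throws Exception {
>         // Read the first record of a BAM file (path given on the command line)
>         try (SamReader reader = SamReaderFactory.makeDefault().open(new File(args[0]))) {
>             SAMRecord record = reader.iterator().next();
> 
>             SAMRecordWritable w = new SAMRecordWritable();
>             w.set(record);
> 
>             // Serialize with plain Java serialization, which is what Spark's
>             // JavaSerializer uses under the hood...
>             ByteArrayOutputStream bos = new ByteArrayOutputStream();
>             try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
>                 oos.writeObject(w);
>             }
> 
>             // ...and deserialize it again from the same bytes
>             try (ObjectInputStream ois = new ObjectInputStream(
>                     new ByteArrayInputStream(bos.toByteArray()))) {
>                 SAMRecordWritable copy = (SAMRecordWritable) ois.readObject();
>                 System.out.println(copy.get().getReadName());
>             }
>         }
>     }
> }
> {code}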



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org