Posted to dev@tinkerpop.apache.org by "Dan LaRocque (JIRA)" <ji...@apache.org> on 2016/06/18 06:22:05 UTC

[jira] [Commented] (TINKERPOP-1341) UnshadedKryoAdapter fails to deserialize StarGraph when SparkConf sets spark.rdd.compress=true whereas GryoSerializer works

    [ https://issues.apache.org/jira/browse/TINKERPOP-1341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15337592#comment-15337592 ] 

Dan LaRocque commented on TINKERPOP-1341:
-----------------------------------------

Hi [~dylanht], I worked on the new serializer.

I think there are two completely orthogonal problems here (correct me if you see it otherwise):

1. CompactBuffer[] is not registered by default.  This is a bug.  GryoRegistrator should register this and probably BoxedUnit.  I just sent a more detailed message about this problem to the thread https://groups.google.com/d/topic/gremlin-users/s-mVqjRf3BI/discussion on gremlin-users.  I would link directly, but my message hasn't appeared yet (sent about 10 min ago), so I'm copying the relevant part of my message here:

    {quote}
GryoSerializer registered a slew of scala runtime and Spark types.  I deliberately dropped all of these in GryoRegistrator, thinking that KryoSerializer registered all scala runtime or Spark types that TP jobs would need, so that GryoRegistrator could just carry over every TinkerPop registration and function equivalently.  I was wrong.  For instance, GryoSerializer registers both CompactBuffer and CompactBuffer[].  CompactBuffer is registered in KryoSerializer, but CompactBuffer[] (the array type) is not.  I think this means that TinkerPop is passing around CompactBuffer[] in the course of running jobs (not sure where/why), whereas Spark does not necessarily need CompactBuffer[] serialization for its own internals.
 
Going through each line of GryoSerializer's constructor that registers a scala runtime type or Spark type and comparing against https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala, I found the following:
 
Not registered anywhere:
 
CompactBuffer[]:  NOT REGISTERED (should be registered in GryoRegistrator)
BoxedUnit: (probably) NOT REGISTERED (should be registered in GryoRegistrator)
 
Registered somewhere:
 
Tuple2, Tuple3: registered in KS (via KS calling chill's AllScalaRegistrar which calls chill's ScalaTupleSerialization which does the actual registration)
Tuple2[], Tuple3[]: registered in KS
CompactBuffer: registered in KS
CompressedMapStatus: registered in KS
BlockManagerId: registered in KS
HighlyCompressedMapStatus: registered in KS
HttpBroadcast: registered in KS
PythonBroadcast: registered in KS
scala.reflect.ClassTag$$anon$1: registered in GryoRegistrator (only known to come up in testing though)
scala.reflect.ManifestFactory$$anon$1: same as last line
WrappedArray.ofRef: registered in GryoRegistrator
 
I'm going to make a PR that adds CompactBuffer[] and probably BoxedUnit to GryoRegistrator.  I may try to see whether BoxedUnit just eluded my source reading by starting up a test Spark environment and finding a way to do something like <spark's KryoSerializer>.newKryo().getClassResolver().getRegistration(BoxedUnit.class).
{quote}
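Part of the subtlety above is that, on the JVM, an array type is a distinct Class object from its component type, and Kryo keys registrations by Class.  So registering CompactBuffer does nothing for CompactBuffer[]; both would need their own kryo.register(...) call.  A stdlib-only illustration of the distinction (using String in place of CompactBuffer, since the Spark class isn't on a plain JVM classpath):

```java
public class ArrayClassDemo {
    public static void main(String[] args) throws Exception {
        // An array type is its own Class object, distinct from the component type.
        Class<?> component = String.class;
        Class<?> arrayType = String[].class;

        System.out.println(component.getName());          // java.lang.String
        System.out.println(arrayType.getName());          // [Ljava.lang.String;
        System.out.println(component.equals(arrayType));  // false

        // Because Kryo registrations are keyed by Class, registering the
        // component type does not cover the array type: a registrator would
        // need both kryo.register(CompactBuffer.class) and
        // kryo.register(CompactBuffer[].class).
        Class<?> resolved = Class.forName("[Ljava.lang.String;");
        System.out.println(resolved == arrayType);        // true
    }
}
```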

2. Snappy decompression failure.  I agree with you that https://issues.apache.org/jira/browse/SPARK-3630 could be related.  Are you willing to try Gabor's suggested workaround of switching compression algorithms ("I have switched 'spark.io.compression.codec' to 'lz4' and now the problem seems to be gone")?  That might give us a hint about whether this is related to the Spark issue.
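For concreteness, an untested sketch of what that workaround would look like in the Spark-related properties (the two keys are standard Spark configuration; where they live depends on how you feed configuration to your job):

```properties
# keep RDD compression on, but swap the codec per SPARK-3630
spark.rdd.compress=true
spark.io.compression.codec=lz4
```

If the PARSING_ERROR(2) disappears with lz4 but returns with snappy, that would point at the upstream Spark/snappy issue rather than the TinkerPop serializer shim.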

> UnshadedKryoAdapter fails to deserialize StarGraph when SparkConf sets spark.rdd.compress=true whereas GryoSerializer works
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: TINKERPOP-1341
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-1341
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: io
>    Affects Versions: 3.2.1, 3.3.0
>            Reporter: Dylan Bethune-Waddell
>            Priority: Minor
>
> When trying to bulk load a large dataset into Titan I was running into OOM errors and decided to try tweaking some Spark configuration settings.
> I am having trouble bulk loading with the new GryoRegistrator/UnshadedKryo serialization shim in master: a few hundred tasks into the edge loading stage (stage 5), exceptions are thrown complaining that CompactBuffer[].class must be explicitly registered with Kryo. With spark.rdd.compress=true, the job instead fails a few hundred tasks into the vertex loading stage (stage 1) of BulkLoaderVertexProgram.
> Using GryoSerializer instead of KryoSerializer with GryoRegistrator does not fail, and successfully loads the data with this compression flag on, whereas without the flag I would just get OOM errors until the job fell so far behind that it failed. So it seems desirable in some instances to use this setting, and the new serialization code seems to break it. This could be a Spark upstream issue based on this open JIRA ticket (https://issues.apache.org/jira/browse/SPARK-3630). Here is the exception that is thrown, with the middle bits cut out:
> com.esotericsoftware.kryo.KryoException: java.io.IOException: PARSING_ERROR(2)
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
>         at com.esotericsoftware.kryo.io.Input.require(Input.java:169)
>         at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:715)
>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:665)
>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:113)
>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:103)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readClassAndObject(UnshadedKryoAdapter.java:48)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readClassAndObject(UnshadedKryoAdapter.java:30)
>         at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.readEdges(StarGraphSerializer.java:134)
>         at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.read(StarGraphSerializer.java:91)
>         at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.read(StarGraphSerializer.java:45)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter.read(UnshadedSerializerAdapter.java:55)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readObject(UnshadedKryoAdapter.java:42)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readObject(UnshadedKryoAdapter.java:30)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer.read(VertexWritableSerializer.java:46)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer.read(VertexWritableSerializer.java:36)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter.read(UnshadedSerializerAdapter.java:55)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
> ........................................................ and so on .....................................
> Caused by: java.io.IOException: PARSING_ERROR(2)
>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>         at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
>         at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
>         at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
>         at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
>         at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
>         ... 51 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)