Posted to user@spark.apache.org by Han JU <ju...@gmail.com> on 2016/05/20 09:37:02 UTC

Dataset API and avro type

Hello,

I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0. However
it does not seem to work with Avro data types:


import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.sql.{Encoders, SQLContext}

object Datasets extends App {
  val conf = new SparkConf()
  conf.setAppName("Dataset")
  conf.setMaster("local[2]")
  conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
  conf.setIfMissing("spark.kryo.registrator",
    classOf[DatasetKryoRegistrator].getName)

  val sc = new SparkContext(conf)
  val sql = new SQLContext(sc)
  import sql.implicits._

  // Kryo-based encoder for the Avro-generated class
  implicit val encoder = Encoders.kryo[MyAvroType]
  val data = sql.read.parquet("path/to/data").as[MyAvroType]

  // BUG here: this fails at runtime, the generated deserializer code
  // does not compile
  val sizes = data.mapPartitions { iter =>
    List(iter.size).iterator
  }.collect().toList

  println(sizes)
}


class DatasetKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(
      classOf[MyAvroType],
      AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
  }
}


I'm using chill-avro's Kryo serializer for Avro types and I've tried
`Encoders.kryo` as well as `bean` or `javaSerialization`, but none of them
works. The error seems to be that the generated code does not compile with
Janino.

Tested in 1.6.1 and the 2.0.0-preview. Any idea?

-- 
*JU Han*

Software Engineer @ Teads.tv

+33 0619608888

Re: Dataset API and avro type

Posted by Michael Armbrust <mi...@databricks.com>.
If you are using the Kryo encoder, you can only use it to map to/from
Kryo-encoded binary data.  This is because Spark does not understand Kryo's
encoding; it's just using it as an opaque blob of bytes.
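
To make that concrete, here is a minimal sketch, untested and assuming
`MyAvroType` from the original post has a two-argument constructor matching
the file's columns: the Kryo encoder only describes a single opaque binary
column, so `.as[MyAvroType]` cannot line up with the Parquet columns, and the
Avro objects have to be built first before a Kryo-encoded Dataset can hold them.

import org.apache.spark.sql.{Encoders, SQLContext}

object KryoEncoderSketch {
  def run(sql: SQLContext): Unit = {
    // The Kryo encoder advertises one opaque binary column, not the Parquet columns.
    implicit val avroEncoder = Encoders.kryo[MyAvroType]
    println(avroEncoder.schema)  // a single binary `value` field

    // Build the Avro objects from the rows yourself, then hand the
    // already-constructed objects to a Kryo-encoded Dataset.
    val objects = sql.read.parquet("path/to/data").rdd.map { r =>
      // the field names and constructor are assumptions for illustration
      new MyAvroType(r.getAs[String]("auctionId"), r.getAs[Long]("ts"))
    }
    val ds = sql.createDataset(objects)  // Dataset[MyAvroType] stored as Kryo blobs
    println(ds.count())
  }
}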

On Mon, May 23, 2016 at 1:28 AM, Han JU <ju...@gmail.com> wrote:

> Just one more question: is Dataset supposed to be able to cast data to an
> Avro type? For a very simple format (a string and a long), I can cast it to
> a tuple or case class, but not to an Avro type (which also contains only a
> string and a long).
>
> The error is like this for this very simple type:
>
> === Result of Batch Resolution ===
> !'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0,
> string])) null else input[0, string].toString, if (isnull(input[1,
> bigint])) null else input[1, bigint],
> StructField(auctionId,StringType,true), StructField(ts,LongType,true)),
> auctionId#0, ts#1L) AS #2]   Project [createexternalrow(if
> (isnull(auctionId#0)) null else auctionId#0.toString, if (isnull(ts#1L))
> null else ts#1L, StructField(auctionId,StringType,true),
> StructField(ts,LongType,true)) AS #2]
>  +- LocalRelation [auctionId#0,ts#1L]
>
>
>                                                     +- LocalRelation
> [auctionId#0,ts#1L]
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to
> map struct<auctionId:string,ts:bigint> to Tuple1, but failed as the number
> of fields does not line up.
>  - Input schema: struct<auctionId:string,ts:bigint>
>  - Target schema: struct<value:binary>;
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org
> $apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
> at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
> at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
> at Datasets$delayedInit$body.apply(Datasets.scala:23)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at Datasets$.main(Datasets.scala:23)
> at Datasets.main(Datasets.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> 2016-05-22 22:02 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
>
>> That's definitely a bug.  If you can come up with a small reproduction it
>> would be great if you could open a JIRA.
>> On May 22, 2016 12:21 PM, "Han JU" <ju...@gmail.com> wrote:
>>
>>> Hi Michael,
>>>
>>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>>> similar if not exactly the same.
>>> The file is a parquet file containing avro objects.
>>>
>>> Thanks!
>>>
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>>> 'generated.java', Line 25, Column 160: No applicable constructor/method
>>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>>> candidates are: "public static java.nio.ByteBuffer
>>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>>> /* 001 */
>>> /* 002 */ public java.lang.Object generate(Object[] references) {
>>> /* 003 */   return new SpecificSafeProjection(references);
>>> /* 004 */ }
>>> /* 005 */
>>> /* 006 */ class SpecificSafeProjection extends
>>> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>>> /* 007 */
>>> /* 008 */   private Object[] references;
>>> /* 009 */   private MutableRow mutableRow;
>>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance
>>> serializer;
>>> /* 011 */
>>> /* 012 */
>>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>>> /* 014 */     this.references = references;
>>> /* 015 */     mutableRow = (MutableRow) references[references.length -
>>> 1];
>>> /* 016 */     serializer =
>>> (org.apache.spark.serializer.KryoSerializerInstance) new
>>> org.apache.spark.serializer.KryoSerializer(new
>>> org.apache.spark.SparkConf()).newInstance();
>>> /* 017 */   }
>>> /* 018 */
>>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>>> /* 020 */     InternalRow i = (InternalRow) _i;
>>> /* 021 */     /* decodeusingserializer(input[0,
>>> struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:str...
>>> */
>>> /* 022 */     /* input[0,
>>> struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:struct<id:string,seatbid:...
>>> */
>>> /* 023 */     boolean isNull1 = i.isNullAt(0);
>>> /* 024 */     InternalRow value1 = isNull1 ? null : (i.getStruct(0, 7));
>>> /* 025 */     final tv.teads.model.rtb.RtbResponseEvent value = isNull1
>>> ? null : (tv.teads.model.rtb.RtbResponseEvent)
>>> serializer.deserialize(java.nio.ByteBuffer.wrap(value1), null);
>>> /* 026 */     if (isNull1) {
>>> /* 027 */       mutableRow.setNullAt(0);
>>> /* 028 */     } else {
>>> /* 029 */
>>> /* 030 */       mutableRow.update(0, value);
>>> /* 031 */     }
>>> /* 032 */
>>> /* 033 */     return mutableRow;
>>> /* 034 */   }
>>> /* 035 */ }
>>> /* 036 */
>>>
>>> at
>>> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>>> at
>>> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>>> at
>>> org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>>> at
>>> org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2349)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>>> at
>>> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:764)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:186)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:748)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:745)
>>> at
>>> org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:61)
>>> at
>>> org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:60)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 2016-05-20 22:51 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
>>>
>>>> What is the error?  I would definitely expect it to work with kryo at
>>>> least.
>>>>
>>>>
>>>> On Fri, May 20, 2016 at 2:37 AM, Han JU <ju...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0.
>>>>> However it does not seem to work with Avro data types:
>>>>>
>>>>>
>>>>> object Datasets extends App {
>>>>>   val conf = new SparkConf()
>>>>>   conf.setAppName("Dataset")
>>>>>   conf.setMaster("local[2]")
>>>>>   conf.setIfMissing("spark.serializer",
>>>>> classOf[KryoSerializer].getName)
>>>>>   conf.setIfMissing("spark.kryo.registrator",
>>>>> classOf[DatasetKryoRegistrator].getName)
>>>>>
>>>>>   val sc = new SparkContext(conf)
>>>>>   val sql = new SQLContext(sc)
>>>>>   import sql.implicits._
>>>>>
>>>>>   implicit val encoder = Encoders.kryo[MyAvroType]
>>>>>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>>>>>
>>>>>   var c = 0
>>>>>   // BUG here
>>>>>   val sizes = data.mapPartitions { iter =>
>>>>>     List(iter.size).iterator
>>>>>   }.collect().toList
>>>>>
>>>>>   println(c)
>>>>> }
>>>>>
>>>>>
>>>>> class DatasetKryoRegistrator extends KryoRegistrator {
>>>>>   override def registerClasses(kryo: Kryo) {
>>>>>     kryo.register(
>>>>>       classOf[MyAvroType],
>>>>> AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> I'm using chill-avro's Kryo serializer for Avro types and I've tried
>>>>> `Encoders.kryo` as well as `bean` or `javaSerialization`, but none of them
>>>>> works. The error seems to be that the generated code does not compile with
>>>>> Janino.
>>>>>
>>>>> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>>>>>
>>>>> --
>>>>> *JU Han*
>>>>>
>>>>> Software Engineer @ Teads.tv
>>>>>
>>>>> +33 0619608888
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *JU Han*
>>>
>>> Software Engineer @ Teads.tv
>>>
>>> +33 0619608888
>>>
>>
>
>
> --
> *JU Han*
>
> Software Engineer @ Teads.tv
>
> +33 0619608888
>

Re: Dataset API and avro type

Posted by Han JU <ju...@gmail.com>.
Just one more question: is Dataset supposed to be able to cast data to an
Avro type? For a very simple format (a string and a long), I can cast it to
a tuple or case class, but not to an Avro type (which also contains only a
string and a long).

The error is like this for this very simple type:

=== Result of Batch Resolution ===
!'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0, string])) null else input[0, string].toString, if (isnull(input[1, bigint])) null else input[1, bigint], StructField(auctionId,StringType,true), StructField(ts,LongType,true)), auctionId#0, ts#1L) AS #2]
 +- LocalRelation [auctionId#0,ts#1L]

Project [createexternalrow(if (isnull(auctionId#0)) null else auctionId#0.toString, if (isnull(ts#1L)) null else ts#1L, StructField(auctionId,StringType,true), StructField(ts,LongType,true)) AS #2]
+- LocalRelation [auctionId#0,ts#1L]

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to
map struct<auctionId:string,ts:bigint> to Tuple1, but failed as the number
of fields does not line up.
 - Input schema: struct<auctionId:string,ts:bigint>
 - Target schema: struct<value:binary>;
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org
$apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
at Datasets$delayedInit$body.apply(Datasets.scala:23)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at Datasets$.main(Datasets.scala:23)
at Datasets.main(Datasets.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
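
The mismatch in the exception is between the file schema
(struct<auctionId:string,ts:bigint>) and the target schema advertised by the
Kryo encoder (struct<value:binary>). As a point of comparison, here is a
minimal sketch, untested and with an assumed two-argument constructor on
MyAvroType, of the cast that does line up (a case class mirroring the file
schema), with the conversion to the Avro type done afterwards in a typed map:

import org.apache.spark.sql.{Encoders, SQLContext}

// Mirrors the two columns in the error message: auctionId (string), ts (bigint).
case class AuctionRow(auctionId: String, ts: Long)

object CastSketch {
  def run(sql: SQLContext): Unit = {
    import sql.implicits._

    // Lines up: the case class has the same two fields as the file.
    val typed = sql.read.parquet("path/to/data").as[AuctionRow]

    // Does not line up directly: Encoders.kryo[MyAvroType] targets
    // struct<value:binary>. Convert inside a typed map instead.
    implicit val avroEncoder = Encoders.kryo[MyAvroType]
    val asAvro = typed.map(r => new MyAvroType(r.auctionId, r.ts))  // assumed constructor
    println(asAvro.count())
  }
}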

2016-05-22 22:02 GMT+02:00 Michael Armbrust <mi...@databricks.com>:

> That's definitely a bug.  If you can come up with a small reproduction it
> would be great if you could open a JIRA.
> On May 22, 2016 12:21 PM, "Han JU" <ju...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>> similar if not exactly the same.
>> The file is a parquet file containing avro objects.
>>
>> Thanks!
>>
>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>> 'generated.java', Line 25, Column 160: No applicable constructor/method
>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>> candidates are: "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>> /* 001 */
>> /* 002 */ public java.lang.Object generate(Object[] references) {
>> /* 003 */   return new SpecificSafeProjection(references);
>> /* 004 */ }
>> /* 005 */
>> /* 006 */ class SpecificSafeProjection extends
>> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>> /* 007 */
>> /* 008 */   private Object[] references;
>> /* 009 */   private MutableRow mutableRow;
>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance
>> serializer;
>> /* 011 */
>> /* 012 */
>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>> /* 014 */     this.references = references;
>> /* 015 */     mutableRow = (MutableRow) references[references.length - 1];
>> /* 016 */     serializer =
>> (org.apache.spark.serializer.KryoSerializerInstance) new
>> org.apache.spark.serializer.KryoSerializer(new
>> org.apache.spark.SparkConf()).newInstance();
>> /* 017 */   }
>> /* 018 */
>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>> /* 020 */     InternalRow i = (InternalRow) _i;
>> /* 021 */     /* decodeusingserializer(input[0,
>> struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:str...
>> */
>> /* 022 */     /* input[0,
>> struct<auctionId:string,ts:bigint,scenarioId:bigint,connectionId:bigint,level:int,bidResponse:struct<id:string,seatbid:...
>> */
>> /* 023 */     boolean isNull1 = i.isNullAt(0);
>> /* 024 */     InternalRow value1 = isNull1 ? null : (i.getStruct(0, 7));
>> /* 025 */     final tv.teads.model.rtb.RtbResponseEvent value = isNull1 ?
>> null : (tv.teads.model.rtb.RtbResponseEvent)
>> serializer.deserialize(java.nio.ByteBuffer.wrap(value1), null);
>> /* 026 */     if (isNull1) {
>> /* 027 */       mutableRow.setNullAt(0);
>> /* 028 */     } else {
>> /* 029 */
>> /* 030 */       mutableRow.update(0, value);
>> /* 031 */     }
>> /* 032 */
>> /* 033 */     return mutableRow;
>> /* 034 */   }
>> /* 035 */ }
>> /* 036 */
>>
>> at
>> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>> at
>> org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>> at
>> org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>> at
>> org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>> at
>> org.spark_project.guava.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
>> at
>> org.spark_project.guava.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
>> at
>> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2349)
>> at
>> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>> at
>> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>> at
>> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:764)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:186)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:748)
>> at
>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:745)
>> at
>> org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:61)
>> at
>> org.apache.spark.sql.execution.DeserializeToObject$$anonfun$2.apply(objects.scala:60)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 2016-05-20 22:51 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
>>
>>> What is the error?  I would definitely expect it to work with kryo at
>>> least.
>>>
>>>
>>> On Fri, May 20, 2016 at 2:37 AM, Han JU <ju...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0.
>>>> However it does not seem to work with Avro data types:
>>>>
>>>>
>>>> object Datasets extends App {
>>>>   val conf = new SparkConf()
>>>>   conf.setAppName("Dataset")
>>>>   conf.setMaster("local[2]")
>>>>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>>>>   conf.setIfMissing("spark.kryo.registrator",
>>>> classOf[DatasetKryoRegistrator].getName)
>>>>
>>>>   val sc = new SparkContext(conf)
>>>>   val sql = new SQLContext(sc)
>>>>   import sql.implicits._
>>>>
>>>>   implicit val encoder = Encoders.kryo[MyAvroType]
>>>>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>>>>
>>>>   var c = 0
>>>>   // BUG here
>>>>   val sizes = data.mapPartitions { iter =>
>>>>     List(iter.size).iterator
>>>>   }.collect().toList
>>>>
>>>>   println(c)
>>>> }
>>>>
>>>>
>>>> class DatasetKryoRegistrator extends KryoRegistrator {
>>>>   override def registerClasses(kryo: Kryo) {
>>>>     kryo.register(
>>>>       classOf[MyAvroType],
>>>> AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>>>>   }
>>>> }
>>>>
>>>>
>>>> I'm using chill-avro's Kryo serializer for Avro types and I've tried
>>>> `Encoders.kryo` as well as `bean` or `javaSerialization`, but none of them
>>>> works. The error seems to be that the generated code does not compile with
>>>> Janino.
>>>>
>>>> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>>>>
>>>> --
>>>> *JU Han*
>>>>
>>>> Software Engineer @ Teads.tv
>>>>
>>>> +33 0619608888
>>>>
>>>
>>>
>>
>>
>> --
>> *JU Han*
>>
>> Software Engineer @ Teads.tv
>>
>> +33 0619608888
>>
>


-- 
*JU Han*

Software Engineer @ Teads.tv

+33 0619608888

Re: Dataset API and avro type

Posted by Michael Armbrust <mi...@databricks.com>.
What is the error?  I would definitely expect it to work with kryo at least.
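
For reference, the path where the Kryo encoder is expected to hold up is when
the Dataset starts from already-constructed objects, so the stored blobs
really are Kryo-encoded. A minimal sketch, untested, with `record` standing in
for an instance of the poster's MyAvroType:

import org.apache.spark.sql.{Encoders, SQLContext}

object KryoRoundTripSketch {
  def run(sql: SQLContext, record: MyAvroType): Unit = {
    import sql.implicits._  // provides the Encoder[Int] used by mapPartitions

    implicit val avroEncoder = Encoders.kryo[MyAvroType]
    val ds = sql.createDataset(Seq(record))  // objects stored as Kryo blobs

    // Round-trips through Kryo: blob -> object -> per-partition count.
    val sizes = ds.mapPartitions(iter => Iterator(iter.size)).collect()
    println(sizes.toList)
  }
}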

On Fri, May 20, 2016 at 2:37 AM, Han JU <ju...@gmail.com> wrote:

> Hello,
>
> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0. However
> it does not seem to work with Avro data types:
>
>
> object Datasets extends App {
>   val conf = new SparkConf()
>   conf.setAppName("Dataset")
>   conf.setMaster("local[2]")
>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>   conf.setIfMissing("spark.kryo.registrator",
> classOf[DatasetKryoRegistrator].getName)
>
>   val sc = new SparkContext(conf)
>   val sql = new SQLContext(sc)
>   import sql.implicits._
>
>   implicit val encoder = Encoders.kryo[MyAvroType]
>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>
>   var c = 0
>   // BUG here
>   val sizes = data.mapPartitions { iter =>
>     List(iter.size).iterator
>   }.collect().toList
>
>   println(c)
> }
>
>
> class DatasetKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(
>       classOf[MyAvroType],
> AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>   }
> }
>
>
> I'm using chill-avro's Kryo serializer for Avro types and I've tried
> `Encoders.kryo` as well as `bean` or `javaSerialization`, but none of them
> works. The error seems to be that the generated code does not compile with
> Janino.
>
> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>
> --
> *JU Han*
>
> Software Engineer @ Teads.tv
>
> +33 0619608888
>