You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Luis Felipe Sant Ana (JIRA)" <ji...@apache.org> on 2017/02/23 15:28:44 UTC

[jira] [Updated] (SPARK-19711) Bug in gapply function

     [ https://issues.apache.org/jira/browse/SPARK-19711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luis Felipe Sant Ana updated SPARK-19711:
-----------------------------------------
    Description: 
I have a dataframe in SparkR like 

  CNPJ            PID       DATA N
1 10140281000131 10000000000021 2015-04-23 1
2 10140281000131 10000000000021 2015-04-27 1
3 10140281000131 10000000000021 2015-04-02 1
4 10140281000131 10000000000021 2015-11-10 1
5 10140281000131 10000000000021 2016-11-14 1
6 10140281000131 10000000000021 2015-04-03 1

And, I want to group by columns CNPJ and PID using gapply() function, filling in the column DATA with date. Then I fill in the missing dates with zeros.

The code:
schema <- structType(structField("CNPJ", "string"), 
                     structField("PID", "string"),
                     structField("DATA", "date"),
                     structField("N", "double"))

result <- gapply(
  ds_filtered,
  c("CNPJ", "PID"),
  function(key, x) {
    dts <- data.frame(key, DATA = seq(min(as.Date(x$DATA)), as.Date(e_date), "days"))
    colnames(dts)[c(1, 2)] <- c("CNPJ", "PID")
    
    y <- data.frame(key, DATA = as.Date(x$DATA), N = x$N)
    colnames(y)[c(1, 2)] <- c("CNPJ", "PID")
    
    y <- dplyr::left_join(dts, 
                     y,
                     by = c("CNPJ", "PID", "DATA"))
    
    y[is.na(y$N), 4] <- 0
    
    data.frame(CNPJ = as.character(y$CNPJ),
               PID = as.character(y$PID),
               DATA = y$DATA,
               N = y$N)
  }, 
  schema)

Error:

Error in handleErrors(returnStatus, conn) : 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 92.0 failed 4 times, most recent failure: Lost task 0.3 in stage 92.0 (TID 7032, 10.93.243.111, executor 0): org.apache.spark.SparkException: R computation failed with
 Error in writeType(con, serdeType) : 
  Unsupported type for serialization factor
Calls: outputResult ... serializeRow -> writeList -> writeObject -> writeType
Execution halted
	at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:404)
	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:386)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2784)
	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2354)
	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2354)
	at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:2768)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2767)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2354)
	at org.apache.spark.sql.api.r.SQLUtils$.dfToCols(SQLUtils.scala:208)
	at org.apache.spark.sql.api.r.SQLUtils.dfToCols(SQLUtils.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:167)
	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:108)
	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:40)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: R computation failed with
 Error in writeType(con, serdeType) : 
  Unsupported type for serialization factor
Calls: outputResult ... serializeRow -> writeList -> writeObject -> writeType
Execution halted
	at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:404)
	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:386)
	at org.apache.spark.rdd.RDD$$an

With gapplyCollect() function this work. 


Thank you! 
 


  was:
I have a dataframe in SparkR like 

  CNPJ            PID       DATA N
1 10140281000131 10000000000021 2015-04-23 1
2 10140281000131 10000000000021 2015-04-27 1
3 10140281000131 10000000000021 2015-04-02 1
4 10140281000131 10000000000021 2015-11-10 1
5 10140281000131 10000000000021 2016-11-14 1
6 10140281000131 10000000000021 2015-04-03 1

And, I want to group by columns CNPJ and PID using gapply() function, filling in the column DATA with date

The code:
schema <- structType(structField("CNPJ", "string"), 
                     structField("PID", "string"),
                     structField("DATA", "date"),
                     structField("N", "double"))

result <- gapply(
  ds_filtered,
  c("CNPJ", "PID"),
  function(key, x) {
    dts <- data.frame(key, DATA = seq(min(as.Date(x$DATA)), as.Date(e_date), "days"))
    colnames(dts)[c(1, 2)] <- c("CNPJ", "PID")
    
    y <- data.frame(key, DATA = as.Date(x$DATA), N = x$N)
    colnames(y)[c(1, 2)] <- c("CNPJ", "PID")
    
    y <- dplyr::left_join(dts, 
                     y,
                     by = c("CNPJ", "PID", "DATA"))
    
    y[is.na(y$N), 4] <- 0
    
    data.frame(CNPJ = as.character(y$CNPJ),
               PID = as.character(y$PID),
               DATA = y$DATA,
               N = y$N)
  }, 
  schema)

Error:

Error in handleErrors(returnStatus, conn) : 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 92.0 failed 4 times, most recent failure: Lost task 0.3 in stage 92.0 (TID 7032, 10.93.243.111, executor 0): org.apache.spark.SparkException: R computation failed with
 Error in writeType(con, serdeType) : 
  Unsupported type for serialization factor
Calls: outputResult ... serializeRow -> writeList -> writeObject -> writeType
Execution halted
	at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:404)
	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:386)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2784)
	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2354)
	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2354)
	at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:2768)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2767)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2354)
	at org.apache.spark.sql.api.r.SQLUtils$.dfToCols(SQLUtils.scala:208)
	at org.apache.spark.sql.api.r.SQLUtils.dfToCols(SQLUtils.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:167)
	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:108)
	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:40)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: R computation failed with
 Error in writeType(con, serdeType) : 
  Unsupported type for serialization factor
Calls: outputResult ... serializeRow -> writeList -> writeObject -> writeType
Execution halted
	at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:404)
	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:386)
	at org.apache.spark.rdd.RDD$$an

With gapplyCollect() function this work. 


Thank you! 
 



> Bug in gapply function
> ----------------------
>
>                 Key: SPARK-19711
>                 URL: https://issues.apache.org/jira/browse/SPARK-19711
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>    Affects Versions: 2.1.0
>         Environment: Using Databricks plataform.
>            Reporter: Luis Felipe Sant Ana
>
> I have a dataframe in SparkR like 
>   CNPJ            PID       DATA N
> 1 10140281000131 10000000000021 2015-04-23 1
> 2 10140281000131 10000000000021 2015-04-27 1
> 3 10140281000131 10000000000021 2015-04-02 1
> 4 10140281000131 10000000000021 2015-11-10 1
> 5 10140281000131 10000000000021 2016-11-14 1
> 6 10140281000131 10000000000021 2015-04-03 1
> And, I want to group by columns CNPJ and PID using gapply() function, filling in the column DATA with date. Then I fill in the missing dates with zeros.
> The code:
> schema <- structType(structField("CNPJ", "string"), 
>                      structField("PID", "string"),
>                      structField("DATA", "date"),
>                      structField("N", "double"))
> result <- gapply(
>   ds_filtered,
>   c("CNPJ", "PID"),
>   function(key, x) {
>     dts <- data.frame(key, DATA = seq(min(as.Date(x$DATA)), as.Date(e_date), "days"))
>     colnames(dts)[c(1, 2)] <- c("CNPJ", "PID")
>     
>     y <- data.frame(key, DATA = as.Date(x$DATA), N = x$N)
>     colnames(y)[c(1, 2)] <- c("CNPJ", "PID")
>     
>     y <- dplyr::left_join(dts, 
>                      y,
>                      by = c("CNPJ", "PID", "DATA"))
>     
>     y[is.na(y$N), 4] <- 0
>     
>     data.frame(CNPJ = as.character(y$CNPJ),
>                PID = as.character(y$PID),
>                DATA = y$DATA,
>                N = y$N)
>   }, 
>   schema)
> Error:
> Error in handleErrors(returnStatus, conn) : 
>   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 92.0 failed 4 times, most recent failure: Lost task 0.3 in stage 92.0 (TID 7032, 10.93.243.111, executor 0): org.apache.spark.SparkException: R computation failed with
>  Error in writeType(con, serdeType) : 
>   Unsupported type for serialization factor
> Calls: outputResult ... serializeRow -> writeList -> writeObject -> writeType
> Execution halted
> 	at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
> 	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:404)
> 	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:386)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:99)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
> 	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
> 	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2784)
> 	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2354)
> 	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2354)
> 	at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:2768)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> 	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2767)
> 	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2354)
> 	at org.apache.spark.sql.api.r.SQLUtils$.dfToCols(SQLUtils.scala:208)
> 	at org.apache.spark.sql.api.r.SQLUtils.dfToCols(SQLUtils.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:167)
> 	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:108)
> 	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:40)
> 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
> 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
> 	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
> 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
> 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
> 	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
> 	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
> 	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: R computation failed with
>  Error in writeType(con, serdeType) : 
>   Unsupported type for serialization factor
> Calls: outputResult ... serializeRow -> writeList -> writeObject -> writeType
> Execution halted
> 	at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
> 	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:404)
> 	at org.apache.spark.sql.execution.FlatMapGroupsInRExec$$anonfun$12.apply(objects.scala:386)
> 	at org.apache.spark.rdd.RDD$$an
> With gapplyCollect() function this work. 
> Thank you! 
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org