You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Lantao Jin (JIRA)" <ji...@apache.org> on 2018/01/23 06:56:00 UTC

[jira] [Created] (SPARK-23187) Accumulator object can not be sent from Executor to Driver

Lantao Jin created SPARK-23187:
----------------------------------

             Summary: Accumulator object can not be sent from Executor to Driver
                 Key: SPARK-23187
                 URL: https://issues.apache.org/jira/browse/SPARK-23187
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.2.1
            Reporter: Lantao Jin


In the Executor.scala->reportHeartBeat(), task Metrics value can not be sent to Driver (In receive side all values are zero).

I write an UT for explanation.
{code}
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index f9481f8..57fb096 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -17,11 +17,16 @@

 package org.apache.spark.rpc.netty

+import scala.collection.mutable.ArrayBuffer
+
 import org.scalatest.mockito.MockitoSugar

 import org.apache.spark._
 import org.apache.spark.network.client.TransportClient
 import org.apache.spark.rpc._
+import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.AccumulatorV2
+import org.apache.spark.util.LongAccumulator

 class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {

@@ -83,5 +88,21 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
     assertRequestMessageEquals(
       msg3,
       RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
+
+    val acc = new LongAccumulator
+    val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("testAcc"));
+    sc.register(acc, "testAcc")
+    acc.setValue(11111)
+//    val msg4 = new RequestMessage(senderAddress, receiver, acc)
+//    assertRequestMessageEquals(
+//      msg4,
+//      RequestMessage(nettyEnv, client, msg4.serialize(nettyEnv)))
+
+    val accbuf = new ArrayBuffer[AccumulatorV2[_, _]]()
+    accbuf += acc
+    val msg5 = new RequestMessage(senderAddress, receiver, accbuf)
+    assertRequestMessageEquals(
+      msg5,
+      RequestMessage(nettyEnv, client, msg5.serialize(nettyEnv)))
   }
 }
{code}

msg4 and msg5 are all going to failed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org