You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Lantao Jin (JIRA)" <ji...@apache.org> on 2018/01/23 06:56:00 UTC
[jira] [Created] (SPARK-23187) Accumulator object can not be sent
from Executor to Driver
Lantao Jin created SPARK-23187:
----------------------------------
Summary: Accumulator object can not be sent from Executor to Driver
Key: SPARK-23187
URL: https://issues.apache.org/jira/browse/SPARK-23187
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.1
Reporter: Lantao Jin
In Executor.scala -> reportHeartBeat(), the task metrics values cannot be sent to the Driver (on the receiving side, all values are zero).
I wrote a unit test (UT) to demonstrate the problem.
{code}
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index f9481f8..57fb096 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -17,11 +17,16 @@
package org.apache.spark.rpc.netty
+import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.network.client.TransportClient
import org.apache.spark.rpc._
+import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.AccumulatorV2
+import org.apache.spark.util.LongAccumulator
class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
@@ -83,5 +88,21 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
assertRequestMessageEquals(
msg3,
RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
+
+ val acc = new LongAccumulator
+ val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("testAcc"));
+ sc.register(acc, "testAcc")
+ acc.setValue(11111)
+// val msg4 = new RequestMessage(senderAddress, receiver, acc)
+// assertRequestMessageEquals(
+// msg4,
+// RequestMessage(nettyEnv, client, msg4.serialize(nettyEnv)))
+
+ val accbuf = new ArrayBuffer[AccumulatorV2[_, _]]()
+ accbuf += acc
+ val msg5 = new RequestMessage(senderAddress, receiver, accbuf)
+ assertRequestMessageEquals(
+ msg5,
+ RequestMessage(nettyEnv, client, msg5.serialize(nettyEnv)))
}
}
{code}
Both the msg4 case (commented out in the diff above) and the msg5 case fail.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org