Posted to issues@spark.apache.org by "Lantao Jin (JIRA)" <ji...@apache.org> on 2018/01/28 06:16:00 UTC

[jira] [Comment Edited] (SPARK-23187) Accumulator object can not be sent from Executor to Driver

    [ https://issues.apache.org/jira/browse/SPARK-23187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342463#comment-16342463 ] 

Lantao Jin edited comment on SPARK-23187 at 1/28/18 6:15 AM:
-------------------------------------------------------------

[~jerryshao]
{quote}
So I guess it is because UI doesn't display your registered accumulator in time
{quote}
I don't think so. I hard-coded some log statements in the driver instead of checking the UI.
Please check the value of accumUpdates in the executorHeartbeatReceived() method of TaskSchedulerImpl. The values set in reportHeartBeat() are not received there.


was (Author: cltlfcjin):
[~jerryshao]
{quota}
So I guess it is because UI doesn't display your registered accumulator in time
{quota}
I don't think so. I hard-coded some log statements in the driver instead of checking the UI.
Please check the value of accumUpdates in the executorHeartbeatReceived() method of TaskSchedulerImpl. The values set in reportHeartBeat() are not received there.

> Accumulator object can not be sent from Executor to Driver
> ----------------------------------------------------------
>
>                 Key: SPARK-23187
>                 URL: https://issues.apache.org/jira/browse/SPARK-23187
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.1, 2.3.0
>            Reporter: Lantao Jin
>            Priority: Major
>
> In reportHeartBeat() in Executor.scala, the task metrics values cannot be sent to the driver (on the receiving side, all values are zero).
> I wrote a unit test to illustrate this.
> {code}
> diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> index f9481f8..57fb096 100644
> --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> @@ -17,11 +17,16 @@
>  package org.apache.spark.rpc.netty
> +import scala.collection.mutable.ArrayBuffer
> +
>  import org.scalatest.mockito.MockitoSugar
>  import org.apache.spark._
>  import org.apache.spark.network.client.TransportClient
>  import org.apache.spark.rpc._
> +import org.apache.spark.util.AccumulatorContext
> +import org.apache.spark.util.AccumulatorV2
> +import org.apache.spark.util.LongAccumulator
>  class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
> @@ -83,5 +88,21 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
>      assertRequestMessageEquals(
>        msg3,
>        RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
> +
> +    val acc = new LongAccumulator
> +    val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("testAcc"));
> +    sc.register(acc, "testAcc")
> +    acc.setValue(11111)
> +//    val msg4 = new RequestMessage(senderAddress, receiver, acc)
> +//    assertRequestMessageEquals(
> +//      msg4,
> +//      RequestMessage(nettyEnv, client, msg4.serialize(nettyEnv)))
> +
> +    val accbuf = new ArrayBuffer[AccumulatorV2[_, _]]()
> +    accbuf += acc
> +    val msg5 = new RequestMessage(senderAddress, receiver, accbuf)
> +    assertRequestMessageEquals(
> +      msg5,
> +      RequestMessage(nettyEnv, client, msg5.serialize(nettyEnv)))
>    }
>  }
> {code}
> Both the msg4 and msg5 assertions fail.
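
When reading the unit test above, it may help to rule out one possible cause: a class can substitute a different object for itself during Java serialization through a writeReplace() method. The following is a minimal, Spark-free sketch of that mechanism. It is an assumption, not something established in this thread, that the accumulator in the test goes through such a substitution. SketchAccumulator, resetOnWrite, RoundTripSketch and roundTrip are hypothetical names introduced only for illustration and are not part of Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for an accumulator. This is NOT Spark's AccumulatorV2.
// resetOnWrite models the assumption that some instances are written as a zeroed copy.
class SketchAccumulator(var value: Long, val resetOnWrite: Boolean) extends Serializable {
  // Java serialization invokes writeReplace() and writes the returned object
  // instead of the original one.
  protected def writeReplace(): AnyRef =
    if (resetOnWrite) new SketchAccumulator(0L, resetOnWrite = false) else this
}

object RoundTripSketch {
  // Serialize an object to a byte array and read it back.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val substituted = new SketchAccumulator(11111L, resetOnWrite = true)
    val unchanged = new SketchAccumulator(11111L, resetOnWrite = false)
    println(roundTrip(substituted).value) // prints 0
    println(roundTrip(unchanged).value)   // prints 11111
  }
}
```

If a substitution like this applies to the accumulator in the test, comparing the original message with its round-tripped copy would fail regardless of the RPC transport, so the value should also be checked on an actual executor-to-driver heartbeat.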



