Posted to issues@spark.apache.org by "Gleb Abroskin (Jira)" <ji...@apache.org> on 2022/08/26 09:01:00 UTC

[jira] [Updated] (SPARK-40230) Executor connection issue in hybrid cloud deployment

     [ https://issues.apache.org/jira/browse/SPARK-40230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gleb Abroskin updated SPARK-40230:
----------------------------------
    Environment: 
About the k8s setup:
 * 6+ nodes in AWS
 * 4 nodes in our own data center (DC)

Spark 3.2.1 + spark-hadoop-cloud 3.2.1
{code:bash}
JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home spark-submit \
  --master k8s://https://kubemaster:6443 \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
  --conf spark.submit.deployMode=cluster \
  --conf spark.kubernetes.namespace=ml \
  --conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \
  --conf spark.kubernetes.container.image.pullSecrets=aws-ecr \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \
  --conf spark.kubernetes.authenticate.submission.oauthToken=XXX \
  --conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \
  --conf spark.kubernetes.file.upload.path=s3a://ifunny-ml-data/dev/spark \
  --conf "spark.hadoop.fs.s3a.access.key=XXX" \
  --conf "spark.hadoop.fs.s3a.secret.key=XXX" \
  --conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \
  --conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \
  --conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \
  --conf spark.sql.shuffle.partitions=500 \
  --num-executors 100 \
  --driver-java-options="-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
  --name k8s-pyspark-test \
  main.py{code}
main.py is the pi.py example from the Spark distribution, modified to run on 100 executors (this is a way to make sure executors are scheduled both in AWS and in the DC):
{code:python}
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()

    # Raise the driver's log level to TRACE (executors get TRACE via log4j-trace.properties)
    spark.sparkContext.setLogLevel("TRACE")

    # 100 partitions by default, one per requested executor
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    n = 10000000 * partitions

    def f(_: int) -> int:
        # 1 if a random point in the square [-1, 1] x [-1, 1] falls inside the unit circle
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))
    spark.stop()
{code}

  was:
The same setup, spark-submit command and main.py as the new value above, except that the --master option pointed to k8s://https://ifunny-ml-kubemaster.ash1.fun.co:6443 instead of k8s://https://kubemaster:6443.


> Executor connection issue in hybrid cloud deployment
> ----------------------------------------------------
>
>                 Key: SPARK-40230
>                 URL: https://issues.apache.org/jira/browse/SPARK-40230
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Kubernetes
>    Affects Versions: 3.2.1
>         Environment: same as the updated Environment value above
>            Reporter: Gleb Abroskin
>            Priority: Major
>
> I understand that the issue is quite subtle and might be hard to debug, but I was not able to find any problem with our infrastructure, so I suspect the cause is inside Spark.
> We deploy Spark applications on k8s, and everything works well as long as the driver and all executor pods are either all in AWS or all in our DC. However, when they are split between the two, something strange happens. For example, here are the logs of one of the executors inside the DC:
> {code:java}
> 22/08/26 07:55:35 INFO TransportClientFactory: Successfully created connection to /172.19.149.92:39414 after 50 ms (1 ms spent in bootstraps)
> 22/08/26 07:55:35 TRACE TransportClient: Sending RPC to /172.19.149.92:39414
> 22/08/26 07:55:35 TRACE TransportClient: Sending request RPC 4860401977118244334 to /172.19.149.92:39414 took 3 ms
> 22/08/26 07:55:35 DEBUG TransportClient: Sending fetch chunk request 0 to /172.19.149.92:39414
> 22/08/26 07:55:35 TRACE TransportClient: Sending request StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414 took 0 ms
> 22/08/26 07:57:35 ERROR TransportChannelHandler: Connection to /172.19.149.92:39414 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.shuffle.io.connectionTimeout if this is wrong. {code}
> The executor successfully creates the connection and sends the request, but the connection is then assumed to be dead. Even stranger, the executor at 172.19.149.92 did send the response back, which I can confirm with the following logs from that executor:
> {code:java}
> 22/08/26 07:55:35 TRACE MessageDecoder: Received message ChunkFetchRequest: ChunkFetchRequest[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0]]
> 22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Received req from /172.19.123.197:37626 to fetch block StreamChunkId[streamId=1644979023003,chunkIndex=0]
> 22/08/26 07:55:35 TRACE OneForOneStreamManager: Removing stream id 1644979023003
> 22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0
> --
> 22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0
> 22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a] to client /172.19.123.197:37626 {code}
> A few suspicious things here:
>  * connections to pods are shown as /<IP>, while the connection to the driver is shown as <POD_NAME>.<NAMESPACE>.svc/<IP>
>  * Task *-1024* is releasing the lock for broadcast_0_piece0
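The observation in the report (the request goes out, the peer logs that it sent the matching chunk, yet the client hears nothing until the idle timeout fires) can be checked mechanically. Below is a minimal sketch in plain Python that pairs client-side and server-side log lines by StreamChunkId and measures the quiet period. It uses only the log lines quoted in the issue; the helper names are illustrative, it assumes the "yy/MM/dd HH:mm:ss" timestamp prefix those lines carry (one-second resolution), and it assumes spark.shuffle.io.connectionTimeout was not set explicitly, in which case Spark falls back to spark.network.timeout (default 120s).

{code:python}
import re
from datetime import datetime

# Lines quoted from the DC executor (client side, 172.19.123.197).
CLIENT_LOG = """\
22/08/26 07:55:35 DEBUG TransportClient: Sending fetch chunk request 0 to /172.19.149.92:39414
22/08/26 07:55:35 TRACE TransportClient: Sending request StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414 took 0 ms
22/08/26 07:57:35 ERROR TransportChannelHandler: Connection to /172.19.149.92:39414 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.shuffle.io.connectionTimeout if this is wrong.
"""

# Lines quoted from the executor at 172.19.149.92 (server side).
SERVER_LOG = """\
22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Received req from /172.19.123.197:37626 to fetch block StreamChunkId[streamId=1644979023003,chunkIndex=0]
22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a] to client /172.19.123.197:37626
"""

CHUNK_RE = re.compile(r"StreamChunkId\[streamId=(\d+),chunkIndex=(\d+)\]")
TS_FORMAT = "%y/%m/%d %H:%M:%S"  # matches the "22/08/26 07:55:35" prefix
TIMEOUT_S = 120  # assumption: default spark.network.timeout, no explicit override


def log_time(line):
    """Parse the 17-character timestamp at the start of a log line."""
    return datetime.strptime(line[:17], TS_FORMAT)


def chunk_times(log, marker):
    """Map (streamId, chunkIndex) -> timestamp for lines containing `marker`."""
    result = {}
    for line in log.splitlines():
        if marker in line:
            for stream_id, chunk_index in CHUNK_RE.findall(line):
                result[(int(stream_id), int(chunk_index))] = log_time(line)
    return result


requested = chunk_times(CLIENT_LOG, "Sending request StreamChunkId")
sent = chunk_times(SERVER_LOG, "Sent result ChunkFetchSuccess")
timeout_line = next(ln for ln in CLIENT_LOG.splitlines() if "has been quiet" in ln)

# For every chunk that was both requested and reported as sent,
# measure how long the client waited before declaring the connection dead.
for chunk in sorted(requested.keys() & sent.keys()):
    quiet_s = (log_time(timeout_line) - requested[chunk]).total_seconds()
    print(f"chunk {chunk}: requested and sent, then {quiet_s:.0f} s of silence "
          f"(timeout {TIMEOUT_S} s)")
{code}

With the quoted lines, the single chunk (1644979023003, 0) appears on both sides within the same second, and the client then sees 120 s of silence, i.e. the full idle timeout, which suggests the ChunkFetchSuccess never reached it.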
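On the first suspicious point: the addresses in these log lines look like the output of Java's InetSocketAddress.toString(), which renders an address as hostname/literal-IP:port and leaves the hostname part empty when the underlying InetAddress carries no host name (for example, when it was created from a bare IP literal). So /172.19.149.92:39414 records only an IP, while <POD_NAME>.<NAMESPACE>.svc/<IP> also records a DNS name. The minimal sketch below splits such strings into their parts; the driver address in the example is hypothetical (TEST-NET IP, made-up name and port), and the parser only handles the resolved hostname/ip:port form.

{code:python}
from typing import NamedTuple, Optional


class PeerAddress(NamedTuple):
    hostname: Optional[str]  # None when the text before "/" is empty
    ip: str
    port: int


def parse_java_address(text: str) -> PeerAddress:
    """Split a resolved Java InetSocketAddress string of the form hostname/ip:port."""
    if "/" not in text:
        raise ValueError(f"expected hostname/ip:port, got {text!r}")
    host_part, _, rest = text.partition("/")
    ip, _, port = rest.rpartition(":")
    return PeerAddress(host_part or None, ip, int(port))


# Executor peer, exactly as printed in the logs in the issue: IP only.
executor = parse_java_address("/172.19.149.92:39414")
# Hypothetical driver address following the <POD_NAME>.<NAMESPACE>.svc/<IP> pattern.
driver = parse_java_address("example-driver-svc.ml.svc/192.0.2.10:7078")

print(executor)
print(driver)
{code}

The notation by itself only shows whether a host name was attached to the address object; it does not say which path the packets take between AWS and the DC.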



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
