You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Alice Wong <ai...@gmail.com> on 2018/07/14 01:44:53 UTC

Submit WordCount to a Flink cluster

Hello,

I am a newbie to Beam.

Following the Beam docs, I am trying to submit the example WordCount to a
Flink cluster (one jobmanger and one taskmanager running locally in two
linked Docker containers with Maven installed).

It seems the Beam doc is a bit confusing as to how to submit jobs.

In https://beam.apache.org/get-started/quickstart-java/, it mentions I
should use

mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master>
--filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml
--output=/tmp/counts" -Pflink-runner

where <flink master> seems just a hostname.

In https://beam.apache.org/documentation/runners/flink/, it says I should
use

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam--bundled-0.1.jar"

where I can give localhost:8081 for flinkMaster.

I have tried run this command both outside Docker and in the jobmanager
container (with exec command). Either way, if I use "localhost" without
port for <flink master url>, it just runs locally and ignores flink
cluster. If I use "localhost:8081", the command hangs for about 5 seconds
and shows the following error messages. It eventually disconnects and dies.

Could you help give some hint how the Beam jobs are submitted to Flink
cluster in general? Can I do it outside jobmanager node remotely?

Thanks in advance!

--------------------------------------
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Executing pipeline using FlinkRunner.
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Translating pipeline to Flink program.
Jul 14, 2018 12:49:48 AM
org.apache.beam.runners.flink.FlinkExecutionEnvironments
createBatchExecutionEnvironment
INFO: Creating a Batch Execution Environment.
Jul 14, 2018 12:49:48 AM
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
enterCompositeTransform
INFO:  enterCompositeTransform-
Jul 14, 2018 12:49:48 AM
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
enterCompositeTransform
INFO: |    enterCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
visitPrimitiveTransform
INFO: |   |    visitPrimitiveTransform- ReadLines/Read
Jul 14, 2018 12:49:48 AM
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
leaveCompositeTransform
INFO: |    leaveCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
enterCompositeTransform
INFO: |    enterCompositeTransform- WordCount.CountWords
...
INFO: |   |    leaveCompositeTransform- WriteCounts/WriteFiles
Jul 14, 2018 12:49:48 AM
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
leaveCompositeTransform
INFO: |    leaveCompositeTransform- WriteCounts
Jul 14, 2018 12:49:48 AM
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
leaveCompositeTransform
INFO:  leaveCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Starting execution of Flink program.
Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment
createProgramPlan
INFO: The job has 0 registered types and 0 default Kryo serializers
Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource
getEstimatedSizeBytes
INFO: Filepattern pom.xml matched 1 files with total size 10600
Jul 14, 2018 12:49:49 AM
org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
INFO: Starting client actor system.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils
findConnectingAddress
INFO: Trying to select the network interface and address to use by
connecting to the leading JobManager.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils
findConnectingAddress
INFO: TaskManager will try to connect for 10000 milliseconds before falling
back to heuristics
Jul 14, 2018 12:49:49 AM
org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener
findConnectingAddress
INFO: Retrieved new target address localhost/127.0.0.1:8081.
Jul 14, 2018 12:49:49 AM
org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Trying to start actor system at c4342d15c3f4:0
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
applyOrElse
INFO: Slf4jLogger started
Jul 14, 2018 12:49:50 AM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
apply$mcV$sp
INFO: Starting remoting
Jul 14, 2018 12:49:51 AM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
apply$mcV$sp
INFO: Remoting started; listening on addresses
:[akka.tcp://flink@c4342d15c3f4:46627]
Jul 14, 2018 12:49:51 AM
org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient
logAndSysout
INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting
for job completion.
Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for
job completion.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor
handleMessage
INFO: Received SubmitJobAndWait(JobGraph(jobId:
87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a
JobManager yet.
Jul 14, 2018 12:49:51 AM
org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
INFO: Received job wordcount-root-0714004948-3ee44b3d
(87a12b5471d39a7837d6b0def6d748e2).
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor
disconnectFromJobManager
INFO: Disconnect from JobManager null.
Jul 14, 2018 12:49:51 AM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2
apply$mcV$sp
WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 1213486164 - discarded
Jul 14, 2018 12:49:51 AM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2
apply$mcV$sp
WARNING: Association with remote system [akka.tcp://flink@localhost:8081]
has failed, address is now gated for [5000] ms. Reason: [Association failed
with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system
explicitly disassociated (reason unknown).]
...
Jul 14, 2018 12:50:51 AM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
apply$mcV$sp
INFO: Remoting shut down.
Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
SEVERE: Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Couldn't retrieve the JobExecutionResult from the
JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
at
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
at
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
at
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
...

Re: Submit WordCount to a Flink cluster

Posted by Alexey Romanenko <ar...@gmail.com>.
Alice,

Totally agree with what Lukasz said. 
Also, as alternative solution for job testing, I can suggest to install Flink locally and run Beam pipeline with just CLI command like this “bin/flink run -c <main_class> /path/to/jar --runner=FlinkRunner “.

> On 18 Jul 2018, at 17:09, Lukasz Cwik <lc...@google.com> wrote:
> 
> Yes, you should be able to submit jobs to a Flink master from anywhere you have network connectivity to the Flink master.
> 
> It looks like your job is being submitted to the Flink master and we start waiting for the job to complete but something is causing the job to not complete successfully. Have you tried looking at the Flink master UI or Flink master logs?
> 
> 
> 
> On Fri, Jul 13, 2018 at 6:45 PM Alice Wong <airwaywong@gmail.com <ma...@gmail.com>> wrote:
> Hello,
> 
> I am a newbie to Beam.
> 
> Following the Beam docs, I am trying to submit the example WordCount to a Flink cluster (one jobmanger and one taskmanager running locally in two linked Docker containers with Maven installed).
> 
> It seems the Beam doc is a bit confusing as to how to submit jobs.
> 
> In https://beam.apache.org/get-started/quickstart-java/ <https://beam.apache.org/get-started/quickstart-java/>, it mentions I should use
> 
> mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>      -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
>                   --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
> where <flink master> seems just a hostname.
> 
> In https://beam.apache.org/documentation/runners/flink/ <https://beam.apache.org/documentation/runners/flink/>, it says I should use
> 
> $ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>     -Pflink-runner \
>     -Dexec.args="--runner=FlinkRunner \
>       --inputFile=/path/to/pom.xml \
>       --output=/path/to/counts \
>       --flinkMaster=<flink master url> \
>       --filesToStage=target/word-count-beam--bundled-0.1.jar"
> where I can give localhost:8081 for flinkMaster.
> 
> I have tried run this command both outside Docker and in the jobmanager container (with exec command). Either way, if I use "localhost" without port for <flink master url>, it just runs locally and ignores flink cluster. If I use "localhost:8081", the command hangs for about 5 seconds and shows the following error messages. It eventually disconnects and dies.
> 
> Could you help give some hint how the Beam jobs are submitted to Flink cluster in general? Can I do it outside jobmanager node remotely?
> 
> Thanks in advance!
> 
> --------------------------------------
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Executing pipeline using FlinkRunner.
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Translating pipeline to Flink program.
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment
> INFO: Creating a Batch Execution Environment.
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
> INFO:  enterCompositeTransform-
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
> INFO: |    enterCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator visitPrimitiveTransform
> INFO: |   |    visitPrimitiveTransform- ReadLines/Read
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
> INFO: |    leaveCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
> INFO: |    enterCompositeTransform- WordCount.CountWords
> ...
> INFO: |   |    leaveCompositeTransform- WriteCounts/WriteFiles
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
> INFO: |    leaveCompositeTransform- WriteCounts
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
> INFO:  leaveCompositeTransform-
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Starting execution of Flink program.
> Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment createProgramPlan
> INFO: The job has 0 registered types and 0 default Kryo serializers
> Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
> INFO: Filepattern pom.xml matched 1 files with total size 10600
> Jul 14, 2018 12:49:49 AM org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
> INFO: Starting client actor system.
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
> INFO: Trying to select the network interface and address to use by connecting to the leading JobManager.
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
> INFO: TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener findConnectingAddress
> INFO: Retrieved new target address localhost/127.0.0.1:8081 <http://127.0.0.1:8081/>.
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Trying to start actor system at c4342d15c3f4:0
> Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 applyOrElse
> INFO: Slf4jLogger started
> Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> INFO: Starting remoting
> Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> INFO: Remoting started; listening on addresses :[akka.tcp://flink@c4342d15c3f4:46627]
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
> Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient logAndSysout
> INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
> Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor handleMessage
> INFO: Received SubmitJobAndWait(JobGraph(jobId: 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a JobManager yet.
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> INFO: Received job wordcount-root-0714004948-3ee44b3d (87a12b5471d39a7837d6b0def6d748e2).
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
> INFO: Disconnect from JobManager null.
> Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
> WARNING: Remote connection to [localhost/127.0.0.1:8081 <http://127.0.0.1:8081/>] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1213486164 - discarded
> Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
> WARNING: Association with remote system [akka.tcp://flink@localhost:8081] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system explicitly disassociated (reason unknown).]
> ...
> Jul 14, 2018 12:50:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> INFO: Remoting shut down.
> Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
> SEVERE: Pipeline execution failed
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> 	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
> 	at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
> 	at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
> ...


Re: Submit WordCount to a Flink cluster

Posted by Lukasz Cwik <lc...@google.com>.
Yes, you should be able to submit jobs to a Flink master from anywhere you
have network connectivity to the Flink master.

It looks like your job is being submitted to the Flink master and we start
waiting for the job to complete but something is causing the job to not
complete successfully. Have you tried looking at the Flink master UI or
Flink master logs?



On Fri, Jul 13, 2018 at 6:45 PM Alice Wong <ai...@gmail.com> wrote:

> Hello,
>
> I am a newbie to Beam.
>
> Following the Beam docs, I am trying to submit the example WordCount to a
> Flink cluster (one jobmanger and one taskmanager running locally in two
> linked Docker containers with Maven installed).
>
> It seems the Beam doc is a bit confusing as to how to submit jobs.
>
> In https://beam.apache.org/get-started/quickstart-java/, it mentions I
> should use
>
> mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>      -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
>                   --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
>
> where <flink master> seems just a hostname.
>
> In https://beam.apache.org/documentation/runners/flink/, it says I should
> use
>
> $ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>     -Pflink-runner \
>     -Dexec.args="--runner=FlinkRunner \
>       --inputFile=/path/to/pom.xml \
>       --output=/path/to/counts \
>       --flinkMaster=<flink master url> \
>       --filesToStage=target/word-count-beam--bundled-0.1.jar"
>
> where I can give localhost:8081 for flinkMaster.
>
> I have tried run this command both outside Docker and in the jobmanager
> container (with exec command). Either way, if I use "localhost" without
> port for <flink master url>, it just runs locally and ignores flink
> cluster. If I use "localhost:8081", the command hangs for about 5 seconds
> and shows the following error messages. It eventually disconnects and dies.
>
> Could you help give some hint how the Beam jobs are submitted to Flink
> cluster in general? Can I do it outside jobmanager node remotely?
>
> Thanks in advance!
>
> --------------------------------------
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Executing pipeline using FlinkRunner.
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Translating pipeline to Flink program.
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkExecutionEnvironments
> createBatchExecutionEnvironment
> INFO: Creating a Batch Execution Environment.
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> enterCompositeTransform
> INFO:  enterCompositeTransform-
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> enterCompositeTransform
> INFO: |    enterCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> visitPrimitiveTransform
> INFO: |   |    visitPrimitiveTransform- ReadLines/Read
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> leaveCompositeTransform
> INFO: |    leaveCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> enterCompositeTransform
> INFO: |    enterCompositeTransform- WordCount.CountWords
> ...
> INFO: |   |    leaveCompositeTransform- WriteCounts/WriteFiles
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> leaveCompositeTransform
> INFO: |    leaveCompositeTransform- WriteCounts
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> leaveCompositeTransform
> INFO:  leaveCompositeTransform-
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Starting execution of Flink program.
> Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment
> createProgramPlan
> INFO: The job has 0 registered types and 0 default Kryo serializers
> Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource
> getEstimatedSizeBytes
> INFO: Filepattern pom.xml matched 1 files with total size 10600
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
> INFO: Starting client actor system.
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
> INFO: Trying to select the network interface and address to use by
> connecting to the leading JobManager.
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
> INFO: TaskManager will try to connect for 10000 milliseconds before
> falling back to heuristics
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener
> findConnectingAddress
> INFO: Retrieved new target address localhost/127.0.0.1:8081.
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Trying to start actor system at c4342d15c3f4:0
> Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
> applyOrElse
> INFO: Slf4jLogger started
> Jul 14, 2018 12:49:50 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Starting remoting
> Jul 14, 2018 12:49:51 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Remoting started; listening on addresses
> :[akka.tcp://flink@c4342d15c3f4:46627]
> Jul 14, 2018 12:49:51 AM
> org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
> Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient
> logAndSysout
> INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting
> for job completion.
> Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for
> job completion.
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor
> handleMessage
> INFO: Received SubmitJobAndWait(JobGraph(jobId:
> 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a
> JobManager yet.
> Jul 14, 2018 12:49:51 AM
> org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> INFO: Received job wordcount-root-0714004948-3ee44b3d
> (87a12b5471d39a7837d6b0def6d748e2).
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor
> disconnectFromJobManager
> INFO: Disconnect from JobManager null.
> Jul 14, 2018 12:49:51 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2
> apply$mcV$sp
> WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
> Adjusted frame length exceeds 10485760: 1213486164 - discarded
> Jul 14, 2018 12:49:51 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2
> apply$mcV$sp
> WARNING: Association with remote system [akka.tcp://flink@localhost:8081]
> has failed, address is now gated for [5000] ms. Reason: [Association failed
> with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system
> explicitly disassociated (reason unknown).]
> ...
> Jul 14, 2018 12:50:51 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Remoting shut down.
> Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
> SEVERE: Pipeline execution failed
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Couldn't retrieve the JobExecutionResult from the
> JobManager.
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> at
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
> at
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
> at
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
> ...
>