Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2017/10/09 08:51:37 UTC

PartitionNotFoundException when running in yarn-session.

Hi,

I'm having some trouble running a Java-based Flink job in a yarn-session.

The job reads a set of files into a DataStream (I use DataStream because
in the future I intend to replace the files with a Kafka feed), then does
some parsing and eventually writes the data into HBase.

Most of the time running this works fine yet sometimes it fails with this
exception:

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363
not found.
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
	at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
	at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
	at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
	at akka.dispatch.OnComplete.internal(Future.scala:248)
	at akka.dispatch.OnComplete.internal(Future.scala:245)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I went through all logs at the Hadoop side of all the related containers
and other than this exception I did not see any warning/error that might
explain what is going on here.

Now the "Most of the time running this works fine" makes this hard to
troubleshoot. When I run the same job again it may run perfectly that time.

I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double checked my
pom.xml and I use the same version for Flink / Scala in there.

The command used to start the yarn-session on my experimental cluster (no
security, no other users):

/usr/local/flink-1.3.2/bin/yarn-session.sh \
    --container 180 \
    --name "Flink on Yarn Experiments" \
    --slots                     1     \
    --jobManagerMemory          4000  \
    --taskManagerMemory         4000  \
    --streaming                       \
    --detached

Two relevant fragments from my application pom.xml:

<flink.version>1.3.2</flink.version>
<flink.scala.version>2.11</flink.scala.version>



<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${flink.scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hbase_${flink.scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>


I could really use some suggestions on where to look for the root cause of
this.
Is this something in my application? My Hadoop cluster? Or is this a
problem in Flink 1.3.2?

Thanks.

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: PartitionNotFoundException when running in yarn-session.

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi

I did some tests and it turns out I was really overloading the cluster
which caused the problems.
I tried the timeout setting but that didn't help. Simply 'not overloading'
the system did help.

Thanks.

Niels


> >> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <uc...@apache.org> wrote:
> >> > Hey Niels,
> >> >
> >> > thanks for the detailed report. I don't think that it is related to
> >> > the Hadoop or Scala version. I think the following happens:
> >> >
> >> > - Occasionally, one of your tasks seems to be extremely slow in
> >> > registering its produced intermediate result (the data shuffled
> >> > between TaskManagers)
> >> > - Another task is already requesting to consume data from this task
> >> > but cannot find it (after multiple retries) and it fails the complete
> >> > job (your stack trace)
> >> >
> >> > That happens only occasionally probably due to load in your cluster.
> >> > The slow down could have multiple reasons...
> >> > - Is your Hadoop cluster resource constrained and the tasks are slow
> >> > to deploy?
> >> > - Is your application JAR very large and needs a lot of time
> >> > downloading?
> >> >
> >> > We have two options at this point:
> >> > 1) You can increase the maximum retries via the config option:
> >> > "taskmanager.network.request-backoff.max" The default is 10000
> >> > (milliseconds) and specifies what the maximum request back off is [1].
> >> > Increasing this to 30000 would give you two extra retries with pretty
> >> > long delays (see [1]).
> >> >
> >> > 2) To be sure that this is really what is happening we could increase
> >> > the log level of certain classes and check whether they have
> >> > registered their results or not. If you want to do this, I'm more than
> >> > happy to provide you with some classes to enable DEBUG logging for.
> >> >
> >> > What do you think?
> >> >
> >> > – Ufuk
> >> >
> >> > DETAILS
> >> > =======
> >> >
> >> > - The TaskManagers produce and consume intermediate results
> >> > - When a TaskManager wants to consume a result, it directly queries
> >> > the producing TaskManager for it
> >> > - An intermediate result becomes ready for consumption during initial
> >> > task setup (state DEPLOYING)
> >> > - When a TaskManager is slow to register its intermediate result and
> >> > the consumer requests the result before it is ready, it can happen
> >> > that a requested partition is "not found"
> >> >
> >> > This is what is also happening here. We retry to request the
> >> > intermediate result multiple times with timed backoff [1] and only
> >> > fail the request (your stack trace) if the partition is still not
> >> > ready although we expect it to be ready (that is there was no failure
> >> > at the producing task).
> >> >
> >> > [1] Starting by default at 100 millis and going up to 10_000 millis by
> >> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
> >> >
> >> >
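The backoff arithmetic in footnote [1] can be checked with a short sketch.
The Java class below is an illustration only, not Flink's code: the class
name BackoffSchedule and its methods are made up for this example, and it
assumes the delay simply doubles from the initial value (100 ms) until it
reaches taskmanager.network.request-backoff.max, as the footnote describes.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch of the doubling request backoff from footnote [1]; not Flink's own code. */
final class BackoffSchedule {

    /** Delays in milliseconds: start at initialMs, double each step, cap at maxMs, stop after the cap. */
    static List<Long> delays(long initialMs, long maxMs) {
        List<Long> result = new ArrayList<>();
        long current = initialMs;
        while (current < maxMs) {
            result.add(current);
            current = Math.min(current * 2, maxMs);
        }
        result.add(maxMs); // the capped delay is used once, then the request fails
        return result;
    }

    /** Sum of all delays, i.e. how long the consumer keeps retrying in total. */
    static long totalWaitMs(List<Long> delays) {
        long sum = 0;
        for (long d : delays) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        // 10000 is the default maximum mentioned in the thread, 30000 the suggested value.
        for (long max : new long[] {10_000L, 30_000L}) {
            List<Long> d = delays(100L, max);
            System.out.println("max=" + max + " -> " + d.size() + " delays " + d
                    + ", total " + totalWaitMs(d) + " ms");
        }
    }
}
```

Under these assumptions the default maximum of 10000 ms yields 8 delays
adding up to 22.7 seconds, while a maximum of 30000 ms yields 10 delays
(the two extra retries mentioned above) adding up to 81.1 seconds.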



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: PartitionNotFoundException when running in yarn-session.

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Niels,

Flink currently restarts the complete job if you have a restart
strategy configured:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html.

I agree that only restarting the required parts of the pipeline is an
important optimization. Flink has not implemented this (fully) yet but
it's on the agenda [1] and work has already started [2].

In this particular case, everything is just slow and we don't need the
restart at all if you give the consumer a higher max timeout.

Please report back when you have more info :-)

– Ufuk

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

[2] https://issues.apache.org/jira/browse/FLINK-4256
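
To make the "higher max timeout" point concrete, here is a simplified model
of the consumer side. It assumes a first request at time 0, one retry after
each backoff delay, and the doubling backoff described elsewhere in this
thread (initial delay 100 ms, capped at
taskmanager.network.request-backoff.max). PartitionRequestModel and
partitionFound are hypothetical names for this sketch, and the real retry
logic in Flink has more moving parts.

```java
/** Simplified model of a consumer retrying a partition request; an illustration, not Flink's implementation. */
final class PartitionRequestModel {

    /**
     * Returns true if a request issued at time 0 and retried after each backoff delay
     * is ever made at or after the moment the producer has registered its result.
     */
    static boolean partitionFound(long producerReadyAtMs, long initialBackoffMs, long maxBackoffMs) {
        if (initialBackoffMs <= 0) {
            throw new IllegalArgumentException("initial backoff must be positive");
        }
        long now = 0;      // simulated time of the current request
        long backoff = 0;  // 0 means no retry has been scheduled yet
        while (true) {
            if (now >= producerReadyAtMs) {
                return true;                 // the partition is registered, the request succeeds
            }
            if (backoff == 0) {
                backoff = initialBackoffMs;  // first retry
            } else if (backoff < maxBackoffMs) {
                backoff = Math.min(backoff * 2, maxBackoffMs);
            } else {
                return false;                // the maximum backoff was already used: give up
            }
            now += backoff;                  // wait, then request again
        }
    }

    public static void main(String[] args) {
        long[] readyAt = {20_000L, 60_000L, 120_000L};
        for (long ready : readyAt) {
            System.out.println("producer ready after " + ready + " ms:"
                    + " max=10000 -> " + partitionFound(ready, 100L, 10_000L)
                    + ", max=30000 -> " + partitionFound(ready, 100L, 30_000L));
        }
    }
}
```

In this model a producer that needs 60 seconds to register its result is
missed with the default maximum (the last retry happens after 22.7 s) but
found with a maximum of 30000 ms (last retry after 81.1 s). A producer that
needs 2 minutes is missed either way, so on a badly overloaded cluster a
larger maximum alone may not be enough.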


Re: PartitionNotFoundException when running in yarn-session.

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

I'm currently doing some tests to see if this info helps.
I was running a different high-CPU task on one of the nodes outside Yarn,
so I took that one out of the cluster to see if that helps.

What I do find strange is that in this kind of error scenario the entire
job fails.
I would have expected something similar to 'good old' MapReduce: the
missing task is simply resubmitted and run again.
Why doesn't that happen?


Niels

On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Niels,
>
> any update on this?
>
> – Ufuk
>
>
> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <uc...@apache.org> wrote:
> > Hey Niels,
> >
> > thanks for the detailed report. I don't think that it is related to
> > the Hadoop or Scala version. I think the following happens:
> >
> > - Occasionally, one of your tasks seems to be extremely slow in
> > registering its produced intermediate result (the data shuffled
> > between TaskManagers)
> > - Another task is already requesting to consume data from this task
> > but cannot find it (after multiple retries) and it fails the complete
> > job (your stack trace)
> >
> > That happens only occasionally probably due to load in your cluster.
> > The slow down could have multiple reasons...
> > - Is your Hadoop cluster resource constrained and the tasks are slow to
> deploy?
> > - Is your application JAR very large and needs a lot of time downloading?
> >
> > We have two options at this point:
> > 1) You can increase the maximum retries via the config option:
> > "taskmanager.network.request-backoff.max" The default is 10000
> > (milliseconds) and specifies what the maximum request back off is [1].
> > Increasing this to 30000 would give you two extra retries with pretty
> > long delays (see [1]).
> >
> > 2) To be sure that this is really what is happening we could increase
> > the log level of certain classes and check whether they have
> > registered their results or not. If you want to do this, I'm more than
> > happy to provide you with some classes to enable DEBUG logging for.
> >
> > What do you think?
> >
> > – Ufuk
> >
> > DETAILS
> > =======
> >
> > - The TaskManagers produce and consume intermediate results
> > - When a TaskManager wants to consume a result, it directly queries
> > the producing TaskManager for it
> > - An intermediate result becomes ready for consumption during initial
> > task setup (state DEPLOYING)
> > - When a TaskManager is slow to register its intermediate result and
> > the consumer requests the result before it is ready, it can happen
> > that a requested partition is "not found"
> >
> > This is what is also happening here. We retry to request the
> > intermediate result multiple times with timed backoff [1] and only
> > fail the request (your stack trace) if the partition is still not
> > ready although we expect it to be ready (that is there was no failure
> > at the producing task).
> >
> > [1] Starting by default at 100 millis and going up to 10_000 millis by
> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
> >
> >
> > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <Ni...@basjes.nl> wrote:
> >> Hi,
> >>
> >> I'm having some trouble running a java based Flink job in a
> yarn-session.
> >>
> >> The job itself consists of reading a set of files resulting in a
> DataStream
> >> (I use DataStream because in the future I intend to change the file
> with a
> >> Kafka feed), then does some parsing and eventually writes the data into
> >> HBase.
> >>
> >> Most of the time running this works fine yet sometimes it fails with
> this
> >> exception:
> >>
> >> org.apache.flink.runtime.io.network.partition.
> PartitionNotFoundException:
> >> Partition 794b5ce385c296b7943fa4c1f072d6b9@
> 13aa7ef02a5d9e0898204ec8ce283363
> >> not found.
> >>       at
> >> org.apache.flink.runtime.io.network.partition.consumer.
> RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
> >>       at
> >> org.apache.flink.runtime.io.network.partition.consumer.
> RemoteInputChannel.retriggerSubpartitionRequest(
> RemoteInputChannel.java:128)
> >>       at
> >> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.
> retriggerPartitionRequest(SingleInputGate.java:345)
> >>       at
> >> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.
> java:1286)
> >>       at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.
> java:1123)
> >>       at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.
> java:1118)
> >>       at
> >> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.
> onComplete(FlinkFuture.java:272)
> >>       at akka.dispatch.OnComplete.internal(Future.scala:248)
> >>       at akka.dispatch.OnComplete.internal(Future.scala:245)
> >>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
> >>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
> >>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> >>       at
> >> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(
> BatchingExecutor.scala:55)
> >>       at
> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
> apply$mcV$sp(BatchingExecutor.scala:91)
> >>       at
> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
> apply(BatchingExecutor.scala:91)
> >>       at
> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
> apply(BatchingExecutor.scala:91)
> >>       at scala.concurrent.BlockContext$.withBlockContext(
> BlockContext.scala:72)
> >>       at
> >> akka.dispatch.BatchingExecutor$BlockableBatch.run(
> BatchingExecutor.scala:90)
> >>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >>       at
> >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
> >>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
> >>       at
> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> >>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> >>       at
> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> >>
> >> I went through all logs at the Hadoop side of all the related
> containers and
> >> other than this exception I did not see any warning/error that might
> explain
> >> what is going on here.
> >>
> >> Now the "Most of the time running this works fine" makes this hard to
> >> troubleshoot. When I run the same job again it may run perfectly that
> time.
> >>
> >> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double checked
> my
> >> pom.xml and I use the same version for Flink / Scala in there.
> >>
> >> The command used to start the yarn-session on my experimental cluster
> (no
> >> security, no other users):
> >>
> >> /usr/local/flink-1.3.2/bin/yarn-session.sh \
> >>     --container 180 \
> >>     --name "Flink on Yarn Experiments" \
> >>     --slots                     1     \
> >>     --jobManagerMemory          4000  \
> >>     --taskManagerMemory         4000  \
> >>     --streaming                       \
> >>     --detached
> >>
> >> Two relevant fragments from my application pom.xml:
> >>
> >> <flink.version>1.3.2</flink.version>
> >> <flink.scala.version>2.11</flink.scala.version>
> >>
> >>
> >>
> >> <dependency>
> >>   <groupId>org.apache.flink</groupId>
> >>   <artifactId>flink-java</artifactId>
> >>   <version>${flink.version}</version>
> >> </dependency>
> >>
> >> <dependency>
> >>   <groupId>org.apache.flink</groupId>
> >>   <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
> >>   <version>${flink.version}</version>
> >> </dependency>
> >>
> >> <dependency>
> >>   <groupId>org.apache.flink</groupId>
> >>   <artifactId>flink-clients_${flink.scala.version}</artifactId>
> >>   <version>${flink.version}</version>
> >> </dependency>
> >>
> >> <dependency>
> >>   <groupId>org.apache.flink</groupId>
> >>   <artifactId>flink-hbase_${flink.scala.version}</artifactId>
> >>   <version>${flink.version}</version>
> >> </dependency>
> >>
> >>
> >> I could really use some suggestions where to look for the root cause of
> >> this.
> >> Is this something in my application? My Hadoop cluster? Or is this a problem
> >> in Flink 1.3.2?
> >>
> >> Thanks.
> >>
> >> --
> >> Best regards / Met vriendelijke groeten,
> >>
> >> Niels Basjes
>



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: PartitionNotFoundException when running in yarn-session.

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Niels,

any update on this?

– Ufuk



Re: PartitionNotFoundException when running in yarn-session.

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Niels,

thanks for the detailed report. I don't think that it is related to
the Hadoop or Scala version. I think the following happens:

- Occasionally, one of your tasks seems to be extremely slow in
registering its produced intermediate result (the data shuffled
between TaskManagers)
- Another task is already requesting to consume data from this task
but cannot find it (after multiple retries) and it fails the complete
job (your stack trace)

This probably happens only occasionally because of load on your cluster.
The slowdown could have multiple causes, for example:
- Is your Hadoop cluster resource-constrained, so that tasks are slow to deploy?
- Is your application JAR very large, so that it takes a long time to download?

We have two options at this point:
1) You can allow more retries by raising the config option
"taskmanager.network.request-backoff.max". It specifies the maximum
request backoff in milliseconds and defaults to 10000 [1].
Increasing it to 30000 would give you two extra retries with pretty
long delays (see [1]).

2) To be sure that this is really what is happening, we could increase
the log level of certain classes and check whether the tasks have
registered their results or not. If you want to do this, I'm more than
happy to provide you with some classes to enable DEBUG logging for.
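
For option 1, a minimal sketch of the change, assuming the setting goes
into the conf/flink-conf.yaml that the yarn-session is started from (the
key and the value 30000 come from the text above; the file location is an
assumption about your setup):

```yaml
# conf/flink-conf.yaml
# Maximum backoff in milliseconds for partition requests (default: 10000).
taskmanager.network.request-backoff.max: 30000
```

Presumably the yarn-session has to be restarted for the new value to take
effect.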

What do you think?

– Ufuk

DETAILS
=======

- The TaskManagers produce and consume intermediate results
- When a TaskManager wants to consume a result, it directly queries
the producing TaskManager for it
- An intermediate result becomes ready for consumption during initial
task setup (state DEPLOYING)
- When a TaskManager is slow to register its intermediate result and
the consumer requests the result before it is ready, it can happen
that a requested partition is "not found"

This is also what is happening here. We retry the request for the
intermediate result multiple times with timed backoff [1] and only
fail the request (your stack trace) if the partition is still not
ready although we expect it to be ready (that is, there was no failure
at the producing task).

[1] By default the backoff starts at 100 ms and doubles up to the
maximum of 10000 ms (100, 200, 400, 800, 1600, 3200, 6400, 10000).
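
The doubling rule in [1] can be written out as a small sketch to see how
many delays a given maximum yields. This is not Flink's actual code; the
class BackoffSchedule and the method schedule are hypothetical names used
only for illustration, and the sketch assumes the sequence stops once the
maximum has been reached.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: not Flink's implementation, just the doubling rule
// from footnote [1] written out as code.
class BackoffSchedule {

    // Returns the successive backoff delays in milliseconds: start at
    // initialMs, double each time, cap at maxMs, stop once the cap is reached.
    static List<Long> schedule(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        List<Long> delays = new ArrayList<>();
        long current = initialMs;
        while (true) {
            delays.add(current);
            if (current >= maxMs) {
                break;
            }
            current = Math.min(current * 2, maxMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Default maximum versus the suggested 30000 ms.
        System.out.println("max 10000: " + schedule(100, 10000));
        System.out.println("max 30000: " + schedule(100, 30000));
    }
}
```

Under these assumptions a maximum of 10000 gives eight delays and a
maximum of 30000 gives ten (the last three being 12800, 25600 and 30000),
which matches the "two extra retries" mentioned above.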

