Posted to issues@flink.apache.org by "Miguel E. Coimbra (JIRA)" <ji...@apache.org> on 2018/04/24 00:54:00 UTC

[jira] [Created] (FLINK-9242) LocalEnvironment - Operator threads stuck on java.lang.Thread.State: WAITING

Miguel E. Coimbra created FLINK-9242:
----------------------------------------

             Summary: LocalEnvironment - Operator threads stuck on java.lang.Thread.State: WAITING
                 Key: FLINK-9242
                 URL: https://issues.apache.org/jira/browse/FLINK-9242
             Project: Flink
          Issue Type: Bug
          Components: Cluster Management
    Affects Versions: 1.5.0, 1.6.0
         Environment: *SETUP 1*
- Windows 7 Pro x64
- Java 1.8.0_162 x64
- 8 GB RAM
- Intel i7 620M

*SETUP 2*
- Slackware 14.2 x64 GNU/Linux 4.4.88
- Java openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
- 256 GB RAM
- 8x Intel(R) Xeon(R) CPU E7- 4830


            Reporter: Miguel E. Coimbra
         Attachments: flink_debugging.PNG

Hello,

As per Fabian Hueske's advice on the mailing list, I am detailing the problem here.
This happens with my code on both 1.5-SNAPSHOT and 1.6-SNAPSHOT, but not on 1.4.2 (stable).
I believe it may be a regression introduced after 1.4.2.

Different DataSet operators get blocked in java.lang.Thread.State: WAITING for no apparent reason.
I have only tested this using a LocalEnvironment, which is created like so:
{code:java}
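import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;

// logPath is a String defined elsewhere in the program (not shown)
final Configuration conf = new Configuration();
conf.setString("web.log.path", logPath);
conf.setString("jobmanager.rpc.address", "127.0.0.1");
conf.setString("web.port", "8081-9000");
conf.setString("query.server.ports", "2000-30000");
conf.setString("query.proxy.ports", "30001-60000");

// createLocalEnvironmentWithWebUI is declared to return ExecutionEnvironment, hence the cast
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);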
{code}
(I also tried creating the LocalEnvironment without the web interface, and the problem still occurs.)

I debugged with IntelliJ IDEA, obtained thread dumps from several executions, and found that quite a few operator threads are stuck in java.lang.Thread.State: WAITING.
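The same kind of dump can also be captured from inside the JVM, which helps when no debugger is attached. Below is a minimal JDK-only sketch (the class name WaitingThreadReport is hypothetical, not part of Flink) that uses ThreadMXBean.dumpAllThreads to list every thread in WAITING state together with its top stack frames:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: an in-process thread dump filtered to WAITING threads.
final class WaitingThreadReport {

    /** One entry per WAITING thread: its name followed by up to maxFrames stack frames. */
    static List<String> waitingThreads(int maxFrames) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // dumpAllThreads(lockedMonitors, lockedSynchronizers) snapshots all live threads with stacks
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null || info.getThreadState() != Thread.State.WAITING) {
                continue; // skip RUNNABLE, BLOCKED, TIMED_WAITING, ... threads
            }
            StringBuilder entry = new StringBuilder(info.getThreadName());
            StackTraceElement[] frames = info.getStackTrace();
            for (int i = 0; i < Math.min(maxFrames, frames.length); i++) {
                entry.append("\n    at ").append(frames[i]);
            }
            report.add(entry.toString());
        }
        return report;
    }

    public static void main(String[] args) {
        // Print the top five frames of every WAITING thread in this JVM
        waitingThreads(5).forEach(System.out::println);
    }
}
{code}

Since a LocalEnvironment runs the job inside the same JVM as the driver program, calling this from a watchdog thread in the driver would cover the operator threads as well.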

I cannot share my code at the moment, but essentially I have a series of jobs, some of which use common data (I made sure it was written to disk in job _i_ and read back from disk in job _i + 1_).
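As an illustration of this handoff pattern, here is a plain-JDK sketch of passing records from one job to the next through a file. It is not how Flink sinks and sources work internally; JobHandoff, publish and consume are hypothetical names. Writing to a temporary file in the same directory and then renaming it with ATOMIC_MOVE is one way to make sure job _i + 1_ never reads a partially written file:

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical helper: hand records from job i to job i + 1 through the file system.
final class JobHandoff {

    /** Job i: write all records to a temp file next to the target, then rename it into place. */
    static void publish(Path target, List<String> records) {
        try {
            Path dir = target.toAbsolutePath().getParent();
            Path tmp = Files.createTempFile(dir, "partial-", ".tmp");
            Files.write(tmp, records, StandardCharsets.UTF_8);
            // ATOMIC_MOVE: a reader sees either no file or the complete file, never a partial one.
            // Per the Files.move contract, replacing an existing target is implementation-specific.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Job i + 1: read back everything job i published. */
    static List<String> consume(Path source) {
        try {
            return Files.readAllLines(source, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
{code}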

I find three main operator threads in this waiting state.

I'm running in local mode with a parallelism of one.
The thread dumps I obtained show where the wait calls originated:

 
{code:java}
Number 1:

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)
{code}
{code:java}
Number 2:

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)
{code}
{code:java}
Number 3:

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748){code}
 
I realize these dumps may not be very helpful on their own, but as far as I can tell they show that all three threads are blocked in SingleInputGate.getNextBufferOrEvent, i.e. waiting for the next buffer or event from an upstream operator.
If the cause were resource scarcity, I believe the program would terminate with an exception.
If it were garbage collection activity, I believe the JVM process would not be at 0% CPU usage.
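The garbage-collection hypothesis can be checked without a profiler. A minimal sketch, assuming only the standard java.lang.management API (GcActivity is a hypothetical helper name): it samples the cumulative collection time reported by all GarbageCollectorMXBeans over a short window. A fraction near zero while the job is stalled would argue against GC as the cause.

{code:java}
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper: how much wall-clock time did this JVM spend in GC during a window?
final class GcActivity {

    /** Cumulative collection time in ms over all collectors (collectors reporting -1 are skipped). */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 if the collector does not report it
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** Approximate fraction of the window spent in GC; values near 0 argue against GC thrashing. */
    static double gcFraction(long windowMillis) {
        long gcBefore = totalGcMillis();
        long start = System.nanoTime();
        try {
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag; measure the shorter window
        }
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        long gcMillis = totalGcMillis() - gcBefore;
        return elapsedMillis == 0 ? 0.0 : (double) gcMillis / elapsedMillis;
    }

    public static void main(String[] args) {
        System.out.printf("GC fraction over 5 s: %.3f%n", gcFraction(5_000));
    }
}
{code}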

I realize I haven't provided the user code that generates the execution plan in which these threads end up waiting; my apologies. I will do so as soon as I get a chance.

To highlight the symptoms:
- The memory assigned to the JVM is fully used, but there are no out-of-memory exceptions (and the system had plenty of free memory available).
- CPU usage is at 0% and all threads are in a waiting state, but I don't understand exactly which signal they are waiting for.
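To confirm that the waiting threads are idle rather than busy, per-thread CPU time can be sampled twice and compared. This is a sketch under the assumption that the JVM supports thread CPU time measurement (ThreadMXBean.isThreadCpuTimeSupported()); IdleThreadSampler is a hypothetical name:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: which live threads consumed no CPU time during a sampling window?
final class IdleThreadSampler {

    /** Returns "name [state]" for each thread whose CPU time did not advance during the window. */
    static List<String> idleThreads(long windowMillis) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        if (!mx.isThreadCpuTimeSupported()) {
            throw new UnsupportedOperationException("thread CPU time not supported by this JVM");
        }
        if (!mx.isThreadCpuTimeEnabled()) {
            mx.setThreadCpuTimeEnabled(true);
        }
        Map<Long, Long> before = new HashMap<>();
        for (long id : mx.getAllThreadIds()) {
            before.put(id, mx.getThreadCpuTime(id)); // nanoseconds, or -1 if the thread is gone
        }
        try {
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> idle = new ArrayList<>();
        for (Map.Entry<Long, Long> e : before.entrySet()) {
            long after = mx.getThreadCpuTime(e.getKey());
            ThreadInfo info = mx.getThreadInfo(e.getKey()); // null if the thread has terminated
            // Note: on platforms with coarse CPU-time resolution, lightly used threads may also match
            if (info != null && e.getValue() >= 0 && after == e.getValue()) {
                idle.add(info.getThreadName() + " [" + info.getThreadState() + "]");
            }
        }
        return idle;
    }
}
{code}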

I also noticed something suspicious: I have chains of operators where the first operator ingests the expected number of records but emits none, leaving the following operator empty in a "RUNNING" state (see attached image).
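The dumps above all end in Object.wait inside SingleInputGate.getNextBufferOrEvent, i.e. a consumer waiting for input. The following simplified model (a hypothetical GuardedInput class, not Flink's actual implementation) shows why a consumer whose producer ingests data but never emits a record or an end-of-input signal stays in a waiting state indefinitely while using no CPU. The model uses a timed wait so it can be exercised; an untimed wait() in the same loop would never return.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model (hypothetical, not Flink's SingleInputGate) of a consumer that waits on a
// monitor until its producer hands over a record or signals end-of-input.
final class GuardedInput<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private boolean finished;

    /** Producer side: hand over one record and wake any waiting consumer. */
    synchronized void emit(T record) {
        queue.add(record);
        notifyAll();
    }

    /** Producer side: signal that no more records will come. */
    synchronized void finish() {
        finished = true;
        notifyAll();
    }

    /** True once the producer has finished and every record has been consumed. */
    synchronized boolean isExhausted() {
        return finished && queue.isEmpty();
    }

    /**
     * Consumer side: returns the next record, or null on timeout or when exhausted.
     * The loop re-checks the condition after every wake-up (guards against spurious wake-ups).
     */
    synchronized T next(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (queue.isEmpty() && !finished) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null; // timed out: the producer stayed silent
            }
            try {
                wait(remaining); // releases the monitor; the thread consumes no CPU meanwhile
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return queue.poll(); // null if the producer finished and the queue is drained
    }
}
{code}

On a fresh instance, next(50) returns null after roughly 50 ms; after emit("r1") it returns "r1" immediately.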

My scenario is somewhat complex, at least compared to the examples in the Flink documentation: when visualizing the job plan, I have to zoom in and out to inspect specific parts of the execution plan.

Among other things, the sequence of operations includes:

1 - Creating a DataSet

2 - Using it as an initial workset in a DeltaIteration

2.1 - Joining the workset on each iteration with the edges of a graph

3 - Using the final solution set resulting from the DeltaIteration to build a graph and execute an algorithm over it (.run method).

- The graph is not prohibitively large, and I use a very low limit on the number of iterations (at most 4 or 5).
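Since the user code is not available yet, the following plain-Java model illustrates only the general delta-iteration semantics of the steps above. It assumes, hypothetically, that each superstep propagates minimum vertex labels along the edges, as in the classic connected-components delta-iteration example; it is not the actual user code and does not use Flink. The workset is "joined" with the edges, only improved labels form the delta, the delta is merged into the solution set and becomes the next workset, and maxIterations caps the number of supersteps.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of DeltaIteration semantics (hypothetical min-label propagation).
final class DeltaIterationModel {

    static Map<Integer, Integer> run(Map<Integer, Integer> initialSolution,
                                     Map<Integer, List<Integer>> adjacency,
                                     int maxIterations) {
        Map<Integer, Integer> solution = new HashMap<>(initialSolution);
        Map<Integer, Integer> workset = new HashMap<>(initialSolution); // step 2: initial workset
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            Map<Integer, Integer> delta = new HashMap<>();
            // step 2.1: "join" each workset entry with its edges and propose its label to neighbours
            for (Map.Entry<Integer, Integer> w : workset.entrySet()) {
                for (int neighbour : adjacency.getOrDefault(w.getKey(), Collections.emptyList())) {
                    Integer current = delta.containsKey(neighbour) ? delta.get(neighbour) : solution.get(neighbour);
                    if (current == null || w.getValue() < current) {
                        delta.put(neighbour, w.getValue());
                    }
                }
            }
            solution.putAll(delta); // merge the delta into the solution set
            workset = delta;        // only vertices that changed are processed in the next superstep
        }
        return solution; // step 3 would build a graph from this final solution set
    }

    /** Every vertex starts with its own id as label. */
    static Map<Integer, Integer> identityLabels(int... vertices) {
        Map<Integer, Integer> labels = new HashMap<>();
        for (int v : vertices) {
            labels.put(v, v);
        }
        return labels;
    }

    /** Undirected adjacency lists from {u, v} edge pairs. */
    static Map<Integer, List<Integer>> undirected(int[][] edges) {
        Map<Integer, List<Integer>> adj = new HashMap<>();
        for (int[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
        }
        return adj;
    }
}
{code}

With edges 1-2 and 2-3 and maxIterations = 5, vertices 1, 2 and 3 converge to label 1 after three supersteps (the third produces an empty delta), while the isolated vertex 4 keeps label 4. With maxIterations = 1, vertex 3 still has label 2.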

 I will add more information as soon as it is available.


