You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2016/03/10 14:58:29 UTC

[GitHub] flink pull request: [FLINK-3595] [runtime] Eagerly destroy buffer ...

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1780

    [FLINK-3595] [runtime] Eagerly destroy buffer pools on cancelling

    When canceling a job, the Kafka 0.9 Consumer Thread may be stuck in a blocking method (output emitting) and never wakes up.
    
    The thread as a whole cannot be simply interrupted, because of a bug in Kafka that makes the consumer freeze/hang up on interrupt.
    
    With this PR, we destroy the buffer pools eagerly on canceling. The Kafka thread will then throw an exception if it is stuck in emitting elements and it will terminate, which is accepted in case the status is canceled.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3595-close_bufferpools

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1780.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1780
    
----
commit 52320e184f0af61d1d1bf5f9e81b9da2309033c5
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-03-10T11:02:25Z

    [FLINK-3595] [runtime] Eagerly destroy buffer pools on cancelling

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3595] [runtime] Eagerly destroy buffer ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1780#issuecomment-205299566
  
    Makes sense. Given that the CI tests run, I am okay with merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3595] [runtime] Eagerly destroy buffer ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1780#discussion_r55864951
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerTest.java ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskmanager;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
    +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.jobgraph.DistributionPattern;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
    +import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
    +import org.apache.flink.runtime.testingUtils.TestingCluster;
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Test;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class TaskCancelAsyncProducerConsumerTest extends TestLogger {
    --- End diff --
    
    If this fires up a test cluster, it should probably be an IT case


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3595] [runtime] Eagerly destroy buffer ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1780#issuecomment-195491044
  
    Regarding the stack trace of the Kafka test: the log shows two failures.
    - The stack trace you posted is part of a test time out (that's why you have the interruption of the client in your stack trace), and
    - a job restart when not enough task managers are connected (NoResourceAvailableException).
    
    Looking at the code, the failures seem unrelated. Do you think that's not the case?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3595] [runtime] Eagerly destroy buffer ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1780#issuecomment-205319812
  
    Just rebased. Waiting for Travis and then merging to `master` and `release-1.0`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3595] [runtime] Eagerly destroy buffer ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1780#issuecomment-195483345
  
    Test failures
      - CheckpointNotifierITCase (2x) - known instability
      - Kafka08 test
    
    This seems to be the relevant failure
    ```
    03/11/2016 15:44:58	Job execution switched to status FAILED.
    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: null
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:348)
    	at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:206)
    	at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:172)
    	at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:31)
    	at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.readSequence(KafkaConsumerTestBase.java:1207)
    	at org.apache.flink.streaming.connectors.kafka.Kafka08ITCase.testOffsetInZookeeper(Kafka08ITCase.java:216)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:483)
    	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    	at org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: null
    	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
    	at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
    	... 16 more
    Caused by: java.lang.InterruptedException
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    	at scala.concurrent.Await$.result(package.scala:107)
    	at scala.concurrent.Await.result(package.scala)
    	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:133)
    	... 17 more
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3595] [runtime] Eagerly destroy buffer ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1780


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---