You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/24 22:23:50 UTC

kafka git commit: KAFKA-4527: task status was being updated before actual pause/resume

Repository: kafka
Updated Branches:
  refs/heads/trunk 9eb665c39 -> d09214624


KAFKA-4527: task status was being updated before actual pause/resume

h/t ewencp for pointing out the issue

Author: Shikhar Bhushan <sh...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2277 from shikhar/kafka-4527


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0921462
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0921462
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0921462

Branch: refs/heads/trunk
Commit: d092146247f3b3de061ecdb4ddfeac9321d8cf73
Parents: 9eb665c
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Sat Dec 24 17:25:01 2016 -0500
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Dec 24 17:25:01 2016 -0500

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSinkTask.java   |  3 +
 .../kafka/connect/runtime/WorkerSourceTask.java |  5 +-
 .../kafka/connect/runtime/WorkerTask.java       | 71 ++++++++++----------
 .../connect/runtime/WorkerSinkTaskTest.java     |  7 +-
 .../connect/runtime/WorkerSourceTaskTest.java   | 20 ++----
 5 files changed, 53 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index b941469..a284ec7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -181,8 +181,11 @@ class WorkerSinkTask extends WorkerTask {
 
             if (shouldPause()) {
                 pauseAll();
+                onPause();
+                context.requestCommit();
             } else if (!pausedForRedelivery) {
                 resumeAll();
+                onResume();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 260015e..05a07b8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -146,7 +146,10 @@ class WorkerSourceTask extends WorkerTask {
 
             while (!isStopping()) {
                 if (shouldPause()) {
-                    awaitUnpause();
+                    onPause();
+                    if (awaitUnpause()) {
+                        onResume();
+                    }
                     continue;
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 2f2ebb5..04fb333 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -23,8 +23,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Handles processing for an individual task. This interface only provides the basic methods
@@ -41,21 +39,20 @@ abstract class WorkerTask implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
 
     protected final ConnectorTaskId id;
-    private final AtomicBoolean stopping;   // indicates whether the Worker has asked the task to stop
-    private final AtomicBoolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
-    private final CountDownLatch shutdownLatch;
     private final TaskStatus.Listener statusListener;
-    private final AtomicReference<TargetState> targetState;
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private volatile TargetState targetState;
+    private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
+    private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
 
     public WorkerTask(ConnectorTaskId id,
                       TaskStatus.Listener statusListener,
                       TargetState initialState) {
         this.id = id;
-        this.stopping = new AtomicBoolean(false);
-        this.cancelled = new AtomicBoolean(false);
-        this.shutdownLatch = new CountDownLatch(1);
         this.statusListener = statusListener;
-        this.targetState = new AtomicReference<>(initialState);
+        this.targetState = initialState;
+        this.stopping = false;
+        this.cancelled = false;
     }
 
     public ConnectorTaskId id() {
@@ -71,7 +68,7 @@ abstract class WorkerTask implements Runnable {
 
     private void triggerStop() {
         synchronized (this) {
-            this.stopping.set(true);
+            stopping = true;
 
             // wakeup any threads that are waiting for unpause
             this.notifyAll();
@@ -91,7 +88,7 @@ abstract class WorkerTask implements Runnable {
      * updated when it eventually does shutdown.
      */
     public void cancel() {
-        this.cancelled.set(true);
+        cancelled = true;
     }
 
     /**
@@ -113,7 +110,7 @@ abstract class WorkerTask implements Runnable {
     protected abstract void close();
 
     protected boolean isStopping() {
-        return stopping.get();
+        return stopping;
     }
 
     private void doClose() {
@@ -125,16 +122,18 @@ abstract class WorkerTask implements Runnable {
         }
     }
 
-    private void doRun() {
+    private void doRun() throws InterruptedException {
         try {
             synchronized (this) {
-                if (stopping.get())
+                if (stopping)
                     return;
 
-                if (targetState.get() == TargetState.PAUSED)
-                    statusListener.onPause(id);
-                else
-                    statusListener.onStartup(id);
+                if (targetState == TargetState.PAUSED) {
+                    onPause();
+                    if (!awaitUnpause()) return;
+                }
+
+                statusListener.onStartup(id);
             }
 
             execute();
@@ -153,7 +152,7 @@ abstract class WorkerTask implements Runnable {
 
             // if we were cancelled, skip the status update since the task may have already been
             // started somewhere else
-            if (!cancelled.get())
+            if (!cancelled)
                 statusListener.onShutdown(id);
         }
     }
@@ -164,11 +163,19 @@ abstract class WorkerTask implements Runnable {
 
             // if we were cancelled, skip the status update since the task may have already been
             // started somewhere else
-            if (!cancelled.get())
+            if (!cancelled)
                 statusListener.onFailure(id, t);
         }
     }
 
+    protected synchronized void onPause() {
+        statusListener.onPause(id);
+    }
+
+    protected synchronized void onResume() {
+        statusListener.onResume(id);
+    }
+
     @Override
     public void run() {
         try {
@@ -178,14 +185,14 @@ abstract class WorkerTask implements Runnable {
             onFailure(t);
 
             if (t instanceof Error)
-                throw t;
+                throw (Error) t;
         } finally {
             shutdownLatch.countDown();
         }
     }
 
     public boolean shouldPause() {
-        return this.targetState.get() == TargetState.PAUSED;
+        return this.targetState == TargetState.PAUSED;
     }
 
     /**
@@ -195,8 +202,8 @@ abstract class WorkerTask implements Runnable {
      */
     protected boolean awaitUnpause() throws InterruptedException {
         synchronized (this) {
-            while (targetState.get() == TargetState.PAUSED) {
-                if (stopping.get())
+            while (targetState == TargetState.PAUSED) {
+                if (stopping)
                     return false;
                 this.wait();
             }
@@ -207,19 +214,11 @@ abstract class WorkerTask implements Runnable {
     public void transitionTo(TargetState state) {
         synchronized (this) {
             // ignore the state change if we are stopping
-            if (stopping.get())
+            if (stopping)
                 return;
 
-            TargetState oldState = this.targetState.getAndSet(state);
-            if (state != oldState) {
-                if (state == TargetState.PAUSED) {
-                    statusListener.onPause(id);
-                } else if (state == TargetState.STARTED) {
-                    statusListener.onResume(id);
-                    this.notifyAll();
-                } else
-                    throw new IllegalArgumentException("Unhandled target state " + state);
-            }
+            this.targetState = state;
+            this.notifyAll();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 1f9e56b..d4427d1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -178,7 +178,9 @@ public class WorkerSinkTaskTest {
         consumer.pause(partitions);
         PowerMock.expectLastCall();
 
-        // No records returned
+        // Offset commit as requested when pausing; No records returned by consumer.poll()
+        sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+        EasyMock.expectLastCall().andStubReturn(Collections.emptyMap());
         expectConsumerPoll(0);
         sinkTask.put(Collections.<SinkRecord>emptyList());
         EasyMock.expectLastCall();
@@ -353,6 +355,9 @@ public class WorkerSinkTaskTest {
         consumer.resume(Collections.singleton(TOPIC_PARTITION2));
         EasyMock.expectLastCall();
 
+        statusListener.onResume(taskId);
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0921462/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 86f5797..71b315f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -130,41 +130,31 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     @Test
     public void testStartPaused() throws Exception {
-        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch pauseLatch = new CountDownLatch(1);
 
         createWorkerTask(TargetState.PAUSED);
 
-        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-        EasyMock.expectLastCall();
-        sourceTask.start(TASK_PROPS);
+        statusListener.onPause(taskId);
         EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
             @Override
             public Void answer() throws Throwable {
-                startupLatch.countDown();
+                pauseLatch.countDown();
                 return null;
             }
         });
-        statusListener.onPause(taskId);
-        EasyMock.expectLastCall();
 
-        // we shouldn't get any calls to poll()
-
-        sourceTask.stop();
+        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
-        expectOffsetFlush(true);
 
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
-        producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
-        EasyMock.expectLastCall();
-
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
         Future<?> taskFuture = executor.submit(workerTask);
 
-        assertTrue(startupLatch.await(5, TimeUnit.SECONDS));
+        assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));