You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/17 18:43:36 UTC

samza git commit: SAMZA-1146; TaskCallbackManager commit fix.

Repository: samza
Updated Branches:
  refs/heads/master 628f0d08b -> 779c3d80e


SAMZA-1146; TaskCallbackManager commit fix.

Each task callback in Samza belongs to a SystemStreamPartition, and the callbacks of a single task may belong to different SystemStreamPartitions. When multiple callbacks in contiguous order are available for commit, only the callback with the highest sequence number is chosen for commit. This prevents checkpointing of completed callbacks that have a commit request but do not have the highest sequence number. Upon task restart, this leads to duplicate reprocessing of already-processed messages, since the completed callbacks for some SystemStreamPartitions were never committed.

This PR fixes the issue by committing all completed callbacks that have a commit request defined. Added a test to verify the behavior.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Author: vjagadish1989 <jv...@linkedin.com>
Author: Boris Shkolnik <bo...@apache.org>
Author: Prateek Maheshwari <pm...@linkedin.com>
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Chen Song <cs...@appnexus.com>
Author: Tommy Becker <to...@tivo.com>
Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Xinyu Liu <xi...@linkedin.com>

Closes #87 from shanthoosh/Fixing_CallBackManager_Commit


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/779c3d80
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/779c3d80
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/779c3d80

Branch: refs/heads/master
Commit: 779c3d80e6c9af05a93286a593a138a2c5f03eb9
Parents: 628f0d0
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Fri Mar 17 11:42:34 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Fri Mar 17 11:42:34 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/task/AsyncRunLoop.java     |  10 +-
 .../apache/samza/task/TaskCallbackManager.java  |  41 ++----
 .../samza/task/TestTaskCallbackManager.java     | 126 ++++++++++++++-----
 3 files changed, 115 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/779c3d80/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 064402c..1561bcf 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -515,15 +515,13 @@ public class AsyncRunLoop implements Runnable, Throttleable {
             log.trace("Got callback complete for task {}, ssp {}",
                 callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
 
-            TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl);
-            if (callbackToUpdate != null) {
+            List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callbackImpl);
+            for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) {
               IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
-              log.trace("Update offset for ssp {}, offset {}",
-                  envelope.getSystemStreamPartition(), envelope.getOffset());
+              log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update offset
-              task.offsetManager().update(task.taskName(),
-                  envelope.getSystemStreamPartition(), envelope.getOffset());
+              task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update coordinator
               coordinatorRequests.update(callbackToUpdate.coordinator);

http://git-wip-us.apache.org/repos/asf/samza/blob/779c3d80/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index 5bce778..370cb1a 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.task;
 
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -46,40 +49,22 @@ class TaskCallbackManager {
      * Adding the newly complete callback to the callback queue
      * Move the queue to the last contiguous callback to commit offset
      * @param cb new callback completed
-     * @return callback of highest watermark needed to be committed
+     * @return list of callbacks to be committed
      */
-    TaskCallbackImpl update(TaskCallbackImpl cb) {
+    List<TaskCallbackImpl> update(TaskCallbackImpl cb) {
       synchronized (lock) {
         callbacks.add(cb);
-
-        TaskCallbackImpl callback = null;
-        TaskCallbackImpl callbackToCommit = null;
-        TaskCoordinator.RequestScope shutdownRequest = null;
+        List<TaskCallbackImpl> callbacksToUpdate = new ArrayList<>();
         // look for the last contiguous callback
         while (!callbacks.isEmpty() && callbacks.peek().matchSeqNum(nextSeqNum)) {
           ++nextSeqNum;
-          callback = callbacks.poll();
-
+          TaskCallbackImpl callback = callbacks.poll();
+          callbacksToUpdate.add(callback);
           if (callback.coordinator.commitRequest().isDefined()) {
-            callbackToCommit = callback;
-          }
-
-          if (callback.coordinator.shutdownRequest().isDefined()) {
-            shutdownRequest = callback.coordinator.shutdownRequest().get();
+            break;
           }
         }
-
-        // if there is no manual commit, use the highest contiguous callback message offset
-        if (callbackToCommit == null) {
-          callbackToCommit = callback;
-        }
-
-        // if there is a shutdown request, merge it into the coordinator to commit
-        if (shutdownRequest != null) {
-          callbackToCommit.coordinator.shutdown(shutdownRequest);
-        }
-
-        return callbackToCommit;
+        return callbacksToUpdate;
       }
     }
   }
@@ -127,14 +112,14 @@ class TaskCallbackManager {
    * Update the task callbacks with the new callback completed.
    * It uses a high-watermark model to roll the callbacks for checkpointing.
    * @param callback new completed callback
-   * @return the callback for checkpointing
+   * @return the list of callbacks for checkpointing
    */
-  public TaskCallbackImpl updateCallback(TaskCallbackImpl callback) {
+  public List<TaskCallbackImpl> updateCallback(TaskCallbackImpl callback) {
     if (maxConcurrency > 1) {
       // Use the completedCallbacks queue to handle the out-of-order case when max concurrency is larger than 1
       return completedCallbacks.update(callback);
     } else {
-      return callback;
+      return ImmutableList.of(callback);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/779c3d80/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
index b2ed316..ab8a29e 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.task;
 
+import java.util.List;
 import org.apache.samza.Partition;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
@@ -29,10 +30,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-
 public class TestTaskCallbackManager {
   TaskCallbackManager callbackManager = null;
   TaskCallbackListener listener = null;
@@ -68,17 +67,21 @@ public class TestTaskCallbackManager {
 
     IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
     TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
-    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback0);
-    assertTrue(callbackToCommit.matchSeqNum(0));
-    assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
-    assertEquals("0", callbackToCommit.envelope.getOffset());
+    List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callback0);
+    assertEquals(1, callbacksToUpdate.size());
+    TaskCallbackImpl callback = callbacksToUpdate.get(0);
+    assertTrue(callback.matchSeqNum(0));
+    assertEquals(ssp, callback.envelope.getSystemStreamPartition());
+    assertEquals("0", callback.envelope.getOffset());
 
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
     TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1, 0);
-    callbackToCommit = callbackManager.updateCallback(callback1);
-    assertTrue(callbackToCommit.matchSeqNum(1));
-    assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
-    assertEquals("1", callbackToCommit.envelope.getOffset());
+    callbacksToUpdate = callbackManager.updateCallback(callback1);
+    assertEquals(1, callbacksToUpdate.size());
+    callback = callbacksToUpdate.get(0);
+    assertTrue(callback.matchSeqNum(1));
+    assertEquals(ssp, callback.envelope.getSystemStreamPartition());
+    assertEquals("1", callback.envelope.getOffset());
   }
 
   @Test
@@ -90,20 +93,32 @@ public class TestTaskCallbackManager {
     // simulate out of order
     IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null);
     TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator, 2, 0);
-    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2);
-    assertNull(callbackToCommit);
+    List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callback2);
+    assertTrue(callbacksToUpdate.isEmpty());
 
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
     TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1, 0);
-    callbackToCommit = callbackManager.updateCallback(callback1);
-    assertNull(callbackToCommit);
+    callbacksToUpdate = callbackManager.updateCallback(callback1);
+    assertTrue(callbacksToUpdate.isEmpty());
 
     IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
     TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
-    callbackToCommit = callbackManager.updateCallback(callback0);
-    assertTrue(callbackToCommit.matchSeqNum(2));
-    assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
-    assertEquals("2", callbackToCommit.envelope.getOffset());
+    callbacksToUpdate = callbackManager.updateCallback(callback0);
+    assertEquals(3, callbacksToUpdate.size());
+    TaskCallbackImpl callback = callbacksToUpdate.get(0);
+    assertTrue(callback.matchSeqNum(0));
+    assertEquals(ssp, callback.envelope.getSystemStreamPartition());
+    assertEquals("0", callback.envelope.getOffset());
+
+    callback = callbacksToUpdate.get(1);
+    assertTrue(callback.matchSeqNum(1));
+    assertEquals(ssp, callback.envelope.getSystemStreamPartition());
+    assertEquals("1", callback.envelope.getOffset());
+
+    callback = callbacksToUpdate.get(2);
+    assertTrue(callback.matchSeqNum(2));
+    assertEquals(ssp, callback.envelope.getSystemStreamPartition());
+    assertEquals("2", callback.envelope.getOffset());
   }
 
   @Test
@@ -111,30 +126,85 @@ public class TestTaskCallbackManager {
     TaskName taskName = new TaskName("Partition 0");
     SystemStreamPartition ssp = new SystemStreamPartition("kafka", "topic", new Partition(0));
 
-
     // simulate out of order
     IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null);
     ReadableCoordinator coordinator2 = new ReadableCoordinator(taskName);
     coordinator2.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
     TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2, 0);
-    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2);
-    assertNull(callbackToCommit);
+    List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callback2);
+    assertTrue(callbacksToUpdate.isEmpty());
 
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
     ReadableCoordinator coordinator1 = new ReadableCoordinator(taskName);
     coordinator1.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
     TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1, 0);
-    callbackToCommit = callbackManager.updateCallback(callback1);
-    assertNull(callbackToCommit);
+    callbacksToUpdate = callbackManager.updateCallback(callback1);
+    assertTrue(callbacksToUpdate.isEmpty());
 
     IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
     ReadableCoordinator coordinator = new ReadableCoordinator(taskName);
     TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
-    callbackToCommit = callbackManager.updateCallback(callback0);
-    assertTrue(callbackToCommit.matchSeqNum(1));
-    assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
-    assertEquals("1", callbackToCommit.envelope.getOffset());
-    assertTrue(callbackToCommit.coordinator.requestedShutdownNow());
+    callbacksToUpdate = callbackManager.updateCallback(callback0);
+    assertEquals(2, callbacksToUpdate.size());
+
+    //Check for envelope0
+    TaskCallbackImpl taskCallback = callbacksToUpdate.get(0);
+    assertTrue(taskCallback.matchSeqNum(0));
+    assertEquals(ssp, taskCallback.envelope.getSystemStreamPartition());
+    assertEquals("0", taskCallback.envelope.getOffset());
+
+    //Check for envelope1
+    taskCallback = callbacksToUpdate.get(1);
+    assertTrue(taskCallback.matchSeqNum(1));
+    assertEquals(ssp, taskCallback.envelope.getSystemStreamPartition());
+    assertEquals("1", taskCallback.envelope.getOffset());
   }
 
+  @Test
+  public void testUpdateShouldReturnAllCompletedCallbacksTillTheCommitRequestDefined() {
+    TaskName taskName = new TaskName("Partition 0");
+    SystemStreamPartition ssp1 = new SystemStreamPartition("kafka", "topic", new Partition(0));
+    SystemStreamPartition ssp2 = new SystemStreamPartition("kafka", "topic", new Partition(0));
+
+    // Callback for Envelope3 contains commit request.
+    IncomingMessageEnvelope envelope3 = new IncomingMessageEnvelope(ssp2, "0", null, null);
+    ReadableCoordinator coordinator3 = new ReadableCoordinator(taskName);
+    coordinator3.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    TaskCallbackImpl callback3 = new TaskCallbackImpl(listener, taskName, envelope3, coordinator3, 3, 0);
+    List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callback3);
+    assertTrue(callbacksToUpdate.isEmpty());
+
+    IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp1, "2", null, null);
+    ReadableCoordinator coordinator2 = new ReadableCoordinator(taskName);
+    coordinator2.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
+    TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2, 0);
+    callbacksToUpdate = callbackManager.updateCallback(callback2);
+    assertTrue(callbacksToUpdate.isEmpty());
+
+    IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null);
+    ReadableCoordinator coordinator1 = new ReadableCoordinator(taskName);
+    coordinator1.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1, 0);
+    callbacksToUpdate = callbackManager.updateCallback(callback1);
+    assertTrue(callbacksToUpdate.isEmpty());
+
+    // Callback for Envelope0 contains commit request.
+    IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp1, "0", null, null);
+    ReadableCoordinator coordinator = new ReadableCoordinator(taskName);
+    TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
+
+    // Check for both Envelope1, Envelope2, Envelope3 in callbacks to commit.
+    // Two callbacks belonging to different system partition and has commitRequest defined is returned.
+    callbacksToUpdate = callbackManager.updateCallback(callback0);
+    assertEquals(2, callbacksToUpdate.size());
+    TaskCallbackImpl callback = callbacksToUpdate.get(0);
+    assertTrue(callback.matchSeqNum(0));
+    assertEquals(envelope0.getSystemStreamPartition(), callback.envelope.getSystemStreamPartition());
+    assertEquals(envelope0.getOffset(), callback.envelope.getOffset());
+
+    callback = callbacksToUpdate.get(1);
+    assertTrue(callback.matchSeqNum(1));
+    assertEquals(envelope1.getSystemStreamPartition(), callback.envelope.getSystemStreamPartition());
+    assertEquals(envelope1.getOffset(), callback.envelope.getOffset());
+  }
 }