You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/10/05 16:18:51 UTC

samza git commit: SAMZA-1429: Add callback success/failure metrics to async tasks

Repository: samza
Updated Branches:
  refs/heads/master a671288e1 -> 7ad6631bb


SAMZA-1429: Add callback success/failure metrics to async tasks

jmakes & nickpan47 please take a look when you get the chance.

Author: Daniel Nishimura <dn...@gmail.com>

Reviewers: Jacob Maes <jm...@linkedin.com>

Closes #306 from dnishimura/samza-1429-async-metrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7ad6631b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7ad6631b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7ad6631b

Branch: refs/heads/master
Commit: 7ad6631bbfca88d759e2cf5d4c0beeb5ef4eb013
Parents: a671288
Author: Daniel Nishimura <dn...@gmail.com>
Authored: Thu Oct 5 09:18:37 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Thu Oct 5 09:18:37 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/task/AsyncRunLoop.java     |  1 +
 .../samza/container/TaskInstanceMetrics.scala   |  1 +
 .../org/apache/samza/task/TestAsyncRunLoop.java | 40 ++++++++++++++++++++
 3 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7ad6631b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 58af820..b8f48c7 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -527,6 +527,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         public void run() {
           try {
             state.doneProcess();
+            state.taskMetrics.asyncCallbackCompleted().inc();
             TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
             containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs);
             log.trace("Got callback complete for task {}, ssp {}",

http://git-wip-us.apache.org/repos/asf/samza/blob/7ad6631b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index ffd9e7a..94cfbdc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -36,6 +36,7 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
 
   def addOffsetGauge(systemStreamPartition: SystemStreamPartition, getValue: () => String) {
     newGauge("%s-%s-%d-offset" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), getValue)

http://git-wip-us.apache.org/repos/asf/samza/blob/7ad6631b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 5a4b4bf..b399f5f 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -46,6 +46,7 @@ import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Before;
+import org.junit.Test;
 import scala.Option;
 import scala.collection.JavaConverters;
 
@@ -200,6 +201,45 @@ public class TestAsyncRunLoop {
     when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
   }
 
+  @Test
+  public void testMetrics() throws Exception {
+    CountDownLatch task0ProcessedMessages = new CountDownLatch(2);
+    CountDownLatch task1ProcessedMessages = new CountDownLatch(1);
+
+    TestTask task0 = new TestTask(true, true, false, task0ProcessedMessages);
+    TestTask task1 = new TestTask(true, true, false, task1ProcessedMessages);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+
+    Map<TaskName, TaskInstance> tasks = new HashMap<>();
+    tasks.put(taskName0, t0);
+    tasks.put(taskName1, t1);
+    //task0.callbackHandler = buildOutofOrderCallback(task0);
+
+    int maxMessagesInFlight = 1;
+    AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+        callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+
+    when(consumerMultiplexer.choose(false))
+        .thenReturn(envelope0)
+        .thenReturn(envelope3)
+        .thenReturn(envelope1)
+        .thenReturn(null)
+        .thenReturn(ssp0EndOfStream)
+        .thenReturn(ssp1EndOfStream)
+        .thenReturn(null);
+
+    runLoop.run();
+
+    task0ProcessedMessages.await();
+    task1ProcessedMessages.await();
+
+    assertEquals(2L, t0.metrics().asyncCallbackCompleted().getCount());
+    assertEquals(1L, t1.metrics().asyncCallbackCompleted().getCount());
+    assertEquals(5L, containerMetrics.envelopes().getCount());
+    assertEquals(3L, containerMetrics.processes().getCount());
+  }
+
   //@Test
   public void testProcessMultipleTasks() throws Exception {
     CountDownLatch task0ProcessedMessages = new CountDownLatch(1);