You are viewing a plain-text version of this content. The link to the canonical version (labelled "here" in the original page) was not preserved in this plain-text rendering.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/07/21 00:08:29 UTC

samza git commit: SAMZA-973: Disk Quotas: clamp max delay and more accurate processing time measurement

Repository: samza
Updated Branches:
  refs/heads/master e5f31c57c -> 2187d6bd9


SAMZA-973: Disk Quotas: clamp max delay and more accurate processing time measurement


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2187d6bd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2187d6bd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2187d6bd

Branch: refs/heads/master
Commit: 2187d6bd9d942e0d95189531c4b4db23f30c042b
Parents: e5f31c5
Author: Chris Pettitt <cp...@linkedin.com>
Authored: Wed Jul 20 17:07:32 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Jul 20 17:07:32 2016 -0700

----------------------------------------------------------------------
 .../apache/samza/util/ThrottlingExecutor.java   | 15 ++++---
 .../org/apache/samza/container/RunLoop.scala    | 42 ++++++++++----------
 .../apache/samza/container/SamzaContainer.scala |  6 ++-
 .../samza/util/TestThrottlingExecutor.java      | 28 +++++++++++--
 4 files changed, 58 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
index 214cefd..afcc4c5 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.samza.util;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * An object that performs work on the current thread and optionally slows the rate of execution.
@@ -33,16 +34,18 @@ public class ThrottlingExecutor implements Executor {
   public static final double MAX_WORK_FACTOR = 1.0;
   public static final double MIN_WORK_FACTOR = 0.001;
 
+  private final long maxDelayNanos;
   private final HighResolutionClock clock;
 
   private volatile double workToIdleFactor;
   private long pendingNanos;
 
-  public ThrottlingExecutor() {
-    this(new SystemHighResolutionClock());
+  public ThrottlingExecutor(long maxDelayMillis) {
+    this(maxDelayMillis, new SystemHighResolutionClock());
   }
 
-  ThrottlingExecutor(HighResolutionClock clock) {
+  ThrottlingExecutor(long maxDelayMillis, HighResolutionClock clock) {
+    this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
     this.clock = clock;
   }
 
@@ -68,8 +71,10 @@ public class ThrottlingExecutor implements Executor {
       final long workNanos = clock.nanoTime() - startWorkNanos;
 
       // NOTE: we accumulate pending delay nanos here, but we later update the pending nanos during
-      // the sleep operation (if applicable), so they do not continue to grow.
-      pendingNanos = Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor));
+      // the sleep operation (if applicable), so they do not continue to grow. We also clamp the
+      // maximum sleep time to prevent excessively large sleeps between executions.
+      pendingNanos = Math.min(maxDelayNanos,
+          Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor)));
       if (pendingNanos > 0) {
         try {
           pendingNanos = clock.sleep(pendingNanos);

http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index bb2c376..538ebb8 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -21,9 +21,8 @@ package org.apache.samza.container
 
 import java.util.concurrent.Executor
 
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.CoordinatorRequests
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition}
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.StreamTask
 import org.apache.samza.util.Logging
@@ -74,20 +73,26 @@ class RunLoop (
    * unhandled exception is thrown.
    */
   def run {
-    val runTask = new Runnable() {
-      override def run(): Unit = {
-        val loopStartTime = clock()
-        process
-        window
-        commit
-        val totalNs = clock() - loopStartTime
-        metrics.utilization.set(activeNs.toFloat / totalNs)
-        activeNs = 0L
+    while (!shutdownNow) {
+      val loopStartTime = clock()
+
+      trace("Attempting to choose a message to process.")
+
+      // Exclude choose time from activeNs. Although it includes deserialization time,
+      // it most closely captures idle time.
+      val envelope = updateTimer(metrics.chooseNs) {
+        consumerMultiplexer.choose()
       }
-    }
 
-    while (!shutdownNow) {
-      executor.execute(runTask)
+      executor.execute(new Runnable() {
+        override def run(): Unit = process(envelope)
+      })
+
+      window
+      commit
+      val totalNs = clock() - loopStartTime
+      metrics.utilization.set(activeNs.toFloat / totalNs)
+      activeNs = 0L
     }
   }
 
@@ -99,16 +104,9 @@ class RunLoop (
    * Chooses a message from an input stream to process, and calls the
    * process() method on the appropriate StreamTask to handle it.
    */
-  private def process {
-    trace("Attempting to choose a message to process.")
+  private def process(envelope: IncomingMessageEnvelope) {
     metrics.processes.inc
 
-    // Exclude choose time from activeNs. Although it includes deserialization time,
-    // it most closely captures idle time.
-    val envelope = updateTimer(metrics.chooseNs) {
-     consumerMultiplexer.choose()
-    }
-
     activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
       if (envelope != null) {
         val ssp = envelope.getSystemStreamPartition

http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index b8600d5..90d7279 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -28,7 +28,8 @@ import java.util
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit
-
+import java.lang.Thread.UncaughtExceptionHandler
+import java.net.{URL, UnknownHostException}
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.CheckpointManagerFactory
 import org.apache.samza.checkpoint.OffsetManager
@@ -552,7 +553,8 @@ object SamzaContainer extends Logging {
       (taskName, taskInstance)
     }).toMap
 
-    val executor = new ThrottlingExecutor()
+    val executor = new ThrottlingExecutor(
+      config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)))
 
     val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue)
     samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes)

http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
index 2659050..0276e6b 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
@@ -29,6 +29,8 @@ import org.junit.Test;
 import java.util.concurrent.TimeUnit;
 
 public class TestThrottlingExecutor {
+  private static final long MAX_NANOS = Long.MAX_VALUE;
+
   private static final Runnable NO_OP = new Runnable() {
     @Override
     public void run() {
@@ -42,12 +44,12 @@ public class TestThrottlingExecutor {
   @Before
   public void setUp() {
     clock = Mockito.mock(HighResolutionClock.class);
-    executor = new ThrottlingExecutor(clock);
+    executor = new ThrottlingExecutor(MAX_NANOS, clock);
   }
 
   @Test
   public void testInitialState() {
-    ThrottlingExecutor throttler = new ThrottlingExecutor();
+    ThrottlingExecutor throttler = new ThrottlingExecutor(MAX_NANOS);
     assertEquals(0, throttler.getPendingNanos());
     assertEquals(1.0, throttler.getWorkFactor());
   }
@@ -66,12 +68,12 @@ public class TestThrottlingExecutor {
 
   @Test(expected = IllegalArgumentException.class)
   public void testLessThan0PercentWorkRate() {
-    new ThrottlingExecutor().setWorkFactor(-0.1);
+    new ThrottlingExecutor(MAX_NANOS).setWorkFactor(-0.1);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testGreaterThan100PercentWorkRate() {
-    new ThrottlingExecutor().setWorkFactor(1.1);
+    new ThrottlingExecutor(MAX_NANOS).setWorkFactor(1.1);
   }
 
   @Test
@@ -184,6 +186,24 @@ public class TestThrottlingExecutor {
   }
 
   @Test
+  public void testClampDelayMillis() throws InterruptedException {
+    final long maxDelayMillis = 10;
+    final long maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
+
+    executor = new ThrottlingExecutor(maxDelayMillis, clock);
+    executor.setWorkFactor(0.5);
+
+    // Note work time exceeds maxDelayMillis
+    setWorkTime(TimeUnit.MILLISECONDS.toNanos(100));
+    setExpectedAndActualSleepTime(maxDelayNanos, maxDelayNanos);
+
+    executor.execute(NO_OP);
+
+    verifySleepTime(maxDelayNanos);
+    assertEquals(0L, executor.getPendingNanos());
+  }
+
+  @Test
   public void testDecreaseWorkFactor() {
     executor.setWorkFactor(0.5);
     executor.setPendingNanos(5000);