You are viewing a plain text version of this content. The canonical version is available at the original mailing-list archive link (the hyperlink was lost in this plain-text rendering).
Posted to commits@samza.apache.org by ni...@apache.org on 2016/07/21 00:08:29 UTC
samza git commit: SAMZA-973: Disk Quotas: clamp max delay and more accurate processing time measurement
Repository: samza
Updated Branches:
refs/heads/master e5f31c57c -> 2187d6bd9
SAMZA-973: Disk Quotas: clamp max delay and more accurate processing time measurement
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2187d6bd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2187d6bd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2187d6bd
Branch: refs/heads/master
Commit: 2187d6bd9d942e0d95189531c4b4db23f30c042b
Parents: e5f31c5
Author: Chris Pettitt <cp...@linkedin.com>
Authored: Wed Jul 20 17:07:32 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Jul 20 17:07:32 2016 -0700
----------------------------------------------------------------------
.../apache/samza/util/ThrottlingExecutor.java | 15 ++++---
.../org/apache/samza/container/RunLoop.scala | 42 ++++++++++----------
.../apache/samza/container/SamzaContainer.scala | 6 ++-
.../samza/util/TestThrottlingExecutor.java | 28 +++++++++++--
4 files changed, 58 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
index 214cefd..afcc4c5 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
@@ -20,6 +20,7 @@
package org.apache.samza.util;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
/**
* An object that performs work on the current thread and optionally slows the rate of execution.
@@ -33,16 +34,18 @@ public class ThrottlingExecutor implements Executor {
public static final double MAX_WORK_FACTOR = 1.0;
public static final double MIN_WORK_FACTOR = 0.001;
+ private final long maxDelayNanos;
private final HighResolutionClock clock;
private volatile double workToIdleFactor;
private long pendingNanos;
- public ThrottlingExecutor() {
- this(new SystemHighResolutionClock());
+ public ThrottlingExecutor(long maxDelayMillis) {
+ this(maxDelayMillis, new SystemHighResolutionClock());
}
- ThrottlingExecutor(HighResolutionClock clock) {
+ ThrottlingExecutor(long maxDelayMillis, HighResolutionClock clock) {
+ this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
this.clock = clock;
}
@@ -68,8 +71,10 @@ public class ThrottlingExecutor implements Executor {
final long workNanos = clock.nanoTime() - startWorkNanos;
// NOTE: we accumulate pending delay nanos here, but we later update the pending nanos during
- // the sleep operation (if applicable), so they do not continue to grow.
- pendingNanos = Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor));
+ // the sleep operation (if applicable), so they do not continue to grow. We also clamp the
+ // maximum sleep time to prevent excessively large sleeps between executions.
+ pendingNanos = Math.min(maxDelayNanos,
+ Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor)));
if (pendingNanos > 0) {
try {
pendingNanos = clock.sleep(pendingNanos);
http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index bb2c376..538ebb8 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -21,9 +21,8 @@ package org.apache.samza.container
import java.util.concurrent.Executor
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.CoordinatorRequests
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition}
import org.apache.samza.task.ReadableCoordinator
import org.apache.samza.task.StreamTask
import org.apache.samza.util.Logging
@@ -74,20 +73,26 @@ class RunLoop (
* unhandled exception is thrown.
*/
def run {
- val runTask = new Runnable() {
- override def run(): Unit = {
- val loopStartTime = clock()
- process
- window
- commit
- val totalNs = clock() - loopStartTime
- metrics.utilization.set(activeNs.toFloat / totalNs)
- activeNs = 0L
+ while (!shutdownNow) {
+ val loopStartTime = clock()
+
+ trace("Attempting to choose a message to process.")
+
+ // Exclude choose time from activeNs. Although it includes deserialization time,
+ // it most closely captures idle time.
+ val envelope = updateTimer(metrics.chooseNs) {
+ consumerMultiplexer.choose()
}
- }
- while (!shutdownNow) {
- executor.execute(runTask)
+ executor.execute(new Runnable() {
+ override def run(): Unit = process(envelope)
+ })
+
+ window
+ commit
+ val totalNs = clock() - loopStartTime
+ metrics.utilization.set(activeNs.toFloat / totalNs)
+ activeNs = 0L
}
}
@@ -99,16 +104,9 @@ class RunLoop (
* Chooses a message from an input stream to process, and calls the
* process() method on the appropriate StreamTask to handle it.
*/
- private def process {
- trace("Attempting to choose a message to process.")
+ private def process(envelope: IncomingMessageEnvelope) {
metrics.processes.inc
- // Exclude choose time from activeNs. Although it includes deserialization time,
- // it most closely captures idle time.
- val envelope = updateTimer(metrics.chooseNs) {
- consumerMultiplexer.choose()
- }
-
activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
if (envelope != null) {
val ssp = envelope.getSystemStreamPartition
http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index b8600d5..90d7279 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -28,7 +28,8 @@ import java.util
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
-
+import java.lang.Thread.UncaughtExceptionHandler
+import java.net.{URL, UnknownHostException}
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.CheckpointManagerFactory
import org.apache.samza.checkpoint.OffsetManager
@@ -552,7 +553,8 @@ object SamzaContainer extends Logging {
(taskName, taskInstance)
}).toMap
- val executor = new ThrottlingExecutor()
+ val executor = new ThrottlingExecutor(
+ config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)))
val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue)
samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes)
http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
index 2659050..0276e6b 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
@@ -29,6 +29,8 @@ import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class TestThrottlingExecutor {
+ private static final long MAX_NANOS = Long.MAX_VALUE;
+
private static final Runnable NO_OP = new Runnable() {
@Override
public void run() {
@@ -42,12 +44,12 @@ public class TestThrottlingExecutor {
@Before
public void setUp() {
clock = Mockito.mock(HighResolutionClock.class);
- executor = new ThrottlingExecutor(clock);
+ executor = new ThrottlingExecutor(MAX_NANOS, clock);
}
@Test
public void testInitialState() {
- ThrottlingExecutor throttler = new ThrottlingExecutor();
+ ThrottlingExecutor throttler = new ThrottlingExecutor(MAX_NANOS);
assertEquals(0, throttler.getPendingNanos());
assertEquals(1.0, throttler.getWorkFactor());
}
@@ -66,12 +68,12 @@ public class TestThrottlingExecutor {
@Test(expected = IllegalArgumentException.class)
public void testLessThan0PercentWorkRate() {
- new ThrottlingExecutor().setWorkFactor(-0.1);
+ new ThrottlingExecutor(MAX_NANOS).setWorkFactor(-0.1);
}
@Test(expected = IllegalArgumentException.class)
public void testGreaterThan100PercentWorkRate() {
- new ThrottlingExecutor().setWorkFactor(1.1);
+ new ThrottlingExecutor(MAX_NANOS).setWorkFactor(1.1);
}
@Test
@@ -184,6 +186,24 @@ public class TestThrottlingExecutor {
}
@Test
+ public void testClampDelayMillis() throws InterruptedException {
+ final long maxDelayMillis = 10;
+ final long maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
+
+ executor = new ThrottlingExecutor(maxDelayMillis, clock);
+ executor.setWorkFactor(0.5);
+
+ // Note work time exceeds maxDelayMillis
+ setWorkTime(TimeUnit.MILLISECONDS.toNanos(100));
+ setExpectedAndActualSleepTime(maxDelayNanos, maxDelayNanos);
+
+ executor.execute(NO_OP);
+
+ verifySleepTime(maxDelayNanos);
+ assertEquals(0L, executor.getPendingNanos());
+ }
+
+ @Test
public void testDecreaseWorkFactor() {
executor.setWorkFactor(0.5);
executor.setPendingNanos(5000);