You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2016/11/11 18:20:53 UTC
samza git commit: SAMZA-1043: Samza performance improvements
Repository: samza
Updated Branches:
refs/heads/master 4660d4ddd -> 715b67d08
SAMZA-1043: Samza performance improvements
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/715b67d0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/715b67d0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/715b67d0
Branch: refs/heads/master
Commit: 715b67d08165418acdfa4ade3cd5f97a0b62e098
Parents: 4660d4d
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Wed Nov 9 11:09:32 2016 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed Nov 9 11:09:32 2016 -0800
----------------------------------------------------------------------
.../apache/samza/container/RunLoopFactory.java | 28 ++++--
.../org/apache/samza/task/AsyncRunLoop.java | 89 +++++++++++---------
.../apache/samza/task/CoordinatorRequests.java | 16 ++--
.../org/apache/samza/task/TaskCallbackImpl.java | 7 +-
.../apache/samza/task/TaskCallbackManager.java | 41 +++++----
.../apache/samza/util/HighResolutionClock.java | 12 +--
.../samza/util/SystemHighResolutionClock.java | 16 +---
.../apache/samza/util/ThrottlingExecutor.java | 21 ++++-
.../main/java/org/apache/samza/util/Utils.java | 59 -------------
.../org/apache/samza/config/MetricsConfig.scala | 14 ++-
.../apache/samza/container/SamzaContainer.scala | 17 +++-
.../apache/samza/system/SystemConsumers.scala | 28 ++----
.../main/scala/org/apache/samza/util/Util.scala | 7 ++
.../org/apache/samza/task/TestAsyncRunLoop.java | 8 +-
.../samza/task/TestAsyncStreamAdapter.java | 6 +-
.../apache/samza/task/TestTaskCallbackImpl.java | 2 +-
.../samza/task/TestTaskCallbackManager.java | 35 ++++----
.../samza/util/TestThrottlingExecutor.java | 32 +++----
.../kv/BaseKeyValueStorageEngineFactory.scala | 15 +++-
19 files changed, 223 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 609a956..1c66c82 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -22,6 +22,7 @@ package org.apache.samza.container;
import java.util.concurrent.ExecutorService;
import org.apache.samza.SamzaException;
import org.apache.samza.config.TaskConfig;
+import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.task.AsyncRunLoop;
import org.apache.samza.task.AsyncStreamTask;
@@ -29,10 +30,10 @@ import org.apache.samza.task.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
+import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
-import static org.apache.samza.util.Utils.defaultClock;
-import static org.apache.samza.util.Utils.defaultValue;
+import static org.apache.samza.util.Util.asScalaClock;
/**
* Factory class to create runloop for a Samza task, based on the type
@@ -50,7 +51,8 @@ public class RunLoopFactory {
ExecutorService threadPool,
long maxThrottlingDelayMs,
SamzaContainerMetrics containerMetrics,
- TaskConfig config) {
+ TaskConfig config,
+ HighResolutionClock clock) {
long taskWindowMs = config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS));
@@ -83,7 +85,7 @@ public class RunLoopFactory {
maxThrottlingDelayMs,
taskWindowMs,
taskCommitMs,
- defaultClock());
+ asScalaClock(() -> System.nanoTime()));
} else {
Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1));
@@ -106,7 +108,23 @@ public class RunLoopFactory {
taskCommitMs,
callbackTimeout,
maxThrottlingDelayMs,
- containerMetrics);
+ containerMetrics,
+ clock);
}
}
+
+ /**
+ * Returns a default value object for scala Option.getOrElse() to use
+ * @param value default value
+ * @param <T> value type
+ * @return object containing default value
+ */
+ public static <T> AbstractFunction0<T> defaultValue(final T value) {
+ return new AbstractFunction0<T>() {
+ @Override
+ public T apply() {
+ return value;
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 8fac815..ba1e1d9 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -39,6 +39,7 @@ import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
+import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
@@ -55,7 +56,7 @@ import scala.collection.JavaConversions;
public class AsyncRunLoop implements Runnable, Throttleable {
private static final Logger log = LoggerFactory.getLogger(AsyncRunLoop.class);
- private final Map<TaskName, AsyncTaskWorker> taskWorkers;
+ private final List<AsyncTaskWorker> taskWorkers;
private final SystemConsumers consumerMultiplexer;
private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping;
@@ -72,6 +73,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
private final ThrottlingScheduler callbackExecutor;
private volatile boolean shutdownNow = false;
private volatile Throwable throwable = null;
+ private final HighResolutionClock clock;
public AsyncRunLoop(Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances,
ExecutorService threadPool,
@@ -81,7 +83,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
long commitMs,
long callbackTimeoutMs,
long maxThrottlingDelayMs,
- SamzaContainerMetrics containerMetrics) {
+ SamzaContainerMetrics containerMetrics,
+ HighResolutionClock clock) {
this.threadPool = threadPool;
this.consumerMultiplexer = consumerMultiplexer;
@@ -95,13 +98,14 @@ public class AsyncRunLoop implements Runnable, Throttleable {
this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet());
this.latch = new Object();
this.workerTimer = Executors.newSingleThreadScheduledExecutor();
+ this.clock = clock;
Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) {
workers.put(task.taskName(), new AsyncTaskWorker(task));
}
// Partions and tasks assigned to the container will not change during the run loop life time
- this.taskWorkers = Collections.unmodifiableMap(workers);
- this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, taskWorkers));
+ this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, workers));
+ this.taskWorkers = Collections.unmodifiableList(new ArrayList<>(workers.values()));
}
/**
@@ -130,11 +134,11 @@ public class AsyncRunLoop implements Runnable, Throttleable {
@Override
public void run() {
try {
- for (AsyncTaskWorker taskWorker : taskWorkers.values()) {
+ for (AsyncTaskWorker taskWorker : taskWorkers) {
taskWorker.init();
}
- long prevNs = System.nanoTime();
+ long prevNs = clock.nanoTime();
while (!shutdownNow) {
if (throwable != null) {
@@ -142,25 +146,29 @@ public class AsyncRunLoop implements Runnable, Throttleable {
throw new SamzaException(throwable);
}
- long startNs = System.nanoTime();
+ long startNs = clock.nanoTime();
IncomingMessageEnvelope envelope = chooseEnvelope();
- long chooseNs = System.nanoTime();
+ long chooseNs = clock.nanoTime();
containerMetrics.chooseNs().update(chooseNs - startNs);
runTasks(envelope);
- long blockNs = System.nanoTime();
+ long blockNs = clock.nanoTime();
blockIfBusy(envelope);
- long currentNs = System.nanoTime();
+ long currentNs = clock.nanoTime();
long activeNs = blockNs - chooseNs;
long totalNs = currentNs - prevNs;
prevNs = currentNs;
containerMetrics.blockNs().update(currentNs - blockNs);
- containerMetrics.utilization().set(((double) activeNs) / totalNs);
+
+ if (totalNs != 0) {
+ // totalNs is not 0 if timer metrics are enabled
+ containerMetrics.utilization().set(((double) activeNs) / totalNs);
+ }
}
} finally {
workerTimer.shutdown();
@@ -214,7 +222,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
}
}
- for (AsyncTaskWorker worker: taskWorkers.values()) {
+ for (AsyncTaskWorker worker: taskWorkers) {
worker.run();
}
}
@@ -227,7 +235,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
private void blockIfBusy(IncomingMessageEnvelope envelope) {
synchronized (latch) {
while (!shutdownNow && throwable == null) {
- for (AsyncTaskWorker worker : taskWorkers.values()) {
+ for (AsyncTaskWorker worker : taskWorkers) {
if (worker.state.isReady()) {
// should continue running if any worker state is ready
// consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop
@@ -310,7 +318,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) {
this.task = task;
- this.callbackManager = new TaskCallbackManager(this, task.metrics(), callbackTimer, callbackTimeoutMs);
+ this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock);
Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet);
}
@@ -430,9 +438,9 @@ public class AsyncRunLoop implements Runnable, Throttleable {
containerMetrics.windows().inc();
ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
- long startTime = System.nanoTime();
+ long startTime = clock.nanoTime();
task.window(coordinator);
- containerMetrics.windowNs().update(System.nanoTime() - startTime);
+ containerMetrics.windowNs().update(clock.nanoTime() - startTime);
coordinatorRequests.update(coordinator);
state.doneWindowOrCommit();
@@ -466,9 +474,9 @@ public class AsyncRunLoop implements Runnable, Throttleable {
try {
containerMetrics.commits().inc();
- long startTime = System.nanoTime();
+ long startTime = clock.nanoTime();
task.commit();
- containerMetrics.commitNs().update(System.nanoTime() - startTime);
+ containerMetrics.commitNs().update(clock.nanoTime() - startTime);
state.doneWindowOrCommit();
} catch (Throwable t) {
@@ -497,17 +505,17 @@ public class AsyncRunLoop implements Runnable, Throttleable {
*/
@Override
public void onComplete(final TaskCallback callback) {
- long workNanos = System.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs;
+ long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs;
callbackExecutor.schedule(new Runnable() {
@Override
public void run() {
try {
state.doneProcess();
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
- containerMetrics.processNs().update(System.nanoTime() - callbackImpl.timeCreatedNs);
+ containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs);
log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
- TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl, true);
+ TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl);
if (callbackToUpdate != null) {
IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
@@ -540,7 +548,6 @@ public class AsyncRunLoop implements Runnable, Throttleable {
abort(t);
// update pending count, but not offset
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
- callbackManager.updateCallback(callbackImpl, false);
log.error("Got callback failure for task {}", callbackImpl.taskName);
} catch (Throwable e) {
log.error(e.getMessage(), e);
@@ -564,7 +571,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
private volatile boolean endOfStream = false;
private volatile boolean windowOrCommitInFlight = false;
private final AtomicInteger messagesInFlight = new AtomicInteger(0);
- private final ArrayDeque<PendingEnvelope> pendingEnvelopQueue;
+ private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
//Set of SSPs that we are currently processing for this task instance
private final Set<SystemStreamPartition> processingSspSet;
private final TaskName taskName;
@@ -573,21 +580,20 @@ public class AsyncRunLoop implements Runnable, Throttleable {
AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics, Set<SystemStreamPartition> sspSet) {
this.taskName = taskName;
this.taskMetrics = taskMetrics;
- this.pendingEnvelopQueue = new ArrayDeque<>();
+ this.pendingEnvelopeQueue = new ArrayDeque<>();
this.processingSspSet = sspSet;
}
private boolean checkEndOfStream() {
- PendingEnvelope pendingEnvelope = pendingEnvelopQueue.peek();
-
- if (pendingEnvelope != null) {
+ if (pendingEnvelopeQueue.size() == 1) {
+ PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
if (envelope.isEndOfStream()) {
SystemStreamPartition ssp = envelope.getSystemStreamPartition();
processingSspSet.remove(ssp);
- pendingEnvelopQueue.remove();
+ pendingEnvelopeQueue.remove();
}
}
return processingSspSet.isEmpty();
@@ -597,8 +603,13 @@ public class AsyncRunLoop implements Runnable, Throttleable {
* Returns whether the task is ready to do process/window/commit.
*/
private boolean isReady() {
- endOfStream |= checkEndOfStream();
- needCommit |= coordinatorRequests.commitRequests().remove(taskName);
+ if (checkEndOfStream()) {
+ endOfStream = true;
+ }
+ if (coordinatorRequests.commitRequests().remove(taskName)) {
+ needCommit = true;
+ }
+
if (needWindow || needCommit || endOfStream) {
// ready for window or commit only when no messages are in progress and
// no window/commit in flight
@@ -621,7 +632,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
if (needCommit) return WorkerOp.COMMIT;
else if (needWindow) return WorkerOp.WINDOW;
else if (endOfStream) return WorkerOp.END_OF_STREAM;
- else if (!pendingEnvelopQueue.isEmpty()) return WorkerOp.PROCESS;
+ else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS;
}
return WorkerOp.NO_OP;
}
@@ -645,7 +656,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
}
private void startProcess() {
- messagesInFlight.incrementAndGet();
+ int count = messagesInFlight.incrementAndGet();
+ taskMetrics.messagesInFlight().set(count);
}
private void doneWindowOrCommit() {
@@ -653,7 +665,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
}
private void doneProcess() {
- messagesInFlight.decrementAndGet();
+ int count = messagesInFlight.decrementAndGet();
+ taskMetrics.messagesInFlight().set(count);
}
/**
@@ -662,8 +675,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
* @param pendingEnvelope
*/
private void insertEnvelope(PendingEnvelope pendingEnvelope) {
- pendingEnvelopQueue.add(pendingEnvelope);
- int queueSize = pendingEnvelopQueue.size();
+ pendingEnvelopeQueue.add(pendingEnvelope);
+ int queueSize = pendingEnvelopeQueue.size();
taskMetrics.pendingMessages().set(queueSize);
log.trace("Insert envelope to task {} queue.", taskName);
log.debug("Task {} pending envelope count is {} after insertion.", taskName, queueSize);
@@ -682,8 +695,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
* @return
*/
private IncomingMessageEnvelope fetchEnvelope() {
- PendingEnvelope pendingEnvelope = pendingEnvelopQueue.remove();
- int queueSize = pendingEnvelopQueue.size();
+ PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.remove();
+ int queueSize = pendingEnvelopeQueue.size();
taskMetrics.pendingMessages().set(queueSize);
log.trace("fetch envelope ssp {} offset {} to process.", pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset());
log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize);
@@ -691,7 +704,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
if (pendingEnvelope.markProcessed()) {
SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition();
consumerMultiplexer.tryUpdate(partition);
- log.debug("Update chooser for " + partition);
+ log.debug("Update chooser for {}", partition);
}
return pendingEnvelope.envelope;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
index 052b3b9..0283d67 100644
--- a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
+++ b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
@@ -19,10 +19,8 @@
package org.apache.samza.task;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Set;
-
+import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.samza.container.TaskName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +35,8 @@ public class CoordinatorRequests {
private static final Logger log = LoggerFactory.getLogger(CoordinatorRequests.class);
private final Set<TaskName> taskNames;
- private final Set<TaskName> taskShutdownRequests = Collections.synchronizedSet(new HashSet<TaskName>());
- private final Set<TaskName> taskCommitRequests = Collections.synchronizedSet(new HashSet<TaskName>());
+ private final Set<TaskName> taskShutdownRequests = new CopyOnWriteArraySet<>();
+ private final Set<TaskName> taskCommitRequests = new CopyOnWriteArraySet<>();
volatile private boolean shutdownNow = false;
public CoordinatorRequests(Set<TaskName> taskNames) {
@@ -67,18 +65,18 @@ public class CoordinatorRequests {
*/
private void checkCoordinator(ReadableCoordinator coordinator) {
if (coordinator.requestedCommitTask()) {
- log.info("Task " + coordinator.taskName() + " requested commit for current task only");
+ log.debug("Task {} requested commit for current task only", coordinator.taskName());
taskCommitRequests.add(coordinator.taskName());
}
if (coordinator.requestedCommitAll()) {
- log.info("Task " + coordinator.taskName() + " requested commit for all tasks in the container");
+ log.debug("Task {} requested commit for all tasks in the container", coordinator.taskName());
taskCommitRequests.addAll(taskNames);
}
if (coordinator.requestedShutdownOnConsensus()) {
taskShutdownRequests.add(coordinator.taskName());
- log.info("Shutdown has now been requested by tasks " + taskShutdownRequests);
+ log.info("Shutdown has now been requested by tasks {}", taskShutdownRequests);
}
if (coordinator.requestedShutdownNow() || taskShutdownRequests.size() == taskNames.size()) {
@@ -86,4 +84,4 @@ public class CoordinatorRequests {
shutdownNow = true;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
index 9b70099..19b9f1c 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
@@ -48,13 +48,14 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
TaskName taskName,
IncomingMessageEnvelope envelope,
ReadableCoordinator coordinator,
- long seqNum) {
+ long seqNum,
+ long timeCreatedNs) {
this.listener = listener;
this.taskName = taskName;
this.envelope = envelope;
this.coordinator = coordinator;
this.seqNum = seqNum;
- this.timeCreatedNs = System.nanoTime();
+ this.timeCreatedNs = timeCreatedNs;
}
@Override
@@ -101,4 +102,4 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
boolean matchSeqNum(long seqNum) {
return this.seqNum == seqNum;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index 132cf59..5bce778 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -24,10 +24,9 @@ import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.util.HighResolutionClock;
/**
@@ -86,27 +85,29 @@ class TaskCallbackManager {
}
private long seqNum = 0L;
- private final AtomicInteger pendingCount = new AtomicInteger(0);
- private final TaskCallbacks completeCallbacks = new TaskCallbacks();
- private final TaskInstanceMetrics metrics;
+ private final TaskCallbacks completedCallbacks = new TaskCallbacks();
private final ScheduledExecutorService timer;
private final TaskCallbackListener listener;
- private long timeout;
-
- public TaskCallbackManager(TaskCallbackListener listener, TaskInstanceMetrics metrics, ScheduledExecutorService timer, long timeout) {
+ private final long timeout;
+ private final int maxConcurrency;
+ private final HighResolutionClock clock;
+
+ public TaskCallbackManager(TaskCallbackListener listener,
+ ScheduledExecutorService timer,
+ long timeout,
+ int maxConcurrency,
+ HighResolutionClock clock) {
this.listener = listener;
- this.metrics = metrics;
this.timer = timer;
this.timeout = timeout;
+ this.maxConcurrency = maxConcurrency;
+ this.clock = clock;
}
public TaskCallbackImpl createCallback(TaskName taskName,
IncomingMessageEnvelope envelope,
ReadableCoordinator coordinator) {
- final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++);
- int count = pendingCount.incrementAndGet();
- metrics.messagesInFlight().set(count);
-
+ final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++, clock.nanoTime());
if (timer != null) {
Runnable timerTask = new Runnable() {
@Override
@@ -126,16 +127,14 @@ class TaskCallbackManager {
* Update the task callbacks with the new callback completed.
* It uses a high-watermark model to roll the callbacks for checkpointing.
* @param callback new completed callback
- * @param success callback result status
* @return the callback for checkpointing
*/
- public TaskCallbackImpl updateCallback(TaskCallbackImpl callback, boolean success) {
- TaskCallbackImpl callbackToCommit = null;
- if (success) {
- callbackToCommit = completeCallbacks.update(callback);
+ public TaskCallbackImpl updateCallback(TaskCallbackImpl callback) {
+ if (maxConcurrency > 1) {
+ // Use the completedCallbacks queue to handle the out-of-order case when max concurrency is larger than 1
+ return completedCallbacks.update(callback);
+ } else {
+ return callback;
}
- int count = pendingCount.decrementAndGet();
- metrics.messagesInFlight().set(count);
- return callbackToCommit;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
index 69ba441..6d40149 100644
--- a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
+++ b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
@@ -25,7 +25,7 @@ package org.apache.samza.util;
* <p>
* Instances of this interface must be thread-safe.
*/
-interface HighResolutionClock {
+public interface HighResolutionClock {
/**
* Returns a time point that can be used to calculate the difference in nanoseconds with another
* time point. Resolution of the timer is platform dependent and not guaranteed to actually
@@ -34,14 +34,4 @@ interface HighResolutionClock {
* @return current time point in nanoseconds
*/
long nanoTime();
-
- /**
- * Sleeps for a period of time that approximates the requested number of nanoseconds. Actual sleep
- * time can vary significantly based on the JVM implementation and platform. This function returns
- * the measured error between expected and actual sleep time.
- *
- * @param nanos the number of nanoseconds to sleep.
- * @throws InterruptedException if the current thread is interrupted while blocked in this method.
- */
- long sleep(long nanos) throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
index 2e65b60..6bfe7c6 100644
--- a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
+++ b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
@@ -19,23 +19,9 @@
package org.apache.samza.util;
-import java.util.concurrent.TimeUnit;
-
class SystemHighResolutionClock implements HighResolutionClock {
@Override
public long nanoTime() {
return System.nanoTime();
}
-
- @Override
- public long sleep(long nanos) throws InterruptedException {
- if (nanos <= 0) {
- return nanos;
- }
-
- final long start = System.nanoTime();
- TimeUnit.NANOSECONDS.sleep(nanos);
-
- return Util.clampAdd(nanos, -(System.nanoTime() - start));
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
index d1298fc..eb956f2 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
@@ -74,7 +74,7 @@ public class ThrottlingExecutor implements Throttleable, Executor {
Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor)));
if (pendingNanos > 0) {
try {
- pendingNanos = clock.sleep(pendingNanos);
+ pendingNanos = sleep(pendingNanos);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -122,4 +122,23 @@ public class ThrottlingExecutor implements Throttleable, Executor {
void setPendingNanos(long pendingNanos) {
this.pendingNanos = pendingNanos;
}
+
+ /**
+ * Sleeps for a period of time that approximates the requested number of nanoseconds. Actual sleep
+ * time can vary significantly based on the JVM implementation and platform. This function returns
+ * the measured error between expected and actual sleep time.
+ *
+ * @param nanos the number of nanoseconds to sleep.
+ * @throws InterruptedException if the current thread is interrupted while blocked in this method.
+ */
+ long sleep(long nanos) throws InterruptedException {
+ if (nanos <= 0) {
+ return nanos;
+ }
+
+ final long start = System.nanoTime();
+ TimeUnit.NANOSECONDS.sleep(nanos);
+
+ return Util.clampAdd(nanos, -(System.nanoTime() - start));
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/Utils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/Utils.java b/samza-core/src/main/java/org/apache/samza/util/Utils.java
deleted file mode 100644
index 472e0a5..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/Utils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-
-
-public class Utils {
- private static final Logger log = LoggerFactory.getLogger(Utils.class);
-
- private Utils() {}
-
- /**
- * Returns a default value object for scala option.getOrDefault() to use
- * @param value default value
- * @param <T> value type
- * @return object containing default value
- */
- public static <T> AbstractFunction0<T> defaultValue(final T value) {
- return new AbstractFunction0<T>() {
- @Override
- public T apply() {
- return value;
- }
- };
- }
-
- /**
- * Creates a nanosecond clock using default system nanotime
- * @return object invokes the system clock
- */
- public static AbstractFunction0<Object> defaultClock() {
- return new AbstractFunction0<Object>() {
- @Override
- public Object apply() {
- return System.nanoTime();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
index c3fd8bf..e9b6b76 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -18,7 +18,10 @@
*/
package org.apache.samza.config
-import scala.collection.JavaConversions._
+
+
+import org.apache.samza.util.HighResolutionClock
+
object MetricsConfig {
// metrics config constants
@@ -26,6 +29,7 @@ object MetricsConfig {
val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class"
val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream"
val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval"
+ val METRICS_TIMER_ENABLED= "metrics.timer.enabled"
implicit def Config2Metrics(config: Config) = new MetricsConfig(config)
}
@@ -53,4 +57,10 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) {
case _ => List[String]()
}
}
-}
+
+ /**
+ * Returns the flag to turn on/off the timer metrics.
+ * @return true if timer metrics are enabled; falls back to true when metrics.timer.enabled is not configured
+ */
+ def getMetricsTimerEnabled: Boolean = getBoolean(MetricsConfig.METRICS_TIMER_ENABLED, true)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e0468ee..c35da92 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -74,7 +74,9 @@ import org.apache.samza.task.AsyncStreamTask
import org.apache.samza.task.AsyncStreamTaskAdapter
import org.apache.samza.task.StreamTask
import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.HighResolutionClock
import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Throttleable, Util}
+import org.apache.samza.util.Util.asScalaClock
import scala.collection.JavaConversions._
@@ -150,6 +152,15 @@ object SamzaContainer extends Logging {
val systemProducersMetrics = new SystemProducersMetrics(registry)
val systemConsumersMetrics = new SystemConsumersMetrics(registry)
val offsetManagerMetrics = new OffsetManagerMetrics(registry)
+ val clock = if (config.getMetricsTimerEnabled) {
+ new HighResolutionClock {
+ override def nanoTime(): Long = System.nanoTime()
+ }
+ } else {
+ new HighResolutionClock {
+ override def nanoTime(): Long = 0L
+ }
+ }
val inputSystemStreamPartitions = containerModel
.getTasks
@@ -383,7 +394,8 @@ object SamzaContainer extends Logging {
serdeManager = serdeManager,
metrics = systemConsumersMetrics,
dropDeserializationError = dropDeserializationError,
- pollIntervalMs = pollIntervalMs)
+ pollIntervalMs = pollIntervalMs,
+ clock = clock)
val producerMultiplexer = new SystemProducers(
producers = producers,
@@ -559,7 +571,8 @@ object SamzaContainer extends Logging {
taskThreadPool,
maxThrottlingDelayMs,
samzaContainerMetrics,
- config)
+ config,
+ clock)
val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener {
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index e2aed5b..17d163d 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -62,12 +62,12 @@ class SystemConsumers (
/**
* The class that handles deserialization of incoming messages.
*/
- serdeManager: SerdeManager,
+ serdeManager: SerdeManager = new SerdeManager,
/**
* A helper class to hold all of SystemConsumers' metrics.
*/
- metrics: SystemConsumersMetrics,
+ metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
/**
* If MessageChooser returns null when it's polled, SystemConsumers will
@@ -76,14 +76,14 @@ class SystemConsumers (
* thread will sit in a tight loop polling every SystemConsumer over and
* over again if no new messages are available.
*/
- noNewMessagesTimeout: Int,
+ noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
/**
* This parameter is to define how to deal with deserialization failure. If
* set to true, the task will skip the messages when deserialization fails.
* If set to false, the task will throw SamzaException and fail the container.
*/
- dropDeserializationError: Boolean,
+ dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
/**
* <p>Defines an upper bound for how long the SystemConsumers will wait
@@ -99,29 +99,13 @@ class SystemConsumers (
* with no remaining unprocessed messages, the SystemConsumers will poll for
* it within 50ms of its availability in the stream system.</p>
*/
- val pollIntervalMs: Int,
+ val pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS,
/**
* Clock can be used to inject a custom clock when mocking this class in
* tests. The default implementation returns the current system clock time.
*/
- val clock: () => Long) extends Logging with TimerUtils {
-
- def this(chooser: MessageChooser,
- consumers: Map[String, SystemConsumer],
- serdeManager: SerdeManager = new SerdeManager,
- metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
- noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
- dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
- pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS) =
- this(chooser,
- consumers,
- serdeManager,
- metrics,
- noNewMessagesTimeout,
- dropDeserializationError,
- pollIntervalMs,
- () => System.nanoTime())
+ val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtils {
/**
* A buffer of incoming messages grouped by SystemStreamPartition. These
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index c4836f2..9019d02 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -388,4 +388,11 @@ object Util extends Logging {
sum
}
+
+ /**
+ * Implicitly converts a Java HighResolutionClock into a Scala clock function that returns the clock's nanoTime() reading as a Long.
+ * @param c Java HighResolutionClock
+ * @return Scala clock function
+ */
+ implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 6000ffa..cc3e1b7 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.config.Config;
@@ -47,7 +46,6 @@ import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.TestSystemConsumers;
-
import org.junit.Before;
import org.junit.Test;
import scala.Option;
@@ -101,7 +99,8 @@ public class TestAsyncRunLoop {
commitMs,
callbackTimeoutMs,
maxThrottlingDelayMs,
- containerMetrics);
+ containerMetrics,
+ () -> 0L);
}
TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
@@ -494,7 +493,8 @@ public class TestAsyncRunLoop {
commitMs,
callbackTimeoutMs,
maxThrottlingDelayMs,
- containerMetrics);
+ containerMetrics,
+ () -> 0L);
runLoop.run();
callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
index 99e1e18..d0b820a 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
@@ -93,7 +93,7 @@ public class TestAsyncStreamAdapter {
public void testAdapterWithoutThreadPool() throws Exception {
taskAdaptor = new AsyncStreamTaskAdapter(task, null);
TestCallbackListener listener = new TestCallbackListener();
- TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L);
+ TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L, 0L);
taskAdaptor.init(null, null);
assertTrue(task.inited);
@@ -116,10 +116,10 @@ public class TestAsyncStreamAdapter {
@Test
public void testAdapterWithThreadPool() throws Exception {
TestCallbackListener listener1 = new TestCallbackListener();
- TaskCallback callback1 = new TaskCallbackImpl(listener1, null, envelope, null, 0L);
+ TaskCallback callback1 = new TaskCallbackImpl(listener1, null, envelope, null, 0L, 0L);
TestCallbackListener listener2 = new TestCallbackListener();
- TaskCallback callback2 = new TaskCallbackImpl(listener2, null, envelope, null, 1L);
+ TaskCallback callback2 = new TaskCallbackImpl(listener2, null, envelope, null, 1L, 0L);
ExecutorService executor = Executors.newFixedThreadPool(2);
taskAdaptor = new AsyncStreamTaskAdapter(task, executor);
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java
index f1dbf35..732405b 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java
@@ -61,7 +61,7 @@ public class TestTaskCallbackImpl {
}
};
- callback = new TaskCallbackImpl(listener, null, mock(IncomingMessageEnvelope.class), null, 0);
+ callback = new TaskCallbackImpl(listener, null, mock(IncomingMessageEnvelope.class), null, 0L, 0L);
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
index d7110f3..b2ed316 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
@@ -48,8 +48,7 @@ public class TestTaskCallbackManager {
public void onFailure(TaskCallback callback, Throwable t) {
}
};
- callbackManager = new TaskCallbackManager(listener, metrics, null, -1);
-
+ callbackManager = new TaskCallbackManager(listener, null, -1, 2, () -> System.nanoTime());
}
@Test
@@ -68,15 +67,15 @@ public class TestTaskCallbackManager {
ReadableCoordinator coordinator = new ReadableCoordinator(taskName);
IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
- TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0);
- TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback0, true);
+ TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
+ TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback0);
assertTrue(callbackToCommit.matchSeqNum(0));
assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
assertEquals("0", callbackToCommit.envelope.getOffset());
IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
- TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1);
- callbackToCommit = callbackManager.updateCallback(callback1, true);
+ TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1, 0);
+ callbackToCommit = callbackManager.updateCallback(callback1);
assertTrue(callbackToCommit.matchSeqNum(1));
assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
assertEquals("1", callbackToCommit.envelope.getOffset());
@@ -90,18 +89,18 @@ public class TestTaskCallbackManager {
// simulate out of order
IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null);
- TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator, 2);
- TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2, true);
+ TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator, 2, 0);
+ TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2);
assertNull(callbackToCommit);
IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
- TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1);
- callbackToCommit = callbackManager.updateCallback(callback1, true);
+ TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1, 0);
+ callbackToCommit = callbackManager.updateCallback(callback1);
assertNull(callbackToCommit);
IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
- TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0);
- callbackToCommit = callbackManager.updateCallback(callback0, true);
+ TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
+ callbackToCommit = callbackManager.updateCallback(callback0);
assertTrue(callbackToCommit.matchSeqNum(2));
assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
assertEquals("2", callbackToCommit.envelope.getOffset());
@@ -117,21 +116,21 @@ public class TestTaskCallbackManager {
IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null);
ReadableCoordinator coordinator2 = new ReadableCoordinator(taskName);
coordinator2.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
- TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2);
- TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2, true);
+ TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2, 0);
+ TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2);
assertNull(callbackToCommit);
IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
ReadableCoordinator coordinator1 = new ReadableCoordinator(taskName);
coordinator1.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
- TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1);
- callbackToCommit = callbackManager.updateCallback(callback1, true);
+ TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1, 0);
+ callbackToCommit = callbackManager.updateCallback(callback1);
assertNull(callbackToCommit);
IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
ReadableCoordinator coordinator = new ReadableCoordinator(taskName);
- TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0);
- callbackToCommit = callbackManager.updateCallback(callback0, true);
+ TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
+ callbackToCommit = callbackManager.updateCallback(callback0);
assertTrue(callbackToCommit.matchSeqNum(1));
assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
assertEquals("1", callbackToCommit.envelope.getOffset());
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
index 0276e6b..ca500fb 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import java.util.concurrent.TimeUnit;
+
public class TestThrottlingExecutor {
private static final long MAX_NANOS = Long.MAX_VALUE;
@@ -44,7 +45,7 @@ public class TestThrottlingExecutor {
@Before
public void setUp() {
clock = Mockito.mock(HighResolutionClock.class);
- executor = new ThrottlingExecutor(MAX_NANOS, clock);
+ executor = Mockito.spy(new ThrottlingExecutor(MAX_NANOS, clock));
}
@Test
@@ -85,7 +86,7 @@ public class TestThrottlingExecutor {
assertEquals(0L, executor.getPendingNanos());
// At 100% work rate sleep should not be called
- Mockito.verify(clock, Mockito.never()).sleep(Mockito.anyLong());
+ Mockito.verify(executor, Mockito.never()).sleep(Mockito.anyLong());
}
@Test
@@ -95,7 +96,7 @@ public class TestThrottlingExecutor {
final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
setWorkTime(workTimeNanos);
// Sleep time is same as work time at 50% work rate
- setExpectedAndActualSleepTime(workTimeNanos, workTimeNanos);
+ setActualSleepTime(workTimeNanos);
executor.execute(NO_OP);
verifySleepTime(workTimeNanos);
@@ -114,7 +115,7 @@ public class TestThrottlingExecutor {
final long delayTimeNanos = (long) (workToDelayFactor * workTimeNanos);
setWorkTime(workTimeNanos);
- setExpectedAndActualSleepTime(delayTimeNanos, delayTimeNanos);
+ setActualSleepTime(delayTimeNanos);
executor.execute(NO_OP);
@@ -131,7 +132,7 @@ public class TestThrottlingExecutor {
final long actualDelayTimeNanos = TimeUnit.MILLISECONDS.toNanos(6);
setWorkTime(workTimeNanos);
- setExpectedAndActualSleepTime(expectedDelayNanos, actualDelayTimeNanos);
+ setActualSleepTime(actualDelayTimeNanos);
executor.execute(NO_OP);
@@ -148,7 +149,7 @@ public class TestThrottlingExecutor {
final long actualDelayNanos = TimeUnit.MILLISECONDS.toNanos(4);
setWorkTime(workTimeNanos);
- setExpectedAndActualSleepTime(expectedDelayNanos, actualDelayNanos);
+ setActualSleepTime(actualDelayNanos);
executor.execute(NO_OP);
@@ -167,7 +168,7 @@ public class TestThrottlingExecutor {
// First execution
setWorkTime(workTimeNanos);
- setExpectedAndActualSleepTime(workTimeNanos, actualDelayNanos1);
+ setActualSleepTime(actualDelayNanos1);
executor.execute(NO_OP);
@@ -177,7 +178,7 @@ public class TestThrottlingExecutor {
// Second execution
setWorkTime(workTimeNanos);
- setExpectedAndActualSleepTime(workTimeNanos, actualDelayNanos2);
+ setActualSleepTime(actualDelayNanos2);
executor.execute(NO_OP);
@@ -190,12 +191,12 @@ public class TestThrottlingExecutor {
final long maxDelayMillis = 10;
final long maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
- executor = new ThrottlingExecutor(maxDelayMillis, clock);
+ executor = Mockito.spy(new ThrottlingExecutor(maxDelayMillis, clock));
executor.setWorkFactor(0.5);
// Note work time exceeds maxDelayMillis
setWorkTime(TimeUnit.MILLISECONDS.toNanos(100));
- setExpectedAndActualSleepTime(maxDelayNanos, maxDelayNanos);
+ setActualSleepTime(maxDelayNanos);
executor.execute(NO_OP);
@@ -221,6 +222,7 @@ public class TestThrottlingExecutor {
// At a 50% work factor we'd expect work and sleep to match. As they don't, the function will
// try to increment the pending sleep nanos, which could (but should not) result in overflow.
setWorkTime(5000);
+ setActualSleepTime(Long.MAX_VALUE);
executor.execute(NO_OP);
@@ -241,7 +243,7 @@ public class TestThrottlingExecutor {
executor.execute(NO_OP);
// Sleep should not be called with negative pending nanos
- Mockito.verify(clock, Mockito.never()).sleep(Mockito.anyLong());
+ Mockito.verify(executor, Mockito.never()).sleep(Mockito.anyLong());
assertEquals(-1000 + 500, executor.getPendingNanos());
}
@@ -253,6 +255,7 @@ public class TestThrottlingExecutor {
assertEquals(-1000, executor.getPendingNanos());
setWorkTime(1250);
+ setActualSleepTime(1250 + startPendingNanos);
executor.execute(NO_OP);
@@ -264,12 +267,11 @@ public class TestThrottlingExecutor {
Mockito.when(clock.nanoTime()).thenReturn(0L).thenReturn(workTimeNanos);
}
- private void setExpectedAndActualSleepTime(long expectedDelayTimeNanos, long actualDelayTimeNanos) throws InterruptedException {
- Mockito.when(clock.sleep(expectedDelayTimeNanos))
- .thenReturn(expectedDelayTimeNanos - actualDelayTimeNanos);
+ private void setActualSleepTime(long actualDelayTimeNanos) throws InterruptedException {
+ Mockito.when(executor.sleep(Mockito.anyLong())).thenAnswer(invocation -> (long) invocation.getArguments()[0] - actualDelayTimeNanos);
}
private void verifySleepTime(long expectedDelayTimeNanos) throws InterruptedException {
- Mockito.verify(clock).sleep(expectedDelayTimeNanos);
+ Mockito.verify(executor).sleep(expectedDelayTimeNanos);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index c975893..8ffc817 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -28,6 +28,9 @@ import org.apache.samza.serializers.Serde
import org.apache.samza.storage.{StoreProperties, StorageEngine, StorageEngineFactory}
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.MessageCollector
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.util.HighResolutionClock
+import org.apache.samza.util.Util.asScalaClock
/**
* A key value storage engine factory implementation
@@ -132,7 +135,17 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
// create the storage engine and return
// TODO: Decide if we should use raw bytes when restoring
val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
- new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize)
+ val clock = if (containerContext.config.getMetricsTimerEnabled) {
+ new HighResolutionClock {
+ override def nanoTime(): Long = System.nanoTime()
+ }
+ } else {
+ new HighResolutionClock {
+ override def nanoTime(): Long = 0L
+ }
+ }
+
+ new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize, clock)
}
}