You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/27 22:55:17 UTC
samza git commit: SAMZA-1219;
Add metrics for operator message received and execution times
Repository: samza
Updated Branches:
refs/heads/master f7e173651 -> 92ae4c628
SAMZA-1219; Add metrics for operator message received and execution times
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #142 from prateekm/operator-metrics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/92ae4c62
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/92ae4c62
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/92ae4c62
Branch: refs/heads/master
Commit: 92ae4c628abb3d113520ec47ca82f08c480123ad
Parents: f7e1736
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Thu Apr 27 15:55:11 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Thu Apr 27 15:55:11 2017 -0700
----------------------------------------------------------------------
.../samza/operators/impl/OperatorImpl.java | 136 +++++++++--
.../samza/operators/impl/OperatorImplGraph.java | 34 ++-
.../operators/impl/PartialJoinOperatorImpl.java | 48 ++--
.../samza/operators/impl/RootOperatorImpl.java | 36 ++-
.../impl/SessionWindowOperatorImpl.java | 52 -----
.../samza/operators/impl/SinkOperatorImpl.java | 25 +-
.../operators/impl/StreamOperatorImpl.java | 27 ++-
.../operators/impl/WindowOperatorImpl.java | 93 +++++---
.../samza/operators/spec/OperatorSpec.java | 14 +-
.../operators/spec/PartialJoinOperatorSpec.java | 8 -
.../samza/operators/spec/SinkOperatorSpec.java | 7 -
.../operators/spec/StreamOperatorSpec.java | 7 -
.../operators/spec/WindowOperatorSpec.java | 9 -
.../apache/samza/task/StreamOperatorTask.java | 8 +-
.../samza/operators/TestJoinOperator.java | 12 +-
.../samza/operators/TestWindowOperator.java | 6 +-
.../samza/operators/impl/TestOperatorImpl.java | 226 +++++++++++++++----
.../samza/operators/impl/TestOperatorImpls.java | 29 +--
.../operators/impl/TestSinkOperatorImpl.java | 7 +-
.../operators/impl/TestStreamOperatorImpl.java | 22 +-
20 files changed, 547 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index b9a606b..d547869 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,9 +18,19 @@
*/
package org.apache.samza.operators.impl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.HighResolutionClock;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -29,60 +39,140 @@ import java.util.Set;
* Abstract base class for all stream operator implementations.
*/
public abstract class OperatorImpl<M, RM> {
+ private static final String METRICS_GROUP = OperatorImpl.class.getName();
- private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>();
+ private boolean initialized;
+ private Set<OperatorImpl<RM, ?>> registeredOperators;
+ private HighResolutionClock highResClock;
+ private Counter numMessage;
+ private Timer handleMessageNs;
+ private Timer handleTimerNs;
/**
- * Register the next operator in the chain that this operator should propagate its output to.
- * @param nextOperator the next operator in the chain.
+ * Initialize this {@link OperatorImpl} and its user-defined functions.
+ *
+ * @param config the {@link Config} for the task
+ * @param context the {@link TaskContext} for the task
+ */
+ public final void init(Config config, TaskContext context) {
+ String opName = getOperatorSpec().getOpName();
+
+ if (initialized) {
+ throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName));
+ }
+
+ this.highResClock = createHighResClock(config);
+ registeredOperators = new HashSet<>();
+ MetricsRegistry metricsRegistry = context.getMetricsRegistry();
+ this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages");
+ this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns");
+ this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns");
+
+ handleInit(config, context);
+
+ initialized = true;
+ }
+
+ /**
+ * Initialize this {@link OperatorImpl} and its user-defined functions.
+ *
+ * @param config the {@link Config} for the task
+ * @param context the {@link TaskContext} for the task
+ */
+ protected abstract void handleInit(Config config, TaskContext context);
+
+ /**
+ * Register an operator that this operator should propagate its results to.
+ *
+ * @param nextOperator the next operator to propagate results to
*/
void registerNextOperator(OperatorImpl<RM, ?> nextOperator) {
- nextOperators.add(nextOperator);
+ if (!initialized) {
+ throw new IllegalStateException(
+ String.format("Attempted to register next operator before initializing operator %s.",
+ getOperatorSpec().getOpName()));
+ }
+ this.registeredOperators.add(nextOperator);
}
/**
- * Perform the transformation required for this operator and call the downstream operators.
+ * Handle the incoming {@code message} for this {@link OperatorImpl} and propagate results to registered operators.
+ * <p>
+ * Delegates to {@link #handleMessage(Object, MessageCollector, TaskCoordinator)} for handling the message.
*
- * Must call {@link #propagateResult} to propagate the output to registered downstream operators correctly.
+ * @param message the input message
+ * @param collector the {@link MessageCollector} for this message
+ * @param coordinator the {@link TaskCoordinator} for this message
+ */
+ public final void onMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ this.numMessage.inc();
+ long startNs = this.highResClock.nanoTime();
+ Collection<RM> results = handleMessage(message, collector, coordinator);
+ long endNs = this.highResClock.nanoTime();
+ this.handleMessageNs.update(endNs - startNs);
+
+ results.forEach(rm ->
+ this.registeredOperators.forEach(op ->
+ op.onMessage(rm, collector, coordinator)));
+ }
+
+ /**
+ * Handle the incoming {@code message} and return the results to be propagated to registered operators.
*
* @param message the input message
* @param collector the {@link MessageCollector} in the context
* @param coordinator the {@link TaskCoordinator} in the context
+ * @return results of the transformation
*/
- public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator);
+ protected abstract Collection<RM> handleMessage(M message, MessageCollector collector,
+ TaskCoordinator coordinator);
/**
- * Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)}
+ * Handle timer ticks for this {@link OperatorImpl} and propagate the results and timer tick to registered operators.
+ * <p>
+ * Delegates to {@link #handleTimer(MessageCollector, TaskCoordinator)} for handling the timer tick.
*
* @param collector the {@link MessageCollector} in the context
* @param coordinator the {@link TaskCoordinator} in the context
*/
- public final void onTick(MessageCollector collector, TaskCoordinator coordinator) {
- onTimer(collector, coordinator);
- nextOperators.forEach(sub -> sub.onTick(collector, coordinator));
+ public final void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ long startNs = this.highResClock.nanoTime();
+ Collection<RM> results = handleTimer(collector, coordinator);
+ long endNs = this.highResClock.nanoTime();
+ this.handleTimerNs.update(endNs - startNs);
+
+ results.forEach(rm ->
+ this.registeredOperators.forEach(op ->
+ op.onMessage(rm, collector, coordinator)));
+ this.registeredOperators.forEach(op ->
+ op.onTimer(collector, coordinator));
}
/**
- * Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output
- * to registered downstream operators.
+ * Handle the the timer tick for this operator and return the results to be propagated to registered operators.
+ * <p>
+ * Defaults to a no-op implementation.
*
* @param collector the {@link MessageCollector} in the context
* @param coordinator the {@link TaskCoordinator} in the context
+ * @return results of the timed operation
*/
- public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ protected Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ return Collections.emptyList();
}
/**
- * Helper method to propagate the output of this operator to all registered downstream operators.
- *
- * This method <b>must</b> be called from {@link #onNext} and {@link #onTimer}
- * to propagate the operator output correctly.
+ * Get the {@link OperatorSpec} for this {@link OperatorImpl}.
*
- * @param outputMessage output message
- * @param collector the {@link MessageCollector} in the context
- * @param coordinator the {@link TaskCoordinator} in the context
+ * @return the {@link OperatorSpec} for this {@link OperatorImpl}
*/
- void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
- nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
+ protected abstract OperatorSpec<RM> getOperatorSpec();
+
+ private HighResolutionClock createHighResClock(Config config) {
+ if (new MetricsConfig(config).getMetricsTimerEnabled()) {
+ return System::nanoTime;
+ } else {
+ return () -> 0;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 8e492dc..d8ea592 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -104,20 +104,22 @@ public class OperatorImplGraph {
* creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
*
* @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
- * @param <M> the type of messagess in the {@code source} {@link MessageStreamImpl}
* @param config the {@link Config} required to instantiate operators
* @param context the {@link TaskContext} required to instantiate operators
+ * @param <M> the type of messages in the {@code source} {@link MessageStreamImpl}
* @return root node for the {@link OperatorImpl} DAG
*/
- private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config, TaskContext context) {
+ private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source,
+ Config config, TaskContext context) {
// since the source message stream might have multiple operator specs registered on it,
// create a new root node as a single point of entry for the DAG.
RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
+ rootOperator.init(config, context);
// create the pipeline/topology starting from the source
source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
- // pass in the source and context s.t. stateful stream operators can initialize their stores
+ // pass in the context so that operator implementations can initialize their functions
OperatorImpl<M, ?> operatorImpl =
- createAndRegisterOperatorImpl(registeredOperator, source, config, context);
+ createAndRegisterOperatorImpl(registeredOperator, config, context);
rootOperator.registerNextOperator(operatorImpl);
});
return rootOperator;
@@ -127,27 +129,26 @@ public class OperatorImplGraph {
* Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
* {@link OperatorImpl}s.
*
- * @param operatorSpec the operatorSpec registered with the {@code source}
- * @param source the source {@link MessageStreamImpl}
- * @param <M> type of input message
+ * @param operatorSpec the operatorSpec to create the {@link OperatorImpl} for
* @param config the {@link Config} required to instantiate operators
* @param context the {@link TaskContext} required to instantiate operators
+ * @param <M> type of input message
* @return the operator implementation for the operatorSpec
*/
private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
- MessageStreamImpl<M> source, Config config, TaskContext context) {
+ Config config, TaskContext context) {
if (!operatorImpls.containsKey(operatorSpec)) {
- OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
+ OperatorImpl<M, ?> operatorImpl = createOperatorImpl(operatorSpec, config, context);
if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) {
// this is the first time we've added the operatorImpl corresponding to the operatorSpec,
// so traverse and initialize and register the rest of the DAG.
// initialize the corresponding operator function
- operatorSpec.init(config, context);
+ operatorImpl.init(config, context);
MessageStreamImpl nextStream = operatorSpec.getNextStream();
if (nextStream != null) {
Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
registeredSpecs.forEach(registeredSpec -> {
- OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
+ OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, config, context);
operatorImpl.registerNextOperator(subImpl);
});
}
@@ -163,24 +164,21 @@ public class OperatorImplGraph {
/**
* Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
*
- * @param source the source {@link MessageStreamImpl}
- * @param <M> type of input message
* @param operatorSpec the immutable {@link OperatorSpec} definition.
* @param config the {@link Config} required to instantiate operators
* @param context the {@link TaskContext} required to instantiate operators
+ * @param <M> type of input message
* @return the {@link OperatorImpl} implementation instance
*/
- private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source,
- OperatorSpec operatorSpec, Config config, TaskContext context) {
+ private <M> OperatorImpl<M, ?> createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext context) {
if (operatorSpec instanceof StreamOperatorSpec) {
- StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
- return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
+ return new StreamOperatorImpl<>((StreamOperatorSpec<M, ?>) operatorSpec, config, context);
} else if (operatorSpec instanceof SinkOperatorSpec) {
return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
} else if (operatorSpec instanceof WindowOperatorSpec) {
return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
} else if (operatorSpec instanceof PartialJoinOperatorSpec) {
- return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context, clock);
+ return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, config, context, clock);
}
throw new IllegalArgumentException(
String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index e4cb9c2..c7bdc22 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -18,12 +18,11 @@
*/
package org.apache.samza.operators.impl;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.metrics.Counter;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
@@ -35,6 +34,11 @@ import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
/**
* Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream
* with buffered messages of type {@code JM} in the other stream.
@@ -47,35 +51,45 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
private static final Logger LOGGER = LoggerFactory.getLogger(PartialJoinOperatorImpl.class);
+ private final PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec;
private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
private final long ttlMs;
- private final int opId;
private final Clock clock;
- PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOperatorSpec, MessageStreamImpl<M> source,
+ private Counter keysRemoved;
+
+ PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec,
Config config, TaskContext context, Clock clock) {
- this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn();
- this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn();
- this.ttlMs = partialJoinOperatorSpec.getTtlMs();
- this.opId = partialJoinOperatorSpec.getOpId();
+ this.partialJoinOpSpec = partialJoinOpSpec;
+ this.thisPartialJoinFn = partialJoinOpSpec.getThisPartialJoinFn();
+ this.otherPartialJoinFn = partialJoinOpSpec.getOtherPartialJoinFn();
+ this.ttlMs = partialJoinOpSpec.getTtlMs();
this.clock = clock;
}
@Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ protected void handleInit(Config config, TaskContext context) {
+ keysRemoved = context.getMetricsRegistry()
+ .newCounter(OperatorImpl.class.getName(), this.partialJoinOpSpec.getOpName() + "-keys-removed");
+ this.thisPartialJoinFn.init(config, context);
+ }
+
+ @Override
+ public Collection<RM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
K key = thisPartialJoinFn.getKey(message);
thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis()));
PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key);
long now = clock.currentTimeMillis();
if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) {
RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage());
- this.propagateResult(joinResult, collector, coordinator);
+ return Collections.singletonList(joinResult);
}
+ return Collections.emptyList();
}
@Override
- public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ public Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
long now = clock.currentTimeMillis();
KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState();
@@ -87,14 +101,18 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
if (entry.getValue().getReceivedTimeMs() < now - ttlMs) {
keysToRemove.add(entry.getKey());
} else {
- break;
+ break; // InternalInMemoryStore uses a LinkedHashMap and will return entries in insertion order
}
}
iterator.close();
thisState.deleteAll(keysToRemove);
-
- LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, clock.currentTimeMillis() - now);
+ keysRemoved.inc(keysToRemove.size());
+ return Collections.emptyList();
}
+ @Override
+ protected OperatorSpec<RM> getOperatorSpec() {
+ return partialJoinOpSpec;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
index eb9b5e2..0f18e97 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -18,9 +18,16 @@
*/
package org.apache.samza.operators.impl;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import java.util.Collection;
+import java.util.Collections;
+
/**
* A no-op operator implementation that forwards incoming messages to all of its subscribers.
@@ -29,7 +36,32 @@ import org.apache.samza.task.TaskCoordinator;
public final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
@Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- this.propagateResult(message, collector, coordinator);
+ protected void handleInit(Config config, TaskContext context) {
+ }
+
+ @Override
+ public Collection<M> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ return Collections.singletonList(message);
+ }
+
+ // TODO: SAMZA-1221 - Change to InputOperatorSpec that also builds the message
+ @Override
+ protected OperatorSpec<M> getOperatorSpec() {
+ return new OperatorSpec<M>() {
+ @Override
+ public MessageStreamImpl<M> getNextStream() {
+ return null;
+ }
+
+ @Override
+ public OpCode getOpCode() {
+ return OpCode.INPUT;
+ }
+
+ @Override
+ public int getOpId() {
+ return -1;
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
deleted file mode 100644
index 2bb362c..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Default implementation class of a {@link WindowOperatorSpec} for a session window.
- *
- * @param <M> the type of input message
- * @param <RK> the type of window key
- * @param <WV> the type of window state
- */
-class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> {
-
- private final WindowOperatorSpec<M, RK, WV> windowSpec;
-
- SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
- this.windowSpec = windowSpec;
- }
-
- @Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- }
-
- public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
- // This is to periodically check the timeout triggers to get the list of window states to be updated
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index f92fbfb..e82737f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -20,27 +20,44 @@ package org.apache.samza.operators.impl;
import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import java.util.Collection;
+import java.util.Collections;
+
/**
* Implementation for {@link SinkOperatorSpec}
*/
class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
+ private final SinkOperatorSpec<M> sinkOpSpec;
private final SinkFunction<M> sinkFn;
- SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) {
- this.sinkFn = sinkOp.getSinkFn();
+ SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec, Config config, TaskContext context) {
+ this.sinkOpSpec = sinkOpSpec;
+ this.sinkFn = sinkOpSpec.getSinkFn();
+ }
+
+ @Override
+ protected void handleInit(Config config, TaskContext context) {
+ this.sinkFn.init(config, context);
}
@Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ public Collection<M> handleMessage(M message, MessageCollector collector,
+ TaskCoordinator coordinator) {
this.sinkFn.apply(message, collector, coordinator);
// there should be no further chained operators since this is a terminal operator.
- // hence we don't call #propogateResult() here.
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected OperatorSpec<M> getOperatorSpec() {
+ return sinkOpSpec;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index 644de20..bd4dce1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -19,13 +19,15 @@
package org.apache.samza.operators.impl;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import java.util.Collection;
+
/**
* A StreamOperator that accepts a 1:n transform function and applies it to each incoming message.
@@ -35,15 +37,28 @@ import org.apache.samza.task.TaskCoordinator;
*/
class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
+ private final StreamOperatorSpec<M, RM> streamOpSpec;
private final FlatMapFunction<M, RM> transformFn;
- StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
- this.transformFn = streamOperatorSpec.getTransformFn();
+ StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec,
+ Config config, TaskContext context) {
+ this.streamOpSpec = streamOpSpec;
+ this.transformFn = streamOpSpec.getTransformFn();
+ }
+
+ @Override
+ protected void handleInit(Config config, TaskContext context) {
+ transformFn.init(config, context);
+ }
+
+ @Override
+ public Collection<RM> handleMessage(M message, MessageCollector collector,
+ TaskCoordinator coordinator) {
+ return this.transformFn.apply(message);
}
@Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- // call the transform function and then for each output call propagateResult()
- this.transformFn.apply(message).forEach(r -> this.propagateResult(r, collector, coordinator));
+ protected OperatorSpec<RM> getOperatorSpec() {
+ return streamOpSpec;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index cd3b1bc..b99f719 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -20,14 +20,16 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.triggers.FiringType;
import org.apache.samza.operators.triggers.RepeatingTriggerImpl;
import org.apache.samza.operators.triggers.TimeTrigger;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.TriggerImpl;
import org.apache.samza.operators.triggers.TriggerImpls;
-import org.apache.samza.operators.triggers.FiringType;
import org.apache.samza.operators.util.InternalInMemoryStore;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowKey;
@@ -36,6 +38,7 @@ import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.internal.WindowType;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
@@ -46,6 +49,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Function;
/**
@@ -61,9 +65,8 @@ import java.util.function.Function;
* for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}.
* The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet
* or not. Then, the {@link TriggerImplHandler} invokes onMessage on underlying its {@link TriggerImpl} instance. A
- * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. The
- * {@link WindowOperatorImpl} checks if the trigger fired, and propagates the result of the firing to its downstream
- * operators.
+ * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted.
+ * The {@link WindowOperatorImpl} checks if the trigger fired and returns the result of the firing.
*
* @param <M> the type of the incoming message
* @param <WK> the type of the key in this {@link org.apache.samza.operators.MessageStream}
@@ -74,56 +77,84 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class);
+ private final WindowOperatorSpec<M, WK, WV> windowOpSpec;
+ private final Clock clock;
private final WindowInternal<M, WK, WV> window;
private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new InternalInMemoryStore<>();
- TriggerScheduler<WK> triggerScheduler ;
+ private TriggerScheduler<WK> triggerScheduler;
// The trigger state corresponding to each {@link TriggerKey}.
private final Map<TriggerKey<WK>, TriggerImplHandler> triggers = new HashMap<>();
- private final Clock clock;
- public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> spec, Clock clock) {
+ public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> windowOpSpec, Clock clock) {
+ this.windowOpSpec = windowOpSpec;
this.clock = clock;
- this.window = spec.getWindow();
+ this.window = windowOpSpec.getWindow();
this.triggerScheduler= new TriggerScheduler(clock);
}
@Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ protected void handleInit(Config config, TaskContext context) {
+ WindowInternal<M, WK, WV> window = windowOpSpec.getWindow();
+ if (window.getFoldLeftFunction() != null) {
+ window.getFoldLeftFunction().init(config, context);
+ }
+ }
+
+ @Override
+ public Collection<WindowPane<WK, WV>> handleMessage(
+ M message, MessageCollector collector, TaskCoordinator coordinator) {
LOG.trace("Processing message envelope: {}", message);
+ List<WindowPane<WK, WV>> results = new ArrayList<>();
WindowKey<WK> storeKey = getStoreKey(message);
WindowState<WV> existingState = store.get(storeKey);
WindowState<WV> newState = applyFoldFunction(existingState, message);
- LOG.trace("New window value: {}, earliest timestamp: {}", newState.getWindowValue(), newState.getEarliestTimestamp());
+ LOG.trace("New window value: {}, earliest timestamp: {}",
+ newState.getWindowValue(), newState.getEarliestTimestamp());
store.put(storeKey, newState);
if (window.getEarlyTrigger() != null) {
TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey);
- getOrCreateTriggerImplWrapper(triggerKey, window.getEarlyTrigger())
- .onMessage(triggerKey, message, collector, coordinator);
+ TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getEarlyTrigger());
+ Optional<WindowPane<WK, WV>> maybeTriggeredPane =
+ triggerImplHandler.onMessage(triggerKey, message, collector, coordinator);
+ maybeTriggeredPane.ifPresent(results::add);
}
if (window.getDefaultTrigger() != null) {
TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey);
- getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger())
- .onMessage(triggerKey, message, collector, coordinator);
+ TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getDefaultTrigger());
+ Optional<WindowPane<WK, WV>> maybeTriggeredPane =
+ triggerImplHandler.onMessage(triggerKey, message, collector, coordinator);
+ maybeTriggeredPane.ifPresent(results::add);
}
+
+ return results;
}
@Override
- public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ public Collection<WindowPane<WK, WV>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ List<WindowPane<WK, WV>> results = new ArrayList<>();
+
List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks();
for (TriggerKey<WK> key : keys) {
TriggerImplHandler triggerImplHandler = triggers.get(key);
if (triggerImplHandler != null) {
- triggerImplHandler.onTimer(key, collector, coordinator);
+ Optional<WindowPane<WK, WV>> maybeTriggeredPane = triggerImplHandler.onTimer(key, collector, coordinator);
+ maybeTriggeredPane.ifPresent(results::add);
}
}
+ return results;
+ }
+
+ @Override
+ protected OperatorSpec<WindowPane<WK, WV>> getOperatorSpec() {
+ return windowOpSpec;
}
/**
@@ -168,7 +199,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
return newState;
}
- private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey<WK> triggerKey, Trigger<M> trigger) {
+ private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<WK> triggerKey, Trigger<M> trigger) {
TriggerImplHandler wrapper = triggers.get(triggerKey);
if (wrapper != null) {
LOG.trace("Returning existing trigger wrapper for {}", triggerKey);
@@ -185,9 +216,10 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
}
/**
- * Handles trigger firings, and propagates results to downstream operators.
+ * Handles trigger firings and returns the optional result.
*/
- private void onTriggerFired(TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) {
+ private Optional<WindowPane<WK, WV>> onTriggerFired(
+ TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) {
LOG.trace("Trigger key {} fired." , triggerKey);
TriggerImplHandler wrapper = triggers.get(triggerKey);
@@ -196,11 +228,10 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
if (state == null) {
LOG.trace("No state found for triggerKey: {}", triggerKey);
- return;
+ return Optional.empty();
}
WindowPane<WK, WV> paneOutput = computePaneOutput(triggerKey, state);
- super.propagateResult(paneOutput, collector, coordinator);
// Handle accumulation modes.
if (window.getAccumulationMode() == AccumulationMode.DISCARDING) {
@@ -228,6 +259,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
if (triggerKey.getType() == FiringType.EARLY && !wrapper.isRepeating()) {
cancelTrigger(triggerKey, false);
}
+
+ return Optional.of(paneOutput);
}
/**
@@ -248,7 +281,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
windowVal = (WV) new ArrayList<>((Collection<WV>) windowVal);
}
- WindowPane<WK, WV> paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
+ WindowPane<WK, WV> paneOutput =
+ new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
LOG.trace("Emitting pane output for trigger key {}", triggerKey);
return paneOutput;
}
@@ -279,7 +313,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
this.impl = impl;
}
- public void onMessage(TriggerKey<WK> triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) {
+ public Optional<WindowPane<WK, WV>> onMessage(TriggerKey<WK> triggerKey, M message,
+ MessageCollector collector, TaskCoordinator coordinator) {
if (!isCancelled) {
LOG.trace("Forwarding callbacks for {}", message);
impl.onMessage(message, triggerScheduler);
@@ -289,12 +324,14 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
if (impl instanceof RepeatingTriggerImpl) {
((RepeatingTriggerImpl<M, WK>) impl).clear();
}
- onTriggerFired(triggerKey, collector, coordinator);
+ return onTriggerFired(triggerKey, collector, coordinator);
}
}
+ return Optional.empty();
}
- public void onTimer(TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) {
+ public Optional<WindowPane<WK, WV>> onTimer(
+ TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) {
if (impl.shouldFire() && !isCancelled) {
LOG.trace("Triggering timer triggers");
@@ -302,8 +339,9 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
if (impl instanceof RepeatingTriggerImpl) {
((RepeatingTriggerImpl<M, WK>) impl).clear();
}
- onTriggerFired(key, collector, coordinator);
+ return onTriggerFired(key, collector, coordinator);
}
+ return Optional.empty();
}
public void cancel() {
@@ -315,5 +353,4 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
return this.impl instanceof RepeatingTriggerImpl;
}
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 18090e2..cc3c4ab 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,9 +19,7 @@
package org.apache.samza.operators.spec;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.task.TaskContext;
/**
@@ -34,6 +32,7 @@ import org.apache.samza.task.TaskContext;
public interface OperatorSpec<OM> {
enum OpCode {
+ INPUT,
MAP,
FLAT_MAP,
FILTER,
@@ -64,10 +63,11 @@ public interface OperatorSpec<OM> {
int getOpId();
/**
- * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
- *
- * @param config the {@link Config} object for this task
- * @param context the {@link TaskContext} object for this task
+ * Get the name for this operator based on its opCode and opId.
+ * @return the name for this operator
*/
- default void init(Config config, TaskContext context) { }
+ default String getOpName() {
+ return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index b1dc529..e85626f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -18,10 +18,8 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.task.TaskContext;
/**
@@ -88,10 +86,4 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
public int getOpId() {
return this.opId;
}
-
- @Override
- public void init(Config config, TaskContext context) {
- this.thisPartialJoinFn.init(config, context);
- }
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 7de85f3..0d135d3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -18,14 +18,12 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.stream.OutputStreamInternal;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
@@ -101,11 +99,6 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
return this.opId;
}
- @Override
- public void init(Config config, TaskContext context) {
- this.sinkFn.init(config, context);
- }
-
/**
* Creates a {@link SinkFunction} to send messages to the provided {@code output}.
* @param outputStream the {@link OutputStreamInternal} to send messages to
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index f9bbe2d..204e566 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -18,10 +18,8 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.task.TaskContext;
/**
@@ -71,9 +69,4 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
public int getOpId() {
return this.opId;
}
-
- @Override
- public void init(Config config, TaskContext context) {
- this.transformFn.init(config, context);
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 9515e38..73b17b5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,11 +19,9 @@
package org.apache.samza.operators.spec;
-import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.task.TaskContext;
/**
@@ -53,13 +51,6 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
}
@Override
- public void init(Config config, TaskContext context) {
- if (window.getFoldLeftFunction() != null) {
- window.getFoldLeftFunction().init(config, context);
- }
- }
-
- @Override
public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
return this.nextStream;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index be52565..4720298 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -118,17 +118,19 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
- // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream);
if (rootOperatorImpl != null) {
- rootOperatorImpl.onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator);
+ // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde
+ // before applying the msgBuilder.
+ Object message = inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage());
+ rootOperatorImpl.onMessage(message, collector, coordinator);
}
}
@Override
public final void window(MessageCollector collector, TaskCoordinator coordinator) {
operatorImplGraph.getAllRootOperators()
- .forEach(rootOperator -> rootOperator.onTick(collector, coordinator));
+ .forEach(rootOperator -> rootOperator.onTimer(collector, coordinator));
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 7a6f959..23b67aa 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -19,13 +19,10 @@
package org.apache.samza.operators;
import com.google.common.collect.ImmutableSet;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -42,6 +39,11 @@ import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
import org.junit.Test;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -237,6 +239,8 @@ public class TestJoinOperator {
when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
+ when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
Config config = mock(Config.class);
StreamApplication sgb = new TestStreamApplication();
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index 6603137..597244e 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -26,9 +26,8 @@ import junit.framework.Assert;
import org.apache.samza.Partition;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.triggers.FiringType;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.testUtils.TestClock;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.AccumulationMode;
@@ -36,11 +35,13 @@ import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamOperatorTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.TestClock;
import org.junit.Before;
import org.junit.Test;
@@ -71,6 +72,7 @@ public class TestWindowOperator {
runner = mock(ApplicationRunner.class);
when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+ when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka"));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index f978c3c..bd18f0b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,61 +18,205 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.data.TestOutputMessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
-import org.hamcrest.core.IsEqual;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.argThat;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestOperatorImpl {
- TestMessageEnvelope curInputMsg;
- MessageCollector curCollector;
- TaskCoordinator curCoordinator;
+ @Test(expected = IllegalStateException.class)
+ public void testMultipleInitShouldThrow() {
+ OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ opImpl.init(mock(Config.class), mockTaskContext);
+ opImpl.init(mock(Config.class), mockTaskContext);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRegisterNextOperatorBeforeInitShouldThrow() {
+ OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
+ opImpl.registerNextOperator(mock(OperatorImpl.class));
+ }
+
+ @Test
+ public void testOnMessagePropagatesResults() {
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
+ Object mockTestOpImplOutput = mock(Object.class);
+ OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
+ opImpl.init(mock(Config.class), mockTaskContext);
+
+ // register a couple of operators
+ OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
+ when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
+ when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
+ mockNextOpImpl1.init(mock(Config.class), mockTaskContext);
+ opImpl.registerNextOperator(mockNextOpImpl1);
+
+ OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
+ when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
+ when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
+ mockNextOpImpl2.init(mock(Config.class), mockTaskContext);
+ opImpl.registerNextOperator(mockNextOpImpl2);
+
+ // send a message to this operator
+ MessageCollector mockCollector = mock(MessageCollector.class);
+ TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+ opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator);
+
+ // verify that it propagates its handleMessage results to next operators
+ verify(mockNextOpImpl1, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator);
+ verify(mockNextOpImpl2, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator);
+ }
+
+ @Test
+ public void testOnMessageUpdatesMetrics() {
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
+ Counter mockCounter = mock(Counter.class);
+ Timer mockTimer = mock(Timer.class);
+ when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter);
+ when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer);
+
+ Object mockTestOpImplOutput = mock(Object.class);
+ OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
+ opImpl.init(mock(Config.class), mockTaskContext);
+
+ // send a message to this operator
+ MessageCollector mockCollector = mock(MessageCollector.class);
+ TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+ opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator);
+
+ // verify that it updates message count and timer metrics
+ verify(mockCounter, times(1)).inc();
+ verify(mockTimer, times(1)).update(anyLong());
+ }
+
+ @Test
+ public void testOnTimerPropagatesResultsAndTimer() {
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
+ Object mockTestOpImplOutput = mock(Object.class);
+ OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
+ opImpl.init(mock(Config.class), mockTaskContext);
+
+ // register a couple of operators
+ OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
+ when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
+ when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
+ mockNextOpImpl1.init(mock(Config.class), mockTaskContext);
+ opImpl.registerNextOperator(mockNextOpImpl1);
+
+ OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
+ when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
+ when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
+ mockNextOpImpl2.init(mock(Config.class), mockTaskContext);
+ opImpl.registerNextOperator(mockNextOpImpl2);
+
+ // send a timer tick to this operator
+ MessageCollector mockCollector = mock(MessageCollector.class);
+ TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+ opImpl.onTimer(mockCollector, mockCoordinator);
+
+ // verify that it propagates its handleTimer results to next operators
+ verify(mockNextOpImpl1, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator);
+ verify(mockNextOpImpl2, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator);
+
+ // verify that it propagates the timer tick to next operators
+ verify(mockNextOpImpl1, times(1)).handleTimer(mockCollector, mockCoordinator);
+ verify(mockNextOpImpl2, times(1)).handleTimer(mockCollector, mockCoordinator);
+ }
@Test
- public void testSubscribers() {
- this.curInputMsg = null;
- this.curCollector = null;
- this.curCoordinator = null;
- OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() {
- @Override
- public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
- TestOperatorImpl.this.curInputMsg = message;
- TestOperatorImpl.this.curCollector = collector;
- TestOperatorImpl.this.curCoordinator = coordinator;
- }
- @Override
- public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
-
- }
-
- };
- // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
- OperatorImpl mockSub = mock(OperatorImpl.class);
- opImpl.registerNextOperator(mockSub);
- TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class);
+ public void testOnTimerUpdatesMetrics() {
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
+ Counter mockMessageCounter = mock(Counter.class);
+ Timer mockTimer = mock(Timer.class);
+ when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter);
+ when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer);
+
+ Object mockTestOpImplOutput = mock(Object.class);
+ OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
+ opImpl.init(mock(Config.class), mockTaskContext);
+
+ // send a message to this operator
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- opImpl.propagateResult(xOutput, mockCollector, mockCoordinator);
- verify(mockSub, times(1)).onNext(
- argThat(new IsEqual<>(xOutput)),
- argThat(new IsEqual<>(mockCollector)),
- argThat(new IsEqual<>(mockCoordinator))
- );
- // verify onNext() is invoked correctly
- TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
- opImpl.onNext(mockInput, mockCollector, mockCoordinator);
- assertEquals(mockInput, this.curInputMsg);
- assertEquals(mockCollector, this.curCollector);
- assertEquals(mockCoordinator, this.curCoordinator);
+ opImpl.onTimer(mockCollector, mockCoordinator);
+
+ // verify that it updates metrics
+ verify(mockMessageCounter, times(0)).inc();
+ verify(mockTimer, times(1)).update(anyLong());
+ }
+
+ private static class TestOpImpl extends OperatorImpl<Object, Object> {
+ private final Object mockOutput;
+
+ TestOpImpl(Object mockOutput) {
+ this.mockOutput = mockOutput;
+ }
+
+ @Override
+ protected void handleInit(Config config, TaskContext context) {}
+
+ @Override
+ public Collection<Object> handleMessage(Object message,
+ MessageCollector collector, TaskCoordinator coordinator) {
+ return Collections.singletonList(mockOutput);
+ }
+
+ @Override
+ public Collection<Object> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ return Collections.singletonList(mockOutput);
+ }
+
+ @Override
+ protected OperatorSpec<Object> getOperatorSpec() {
+ return new TestOpSpec();
+ }
+ }
+
+ private static class TestOpSpec implements OperatorSpec<Object> {
+ @Override
+ public MessageStreamImpl<Object> getNextStream() {
+ return null;
+ }
+
+ @Override
+ public OpCode getOpCode() {
+ return OpCode.INPUT;
+ }
+
+ @Override
+ public int getOpId() {
+ return -1;
+ }
}
}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index 267cdfc..a75fadb 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.impl;
import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.TestMessageStreamImplUtil;
@@ -26,11 +27,10 @@ import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.windows.Windows;
@@ -62,14 +62,15 @@ public class TestOperatorImpls {
@Before
public void prep() throws NoSuchFieldException, NoSuchMethodException {
- nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
+ nextOperatorsField = OperatorImpl.class.getDeclaredField("registeredOperators");
nextOperatorsField.setAccessible(true);
- createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+ createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl",
OperatorSpec.class, Config.class, TaskContext.class);
createOpMethod.setAccessible(true);
- createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+ createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class,
+ Config.class, TaskContext.class);
createOpsMethod.setAccessible(true);
}
@@ -79,13 +80,12 @@ public class TestOperatorImpls {
WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
when(mockWnd.getWindow()).thenReturn(windowInternal);
- MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
Config mockConfig = mock(Config.class);
TaskContext mockContext = mock(TaskContext.class);
OperatorImplGraph opGraph = new OperatorImplGraph();
OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
- createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
+ createOpMethod.invoke(opGraph, mockWnd, mockConfig, mockContext);
assertTrue(opImpl instanceof WindowOperatorImpl);
Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
wndInternalField.setAccessible(true);
@@ -96,7 +96,7 @@ public class TestOperatorImpls {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockSimpleOp, mockConfig, mockContext);
assertTrue(opImpl instanceof StreamOperatorImpl);
Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
txfmFnField.setAccessible(true);
@@ -106,7 +106,7 @@ public class TestOperatorImpls {
SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, sinkOp, mockConfig, mockContext);
assertTrue(opImpl instanceof SinkOperatorImpl);
Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
sinkFnField.setAccessible(true);
@@ -114,8 +114,7 @@ public class TestOperatorImpls {
// get join operator
PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
- PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, joinOp, mockConfig, mockContext);
assertTrue(opImpl instanceof PartialJoinOperatorImpl);
}
@@ -124,6 +123,7 @@ public class TestOperatorImpls {
// test creation of empty chain
MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
Config mockConfig = mock(Config.class);
OperatorImplGraph opGraph = new OperatorImplGraph();
RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
@@ -134,8 +134,9 @@ public class TestOperatorImpls {
public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
// test creation of linear chain
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+ MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
Config mockConfig = mock(Config.class);
testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
OperatorImplGraph opGraph = new OperatorImplGraph();
@@ -154,8 +155,9 @@ public class TestOperatorImpls {
public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
// test creation of broadcast chain
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+ MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
Config mockConfig = mock(Config.class);
testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
@@ -187,6 +189,7 @@ public class TestOperatorImpls {
MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
Config mockConfig = mock(Config.class);
input1
.join(input2,
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index abd7740..1c01e57 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -27,7 +27,10 @@ import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestSinkOperatorImpl {
@@ -44,7 +47,7 @@ public class TestSinkOperatorImpl {
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator);
+ sinkImpl.handleMessage(mockMsg, mockCollector, mockCoordinator);
verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 9dd161a..36d7b92 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -18,10 +18,7 @@
*/
package org.apache.samza.operators.impl;
-import java.util.ArrayList;
-import java.util.Collection;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
@@ -31,7 +28,15 @@ import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import static org.mockito.Mockito.*;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestStreamOperatorImpl {
@@ -42,10 +47,10 @@ public class TestStreamOperatorImpl {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
- MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
Config mockConfig = mock(Config.class);
TaskContext mockContext = mock(TaskContext.class);
- StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
+ StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
+ spy(new StreamOperatorImpl<>(mockOp, mockConfig, mockContext));
TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
@@ -54,8 +59,9 @@ public class TestStreamOperatorImpl {
when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- opImpl.onNext(inMsg, mockCollector, mockCoordinator);
+ Collection<TestOutputMessageEnvelope> results = opImpl
+ .handleMessage(inMsg, mockCollector, mockCoordinator);
verify(txfmFn, times(1)).apply(inMsg);
- verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator);
+ assertEquals(results, mockOutputs);
}
}