You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/10/22 04:41:24 UTC
samza git commit: SAMZA-915: implementation of operator classes
Repository: samza
Updated Branches:
refs/heads/samza-sql fbdd76daa -> adcd26678
SAMZA-915: implementation of operator classes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adcd2667
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adcd2667
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adcd2667
Branch: refs/heads/samza-sql
Commit: adcd26678d3fb6ea632de7117bc40a9c4f343d59
Parents: fbdd76d
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Fri Oct 21 21:27:28 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Oct 21 21:31:47 2016 -0700
----------------------------------------------------------------------
.../samza/operators/api/MessageStream.java | 50 ++++--
.../samza/operators/api/internal/Operators.java | 2 +-
.../samza/operators/impl/ChainedOperators.java | 52 +++++-
.../samza/operators/impl/OperatorFactory.java | 86 +++++++++
.../samza/operators/impl/OperatorImpl.java | 15 +-
.../operators/impl/join/PartialJoinOpImpl.java | 44 +++++
.../impl/window/SessionWindowImpl.java | 17 +-
.../samza/operators/api/TestMessageStream.java | 180 +++++++++++++++++++
.../samza/operators/api/TestOutputMessage.java | 47 +++++
.../operators/api/internal/TestOperators.java | 1 -
.../operators/impl/TestChainedOperators.java | 129 +++++++++++++
.../operators/impl/TestOperatorFactory.java | 95 ++++++++++
.../samza/operators/impl/TestOperatorImpl.java | 1 +
.../samza/operators/impl/TestOutputMessage.java | 47 -----
.../operators/impl/TestSimpleOperatorImpl.java | 1 +
.../operators/impl/TestSinkOperatorImpl.java | 1 +
.../impl/data/serializers/SqlAvroSerdeTest.java | 1 +
.../impl/window/TestSessionWindowImpl.java | 53 ++----
.../samza/task/BroadcastOperatorTask.java | 41 +++--
.../samza/task/InputJsonSystemMessage.java | 2 +-
20 files changed, 729 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
index a01cee9..b5e1028 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
@@ -19,16 +19,20 @@
package org.apache.samza.operators.api;
+import org.apache.samza.operators.api.Windows.Window;
import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.WindowOutput;
import org.apache.samza.operators.api.internal.Operators;
-import org.apache.samza.operators.api.Windows.Window;
+import org.apache.samza.operators.api.internal.Operators.Operator;
+import org.apache.samza.operators.api.internal.WindowOutput;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -41,6 +45,19 @@ import java.util.function.Function;
*/
public class MessageStream<M extends Message> {
+ private final Set<Operator> subscribers = new HashSet<>();
+
+ /**
+ * Helper method to get the set of subscribers to a specific {@link MessageStream}.
+ *
+ * NOTE: This should only be used by the implementation of {@link org.apache.samza.operators.impl.ChainedOperators}, not directly by programmers.
+ *
+ * @return An unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object
+ */
+ public Collection<Operator> getSubscribers() {
+ return Collections.unmodifiableSet(this.subscribers);
+ }
+
/**
* Public API methods start here
*/
@@ -65,12 +82,14 @@ public class MessageStream<M extends Message> {
* @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
*/
public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) {
- return Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{
+ Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{
OM r = mapper.apply(m);
if (r != null) {
this.add(r);
}
- }}).getOutputStream();
+ }});
+ this.subscribers.add(op);
+ return op.getOutputStream();
}
/**
@@ -81,7 +100,9 @@ public class MessageStream<M extends Message> {
* @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
*/
public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) {
- return Operators.getStreamOperator(flatMapper).getOutputStream();
+ Operator<OM> op = Operators.getStreamOperator(flatMapper);
+ this.subscribers.add(op);
+ return op.getOutputStream();
}
/**
@@ -91,11 +112,13 @@ public class MessageStream<M extends Message> {
* @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream}
*/
public MessageStream<M> filter(Function<M, Boolean> filter) {
- return Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{
+ Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{
if (filter.apply(t)) {
this.add(t);
}
- }}).getOutputStream();
+ }});
+ this.subscribers.add(op);
+ return op.getOutputStream();
}
/**
@@ -105,7 +128,7 @@ public class MessageStream<M extends Message> {
* @param sink the user-defined sink function to send the input {@link Message}s to the external output systems
*/
public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
- Operators.getSinkOperator(sink);
+ this.subscribers.add(Operators.getSinkOperator(sink));
}
/**
@@ -119,7 +142,9 @@ public class MessageStream<M extends Message> {
* @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream}
*/
public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Window<M, WK, WV, WM> window) {
- return Operators.getWindowOperator(Windows.getInternalWindowFn(window)).getOutputStream();
+ Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window));
+ this.subscribers.add(wndOp);
+ return wndOp.getOutputStream();
}
/**
@@ -141,8 +166,8 @@ public class MessageStream<M extends Message> {
// TODO: need to add default store functions for the two partial join functions
- Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream);
- Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream);
+ other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream));
+ this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream));
return outputStream;
}
@@ -156,7 +181,8 @@ public class MessageStream<M extends Message> {
MessageStream<M> outputStream = new MessageStream<>();
others.add(this);
- others.forEach(other -> Operators.getMergeOperator(outputStream));
+ others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream)));
return outputStream;
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
index f220285..e9bfe0b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
@@ -55,7 +55,7 @@ public class Operators {
* Private interface for stream operator functions. The interface class defines the output of the stream operator function.
*
*/
- private interface Operator<OM extends Message> {
+ public interface Operator<OM extends Message> {
MessageStream<OM> getOutputStream();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
index 49cfdeb..59de16b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
@@ -21,10 +21,16 @@ package org.apache.samza.operators.impl;
import org.apache.samza.operators.api.MessageStream;
import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.Operator;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
/**
* Implementation class for a chain of operators from the single input {@code source}
@@ -33,6 +39,8 @@ import org.apache.samza.task.TaskCoordinator;
*/
public class ChainedOperators<M extends Message> {
+ private final Set<OperatorImpl> subscribers = new HashSet<>();
+
/**
* Private constructor
*
@@ -41,7 +49,38 @@ public class ChainedOperators<M extends Message> {
*/
private ChainedOperators(MessageStream<M> source, TaskContext context) {
// create the pipeline/topology starting from source
- // pass in the context s.t. stateful stream operators can initialize their stores
+ source.getSubscribers().forEach(sub -> {
+ // pass in the context s.t. stateful stream operators can initialize their stores
+ OperatorImpl subImpl = this.createAndSubscribe(sub, source, context);
+ this.subscribers.add(subImpl);
+ });
+ }
+
+ /**
+ * Private function to recursively instantiate the implementation of operators and the chains
+ *
+ * @param operator the operator that subscribes to {@code source}
+ * @param source the source {@link MessageStream}
+ * @param context the context of the task
+ * @return the implementation object of the corresponding {@code operator}
+ */
+ private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator operator, MessageStream source,
+ TaskContext context) {
+ Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = OperatorFactory.getOperator(operator);
+ if (factoryEntry.getValue()) {
+ // The operator has already been instantiated and we do not need to traverse and create the subscribers any more.
+ return factoryEntry.getKey();
+ }
+ OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey();
+ MessageStream outStream = operator.getOutputStream();
+ Collection<Operator> subs = outStream.getSubscribers();
+ subs.forEach(sub -> {
+ OperatorImpl subImpl = this.createAndSubscribe(sub, operator.getOutputStream(), context);
+ opImpl.subscribe(subImpl);
+ });
+ // initialize the operator's state store
+ opImpl.init(source, context);
+ return opImpl;
}
/**
@@ -64,10 +103,17 @@ public class ChainedOperators<M extends Message> {
* @param coordinator the {@link TaskCoordinator} object within the process context
*/
public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- // TODO: add implementation of onNext() that actually triggers the process pipeline
+ this.subscribers.forEach(sub -> sub.onNext(message, collector, coordinator));
}
+ /**
+ * Method to handle timer events
+ *
+ * @param collector the {@link MessageCollector} object within the process context
+ * @param coordinator the {@link TaskCoordinator} object within the process context
+ */
public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
- // TODO: add implementation of onTimer() that actually calls the corresponding window operator's onTimer() methods
+ long nanoTime = System.nanoTime();
+ this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, coordinator));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
new file mode 100644
index 0000000..f16cbc6
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.commons.collections.keyvalue.AbstractMapEntry;
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.*;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
+import org.apache.samza.operators.impl.window.SessionWindowImpl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map.Entry;
+
+
+/**
+ * The factory class that instantiates all implementations of {@link OperatorImpl} classes.
+ */
+public class OperatorFactory {
+
+ /**
+ * the static operatorMap that includes all operator implementation instances
+ */
+ private static final Map<Operator, OperatorImpl<? extends Message, ? extends Message>> operatorMap = new ConcurrentHashMap<>();
+
+ /**
+ * The method to actually create the implementation instances of operators
+ *
+ * @param operator the immutable definition of {@link Operator}
+ * @param <M> type of input {@link Message}
+ * @param <RM> type of output {@link Message}
+ * @return the implementation object of {@link OperatorImpl}
+ */
+ private static <M extends Message, RM extends Message> OperatorImpl<M, ? extends Message> createOperator(Operator<RM> operator) {
+ if (operator instanceof StreamOperator) {
+ return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator);
+ } else if (operator instanceof SinkOperator) {
+ return new SinkOperatorImpl<>((SinkOperator<M>) operator);
+ } else if (operator instanceof WindowOperator) {
+ return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends WindowState, ? extends WindowOutput>) operator);
+ } else if (operator instanceof PartialJoinOperator) {
+ return new PartialJoinOpImpl<>((PartialJoinOperator) operator);
+ }
+ throw new IllegalArgumentException(
+ String.format("The type of operator is not supported. Operator class name: %s", operator.getClass().getName()));
+ }
+
+ /**
+ * The method to get the unique implementation instance of {@link Operator}
+ *
+ * @param operator the {@link Operator} to instantiate
+ * @param <M> type of input {@link Message}
+ * @param <RM> type of output {@link Message}
+ * @return An entry that pairs the unique implementation instance for the {@code operator} with a boolean value indicating whether
+ * that instance had already been created. True means the operator instance already existed before this call; false means it
+ * was newly created by this call.
+ */
+ public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, ? extends Message>, Boolean> getOperator(Operator<RM> operator) {
+ if (!operatorMap.containsKey(operator)) {
+ OperatorImpl<M, ? extends Message> operatorImpl = OperatorFactory.createOperator(operator);
+ if( operatorMap.putIfAbsent(operator, operatorImpl) == null ) {
+ return new AbstractMapEntry(operatorImpl, false) {};
+ }
+ }
+ return new AbstractMapEntry((OperatorImpl<M, ? extends Message>) operatorMap.get(operator), true) {};
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 81a7ede..3ca8bde 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.impl;
import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.MessageStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
@@ -61,6 +62,17 @@ public abstract class OperatorImpl<M extends Message, RM extends Message>
}
/**
+ * Default method for timer event
+ *
+ * @param nanoTime the system time, in nanoseconds, at which the timer event is triggered
+ * @param collector the {@link MessageCollector} in the context
+ * @param coordinator the {@link TaskCoordinator} in the context
+ */
+ public void onTimer(long nanoTime, MessageCollector collector, TaskCoordinator coordinator) {
+ this.subscribers.forEach(sub -> ((OperatorImpl)sub).onTimer(nanoTime, collector, coordinator));
+ }
+
+ /**
* Each sub-class will implement this method to actually perform the transformation and call the downstream subscribers.
*
* @param message the input {@link Message}
@@ -72,9 +84,10 @@ public abstract class OperatorImpl<M extends Message, RM extends Message>
/**
* Stateful operators will need to override this method to initialize the operators
*
+ * @param source the source that this {@link OperatorImpl} object subscribes to
* @param context the task context to initialize the operators within
*/
- protected void init(TaskContext context) {};
+ protected void init(MessageStream<M> source, TaskContext context) {};
/**
* Method to trigger all downstream operators that consumes the output {@link org.apache.samza.operators.api.MessageStream}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
new file mode 100644
index 0000000..bbe08a4
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.join;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a {@link org.apache.samza.operators.api.internal.Operators.PartialJoinOperator}. This class implements a function
+ * that takes in only one input stream among all inputs to the join and generates the join output.
+ *
+ * @param <M> Type of input stream {@link org.apache.samza.operators.api.data.Message}
+ * @param <RM> Type of join output stream {@link org.apache.samza.operators.api.data.Message}
+ */
+public class PartialJoinOpImpl<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> extends OperatorImpl<M, RM> {
+
+ public PartialJoinOpImpl(PartialJoinOperator<M, K, JM, RM> joinOp) {
+ // TODO: implement PartialJoinOpImpl constructor
+ }
+
+ @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ // TODO: implement PartialJoinOpImpl processing logic
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
index 2de53aa..59e2dec 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
@@ -30,29 +30,27 @@ import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
-import java.util.function.BiFunction;
-
/**
* Default implementation class of a {@link WindowOperator} for a session window.
*
* @param <M> the type of input {@link Message}
* @param <RK> the type of window key
+ * @param <WS> the type of window state
* @param <RM> the type of aggregated value of the window
*/
public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>> extends
OperatorImpl<M, RM> {
- private final BiFunction<M, Entry<RK, WS>, RM> txfmFunction;
- private final StateStoreImpl<M, RK, WS> wndStore;
+ private final WindowOperator<M, RK, WS, RM> sessWnd;
+ private StateStoreImpl<M, RK, WS> wndStore = null;
- SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd, MessageStream<M> input) {
- this.txfmFunction = sessWnd.getFunction();
- this.wndStore = new StateStoreImpl<>(sessWnd.getStoreFunctions(), sessWnd.getStoreName(input));
+ public SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd) {
+ this.sessWnd = sessWnd;
}
@Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
Entry<RK, WS> state = this.wndStore.getState(message);
- this.nextProcessors(this.txfmFunction.apply(message, state), collector, coordinator);
+ this.nextProcessors(this.sessWnd.getFunction().apply(message, state), collector, coordinator);
this.wndStore.updateState(message, state);
}
@@ -60,7 +58,8 @@ public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM
// This is to periodically check the timeout triggers to get the list of window states to be updated
}
- @Override protected void init(TaskContext context) {
+ @Override protected void init(MessageStream<M> source, TaskContext context) {
+ this.wndStore = new StateStoreImpl<>(this.sessWnd.getStoreFunctions(), sessWnd.getStoreName(source));
this.wndStore.init(context);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
new file mode 100644
index 0000000..9f9ad6b
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.internal.Operators.*;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestMessageStream {
+
+ @Test public void testMap() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2);
+ MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestOutputMessage> mapOp = subs.iterator().next();
+ assertTrue(mapOp instanceof StreamOperator);
+ assertEquals(mapOp.getOutputStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ TestMessage xTestMsg = mock(TestMessage.class);
+ when(xTestMsg.getKey()).thenReturn("test-msg-key");
+ when(xTestMsg.getMessage()).thenReturn("123456789");
+ when(xTestMsg.getTimestamp()).thenReturn(12345L);
+ Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg);
+ assertEquals(cOutputMsg.size(), 1);
+ TestOutputMessage outputMessage = cOutputMsg.iterator().next();
+ assertEquals(outputMessage.getKey(), xTestMsg.getKey());
+ assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1));
+ assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2);
+ }
+
+ @Test public void testFlatMap() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() {{
+ this.add(mock(TestOutputMessage.class));
+ this.add(mock(TestOutputMessage.class));
+ this.add(mock(TestOutputMessage.class));
+ }};
+ Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts;
+ MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestOutputMessage> flatMapOp = subs.iterator().next();
+ assertTrue(flatMapOp instanceof StreamOperator);
+ assertEquals(flatMapOp.getOutputStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap);
+ }
+
+ @Test public void testFilter() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L;
+ MessageStream<TestMessage> outputStream = inputStream.filter(xFilter);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> filterOp = subs.iterator().next();
+ assertTrue(filterOp instanceof StreamOperator);
+ assertEquals(filterOp.getOutputStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction();
+ TestMessage mockMsg = mock(TestMessage.class);
+ when(mockMsg.getTimestamp()).thenReturn(11111L);
+ Collection<TestMessage> output = txfmFn.apply(mockMsg);
+ assertTrue(output.isEmpty());
+ when(mockMsg.getTimestamp()).thenReturn(999999L);
+ output = txfmFn.apply(mockMsg);
+ assertEquals(output.size(), 1);
+ assertEquals(output.iterator().next(), mockMsg);
+ }
+
+ @Test public void testSink() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> {
+ mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
+ tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+ };
+ inputStream.sink(xSink);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> sinkOp = subs.iterator().next();
+ assertTrue(sinkOp instanceof SinkOperator);
+ assertEquals(((SinkOperator) sinkOp).getFunction(), xSink);
+ assertNull(((SinkOperator) sinkOp).getOutputStream());
+ }
+
+ @Test public void testWindow() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class);
+ MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> wndOp = subs.iterator().next();
+ assertTrue(wndOp instanceof WindowOperator);
+ assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream);
+ }
+
+ @Test public void testJoin() {
+ MessageStream<TestMessage> source1 = new MessageStream<>();
+ MessageStream<TestMessage> source2 = new MessageStream<>();
+ BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp());
+ MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner);
+ Collection<Operator> subs = source1.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> joinOp1 = subs.iterator().next();
+ assertTrue(joinOp1 instanceof PartialJoinOperator);
+ assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput);
+ subs = source2.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> joinOp2 = subs.iterator().next();
+ assertTrue(joinOp2 instanceof PartialJoinOperator);
+ assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput);
+ TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L);
+ TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L);
+ TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2);
+ assertEquals(xOut.getKey(), "test-join-1");
+ assertEquals(xOut.getMessage(), Integer.valueOf(24));
+ assertEquals(xOut.getTimestamp(), 11111L);
+ xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1);
+ assertEquals(xOut.getKey(), "test-join-1");
+ assertEquals(xOut.getMessage(), Integer.valueOf(24));
+ assertEquals(xOut.getTimestamp(), 11111L);
+ }
+
+ @Test public void testMerge() {
+ MessageStream<TestMessage> merge1 = new MessageStream<>();
+ Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>(){{
+ this.add(new MessageStream<>());
+ this.add(new MessageStream<>());
+ }};
+ MessageStream<TestMessage> mergeOutput = merge1.merge(others);
+ validateMergeOperator(merge1, mergeOutput);
+
+ others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+ }
+
+ private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) {
+ Collection<Operator> subs = mergeSource.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> mergeOp = subs.iterator().next();
+ assertTrue(mergeOp instanceof StreamOperator);
+ assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput);
+ TestMessage mockMsg = mock(TestMessage.class);
+ Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg);
+ assertEquals(outputs.size(), 1);
+ assertEquals(outputs.iterator().next(), mockMsg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
new file mode 100644
index 0000000..225e77f
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestOutputMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.data.Message;
+
+
+/**
+ * Test-only {@link Message} with a {@link String} key, an {@link Integer} body and a
+ * timestamp, all fixed at construction time.
+ */
+public class TestOutputMessage implements Message<String, Integer> {
+ private final String key;
+ private final Integer value;
+ private final long timestamp;
+
+ /**
+ * @param key the message key
+ * @param value the message body
+ * @param timestamp the message timestamp
+ */
+ public TestOutputMessage(String key, Integer value, long timestamp) {
+ this.key = key;
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ /** @return the key given to the constructor */
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ /** @return the body given to the constructor */
+ @Override
+ public Integer getMessage() {
+ return value;
+ }
+
+ /** @return the timestamp given to the constructor */
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
index 65c37e9..6dc77e5 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
@@ -117,7 +117,6 @@ public class TestOperators {
@Test public void testGetMergeOperator() {
MessageStream<TestMessage> output = new MessageStream<>();
-
Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output);
Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() {{
this.add(t);
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
new file mode 100644
index 0000000..d4d2378
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.TestOutputMessage;
+import org.apache.samza.operators.api.Windows;
+import org.apache.samza.task.TaskContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Tests the operator chains built by {@code ChainedOperators.create}. The private
+ * {@code subscribers} fields of {@code ChainedOperators} and {@code OperatorImpl} are read
+ * through reflection to inspect how the operator implementations are linked.
+ */
+public class TestChainedOperators {
+ // reflective handles to the private "subscribers" fields; assigned in prep()
+ Field subsField = null;
+ Field opSubsField = null;
+
+ @Before public void prep() throws NoSuchFieldException {
+ subsField = ChainedOperators.class.getDeclaredField("subscribers");
+ subsField.setAccessible(true);
+ opSubsField = OperatorImpl.class.getDeclaredField("subscribers");
+ opSubsField.setAccessible(true);
+ }
+
+ @Test public void testCreate() {
+ // test creation of empty chain
+ MessageStream<TestMessage> testStream = new MessageStream<>();
+ TaskContext mockContext = mock(TaskContext.class);
+ ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testStream, mockContext);
+ assertNotNull(operatorChain);
+ }
+
+ @Test public void testLinearChain() throws IllegalAccessException {
+ // test creation of linear chain
+ MessageStream<TestMessage> testInput = new MessageStream<>();
+ TaskContext mockContext = mock(TaskContext.class);
+ testInput.map(m -> m).window(Windows.intoSessionCounter(TestMessage::getKey));
+ ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
+ Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
+ // assertEquals takes (expected, actual) so that failure messages read correctly
+ assertEquals(1, subsSet.size());
+ OperatorImpl<TestMessage, TestMessage> firstOpImpl = subsSet.iterator().next();
+ Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(firstOpImpl);
+ assertEquals(1, subsOps.size());
+ // the window operator is the end of the chain, so it has no subscribers of its own
+ Subscriber<? super ProcessorContext<TestMessage>> wndOpImpl = subsOps.iterator().next();
+ subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(wndOpImpl);
+ assertEquals(0, subsOps.size());
+ }
+
+ @Test public void testBroadcastChain() throws IllegalAccessException {
+ // test creation of broadcast chain
+ MessageStream<TestMessage> testInput = new MessageStream<>();
+ TaskContext mockContext = mock(TaskContext.class);
+ testInput.filter(m -> m.getTimestamp() > 123456L).flatMap(m -> new ArrayList() {{ this.add(m); this.add(m); }});
+ testInput.filter(m -> m.getTimestamp() < 123456L).map(m -> m);
+ ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
+ Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
+ assertEquals(2, subsSet.size());
+ Iterator<OperatorImpl> iter = subsSet.iterator();
+ // check the first branch returned by the iterator: a filter followed by one terminal operator
+ OperatorImpl<TestMessage, TestMessage> opImpl = iter.next();
+ Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
+ assertEquals(1, subsOps.size());
+ Subscriber<? super ProcessorContext<TestMessage>> flatMapImpl = subsOps.iterator().next();
+ subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(flatMapImpl);
+ assertEquals(0, subsOps.size());
+ // check the second branch, which must have the same shape
+ opImpl = iter.next();
+ subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
+ assertEquals(1, subsOps.size());
+ Subscriber<? super ProcessorContext<TestMessage>> mapImpl = subsOps.iterator().next();
+ subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(mapImpl);
+ assertEquals(0, subsOps.size());
+ }
+
+ @Test public void testJoinChain() throws IllegalAccessException {
+ // test creation of join chain
+ MessageStream<TestMessage> input1 = new MessageStream<>();
+ MessageStream<TestMessage> input2 = new MessageStream<>();
+ TaskContext mockContext = mock(TaskContext.class);
+ input1.join(input2, (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp())).map(m -> m);
+ // now, we create chained operators from each input source
+ ChainedOperators<TestMessage> chain1 = ChainedOperators.create(input1, mockContext);
+ ChainedOperators<TestMessage> chain2 = ChainedOperators.create(input2, mockContext);
+ // check that those two chains will merge at map operator
+ // first branch of the join
+ Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(chain1);
+ assertEquals(1, subsSet.size());
+ OperatorImpl<TestMessage, TestOutputMessage> joinOp1 = subsSet.iterator().next();
+ Set<Subscriber<? super ProcessorContext<TestOutputMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp1);
+ assertEquals(1, subsOps.size());
+ // the map operator consumes the common join output, where two branches merge
+ Subscriber<? super ProcessorContext<TestOutputMessage>> mapImpl = subsOps.iterator().next();
+ // second branch of the join
+ subsSet = (Set<OperatorImpl>) subsField.get(chain2);
+ assertEquals(1, subsSet.size());
+ OperatorImpl<TestMessage, TestOutputMessage> joinOp2 = subsSet.iterator().next();
+ assertNotSame(joinOp1, joinOp2);
+ subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp2);
+ assertEquals(1, subsOps.size());
+ // make sure that the map operator is the same
+ assertEquals(mapImpl, subsOps.iterator().next());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
new file mode 100644
index 0000000..d228784
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.TestOutputMessage;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.api.internal.Operators.SinkOperator;
+import org.apache.samza.operators.api.internal.Operators.StreamOperator;
+import org.apache.samza.operators.api.internal.Operators.WindowOperator;
+import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
+import org.apache.samza.operators.impl.window.SessionWindowImpl;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests that {@code OperatorFactory.getOperator} returns the expected {@code OperatorImpl}
+ * subclass for window, stream, sink and partial-join operators.
+ */
+public class TestOperatorFactory {
+
+ @Test public void testGetOperator() throws NoSuchFieldException, IllegalAccessException {
+ // get window operator
+ WindowOperator mockWnd = mock(WindowOperator.class);
+ Map.Entry<OperatorImpl<TestMessage, ? extends Message>, Boolean>
+ factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockWnd);
+ assertFalse(factoryEntry.getValue());
+ OperatorImpl<TestMessage, TestOutputMessage> opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+ assertTrue(opImpl instanceof SessionWindowImpl);
+ // read the private field through reflection to confirm the impl wraps the operator it was given
+ Field sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
+ sessWndField.setAccessible(true);
+ WindowOperator sessWnd = (WindowOperator) sessWndField.get(opImpl);
+ // (expected, actual) order, consistent with the other assertEquals calls in this test
+ assertEquals(mockWnd, sessWnd);
+
+ // get simple operator
+ StreamOperator<TestMessage, TestOutputMessage> mockSimpleOp = mock(StreamOperator.class);
+ Function<TestMessage, Collection<TestOutputMessage>> mockTxfmFn = mock(Function.class);
+ when(mockSimpleOp.getFunction()).thenReturn(mockTxfmFn);
+ factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockSimpleOp);
+ opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+ assertTrue(opImpl instanceof SimpleOperatorImpl);
+ Field txfmFnField = SimpleOperatorImpl.class.getDeclaredField("transformFn");
+ txfmFnField.setAccessible(true);
+ assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
+
+ // get sink operator
+ MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, mc, tc) -> {};
+ SinkOperator<TestMessage> sinkOp = mock(SinkOperator.class);
+ when(sinkOp.getFunction()).thenReturn(sinkFn);
+ factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(sinkOp);
+ opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+ assertTrue(opImpl instanceof SinkOperatorImpl);
+ Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFunc");
+ sinkFnField.setAccessible(true);
+ assertEquals(sinkFn, sinkFnField.get(opImpl));
+
+ // get join operator
+ PartialJoinOperator<TestMessage, String, TestMessage, TestOutputMessage> joinOp = mock(PartialJoinOperator.class);
+ TestOutputMessage mockOutput = mock(TestOutputMessage.class);
+ BiFunction<TestMessage, TestMessage, TestOutputMessage> joinFn = (m1, m2) -> mockOutput;
+ when(joinOp.getFunction()).thenReturn(joinFn);
+ factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(joinOp);
+ opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
+ assertTrue(opImpl instanceof PartialJoinOpImpl);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 9445f3a..d296111 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.impl;
import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.TestOutputMessage;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
deleted file mode 100644
index 4bcf767..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.api.data.Message;
-
-
-class TestOutputMessage implements Message<String, Integer> {
- private final String key;
- private final Integer value;
- private final long timestamp;
-
- TestOutputMessage(String key, Integer value, long timestamp) {
- this.key = key;
- this.value = value;
- this.timestamp = timestamp;
- }
-
- @Override public Integer getMessage() {
- return this.value;
- }
-
- @Override public String getKey() {
- return this.key;
- }
-
- @Override public long getTimestamp() {
- return this.timestamp;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
index 50154f0..c8c4944 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.impl;
import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.api.TestOutputMessage;
import org.apache.samza.operators.api.internal.Operators.StreamOperator;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index eb8a23a..e711bc5 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.impl;
import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.TestOutputMessage;
import org.apache.samza.operators.api.internal.Operators.SinkOperator;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
index 10ee2c7..5aa28bb 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
@@ -64,6 +64,7 @@ public class SqlAvroSerdeTest {
@Test
public void testSqlAvroSerialization() throws IOException {
AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
+ @SuppressWarnings("unchecked")
byte[] encodedDatum = serde.toBytes(decodedDatumOriginal);
AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
index 6947464..75cb00c 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
@@ -24,7 +24,6 @@ import org.apache.samza.operators.api.WindowState;
import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
import org.apache.samza.operators.api.internal.Operators.WindowOperator;
import org.apache.samza.operators.api.internal.WindowOutput;
-import org.apache.samza.operators.impl.StateStoreImpl;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
@@ -44,67 +43,41 @@ import static org.mockito.Mockito.*;
public class TestSessionWindowImpl {
Field wndStoreField = null;
- Field txfmFnField = null;
+ Field sessWndField = null;
@Before public void prep() throws NoSuchFieldException {
wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore");
- txfmFnField = SessionWindowImpl.class.getDeclaredField("txfmFunction");
+ sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
wndStoreField.setAccessible(true);
- txfmFnField.setAccessible(true);
+ sessWndField.setAccessible(true);
}
@Test public void testConstructor() throws IllegalAccessException, NoSuchFieldException {
// test constructing a SessionWindowImpl w/ expected mock functions
- MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
- StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
- when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
- when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
- BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
- when(wndOp.getFunction()).thenReturn(mockTxfmFn);
- SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp, mockInputStrm);
- BiFunction<TestMessage, WindowState<Integer>, WindowOutput<String, Integer>> txfmFn =
- (BiFunction<TestMessage, WindowState<Integer>, WindowOutput<String, Integer>>) txfmFnField.get(sessWnd);
- assertEquals(mockTxfmFn, txfmFn);
- StateStoreImpl<TestMessage, String, WindowState<Integer>> storeImpl =
- (StateStoreImpl<TestMessage, String, WindowState<Integer>>) wndStoreField.get(sessWnd);
-
- // test init() and make sure the wndStore is initialized as expected
- TestMessage mockMsg = mock(TestMessage.class);
- TaskContext mockContext = mock(TaskContext.class);
- KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
- when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
- Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
- when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
- WindowState<Integer> mockState = mock(WindowState.class);
- when(mockKvStore.get("test-msg-key")).thenReturn(mockState);
- storeImpl.init(mockContext);
- Entry<String, WindowState<Integer>> stateEntry = storeImpl.getState(mockMsg);
- verify(mockStoreFns, times(1)).getStoreKeyFinder();
- verify(mockKvStore, times(1)).get("test-msg-key");
- assertEquals(stateEntry.getKey(), "test-msg-key");
- assertEquals(stateEntry.getValue(), mockState);
+ SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
+ assertEquals(wndOp, sessWndField.get(sessWnd));
}
- @Test public void testInitAndProcess() {
- MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
+ @Test public void testInitAndProcess() throws IllegalAccessException {
WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
+ BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
+ SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
+
+ // construct and init the SessionWindowImpl object
+ MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
- BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
when(wndOp.getFunction()).thenReturn(mockTxfmFn);
-
- // construct and init the SessionWindowImpl object
- SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp, mockInputStrm);
TaskContext mockContext = mock(TaskContext.class);
KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
- sessWnd.init(mockContext);
+ sessWnd.init(mockInputStrm, mockContext);
- // test onNext() method. Make sure the right methods are invoked.
+ // test onNext() method. Make sure the transformation function and the state update functions are invoked.
TestMessage mockMsg = mock(TestMessage.class);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
index 91b0074..724bbba 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -56,27 +56,26 @@ public class BroadcastOperatorTask implements StreamOperatorTask {
@Override public void initOperators(Collection<SystemMessageStream> sources) {
sources.forEach(source -> {
- MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage);
-
- inputStream.filter(this::myFilter1).
- window(Windows.<JsonMessage, String>intoSessionCounter(
- m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)).
- setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100).
- addLateTriggerOnSizeLimit(10).
- addTimeoutSinceLastMessage(30000)));
-
- inputStream.filter(this::myFilter2).
- window(Windows.<JsonMessage, String>intoSessions(
- m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)).
- setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100).
- addTimeoutSinceLastMessage(30000)));
-
- inputStream.filter(this::myFilter3).
- window(Windows.<JsonMessage, String, MessageType>intoSessions(
- m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()).
- setTriggers(TriggerBuilder
- .<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000).
- addTimeoutSinceFirstMessage(60000)));
+ MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage);
+
+ inputStream.filter(this::myFilter1).
+ window(Windows.<JsonMessage, String>intoSessionCounter(
+ m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)).
+ setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100).
+ addLateTriggerOnSizeLimit(10).
+ addTimeoutSinceLastMessage(30000)));
+
+ inputStream.filter(this::myFilter2).
+ window(Windows.<JsonMessage, String>intoSessions(
+ m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)).
+ setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100).
+ addTimeoutSinceLastMessage(30000)));
+
+ inputStream.filter(this::myFilter3).
+ window(Windows.<JsonMessage, String, MessageType>intoSessions(
+ m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()).
+ setTriggers(TriggerBuilder.<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000).
+ addTimeoutSinceFirstMessage(60000)));
}
);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/adcd2667/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
index 5e710b2..33ae9c9 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
@@ -25,7 +25,7 @@ import org.apache.samza.system.SystemStreamPartition;
/**
- * Example input message w/ avro message body and string as the key.
+ * Example input message w/ Json message body and string as the key.
*/
public class InputJsonSystemMessage<T> implements Message<String, T>, InputSystemMessage<Offset> {