You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/09 10:22:47 UTC
[4/4] samza git commit: SAMZA-1073: top-level fluent API `
SAMZA-1073: top-level fluent API
`
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/373048aa
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/373048aa
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/373048aa
Branch: refs/heads/samza-fluent-api-v1
Commit: 373048aa0a68221af5f6b5589bbe161c972b11a9
Parents: 38b1dc3
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Thu Feb 9 01:56:10 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Feb 9 01:57:05 2017 -0800
----------------------------------------------------------------------
.../apache/samza/operators/ContextManager.java | 47 ++++
.../apache/samza/operators/MessageStream.java | 90 ++++---
.../apache/samza/operators/OutputStream.java | 41 +++
.../org/apache/samza/operators/StreamGraph.java | 118 +++++++++
.../samza/operators/StreamGraphFactory.java | 38 +++
.../samza/operators/StreamOperatorTask.java | 51 ----
.../org/apache/samza/operators/StreamSpec.java | 46 ++++
.../data/IncomingSystemMessageEnvelope.java | 63 -----
.../operators/data/InputMessageEnvelope.java | 63 +++++
.../samza/operators/data/MessageEnvelope.java | 2 +-
.../operators/functions/FilterFunction.java | 23 +-
.../operators/functions/FlatMapFunction.java | 25 +-
.../samza/operators/functions/InitFunction.java | 38 +++
.../samza/operators/functions/JoinFunction.java | 48 +++-
.../functions/KeyValueJoinFunction.java | 44 ++++
.../samza/operators/functions/MapFunction.java | 25 +-
.../samza/operators/functions/SinkFunction.java | 23 +-
.../samza/operators/triggers/AnyTrigger.java | 3 +-
.../samza/operators/triggers/CountTrigger.java | 4 +-
.../operators/triggers/RepeatingTrigger.java | 4 +-
.../triggers/TimeSinceFirstMessageTrigger.java | 3 +-
.../triggers/TimeSinceLastMessageTrigger.java | 4 +-
.../samza/operators/triggers/TimeTrigger.java | 4 +-
.../samza/operators/triggers/Trigger.java | 7 +-
.../samza/operators/triggers/Triggers.java | 41 +--
.../apache/samza/operators/windows/Window.java | 20 +-
.../samza/operators/windows/WindowKey.java | 19 +-
.../samza/operators/windows/WindowPane.java | 9 +-
.../apache/samza/operators/windows/Windows.java | 136 +++++-----
.../windows/internal/WindowInternal.java | 14 +-
.../samza/system/ExecutionEnvironment.java | 73 ++++++
.../java/org/apache/samza/task/TaskContext.java | 10 +
.../data/TestIncomingSystemMessage.java | 2 +-
.../operators/windows/TestWindowOutput.java | 35 ---
.../samza/operators/windows/TestWindowPane.java | 33 +++
.../samza/operators/MessageStreamImpl.java | 151 +++++++----
.../apache/samza/operators/StreamGraphImpl.java | 260 +++++++++++++++++++
.../operators/StreamOperatorAdaptorTask.java | 105 --------
.../functions/PartialJoinFunction.java | 65 +++++
.../samza/operators/impl/OperatorGraph.java | 164 ++++++++++++
.../samza/operators/impl/OperatorImpl.java | 22 +-
.../samza/operators/impl/OperatorImpls.java | 124 ---------
.../operators/impl/PartialJoinOperatorImpl.java | 15 +-
.../samza/operators/impl/RootOperatorImpl.java | 7 +-
.../impl/SessionWindowOperatorImpl.java | 52 ++++
.../samza/operators/impl/SinkOperatorImpl.java | 7 +-
.../operators/impl/StreamOperatorImpl.java | 14 +-
.../operators/impl/WindowOperatorImpl.java | 11 +-
.../samza/operators/spec/OperatorSpec.java | 39 ++-
.../samza/operators/spec/OperatorSpecs.java | 161 +++++++++---
.../operators/spec/PartialJoinOperatorSpec.java | 58 +++--
.../samza/operators/spec/SinkOperatorSpec.java | 70 ++++-
.../operators/spec/StreamOperatorSpec.java | 58 +++--
.../operators/spec/WindowOperatorSpec.java | 41 ++-
.../samza/operators/spec/WindowState.java | 16 +-
.../system/SingleJobExecutionEnvironment.java | 37 +++
.../system/StandaloneExecutionEnvironment.java | 41 +++
.../apache/samza/task/StreamOperatorTask.java | 108 ++++++++
.../apache/samza/example/BroadcastGraph.java | 121 +++++++++
.../org/apache/samza/example/JoinGraph.java | 118 +++++++++
.../samza/example/KeyValueStoreExample.java | 184 +++++++++++++
.../samza/example/NoContextStreamExample.java | 156 +++++++++++
.../samza/example/OrderShipmentJoinExample.java | 190 ++++++++++++++
.../samza/example/PageViewCounterExample.java | 133 ++++++++++
.../samza/example/RepartitionExample.java | 145 +++++++++++
.../samza/example/TestFluentStreamTasks.java | 99 +++++++
.../org/apache/samza/example/WindowGraph.java | 87 +++++++
.../apache/samza/operators/BroadcastTask.java | 96 -------
.../org/apache/samza/operators/JoinTask.java | 77 ------
.../operators/TestFluentStreamAdaptorTask.java | 85 ------
.../samza/operators/TestFluentStreamTasks.java | 112 --------
.../samza/operators/TestMessageStreamImpl.java | 52 ++--
.../operators/TestMessageStreamImplUtil.java | 26 ++
.../org/apache/samza/operators/WindowTask.java | 63 -----
.../samza/operators/impl/TestOperatorImpls.java | 94 +++++--
.../operators/impl/TestSinkOperatorImpl.java | 11 +-
.../operators/impl/TestStreamOperatorImpl.java | 20 +-
.../samza/operators/spec/TestOperatorSpecs.java | 65 +++--
78 files changed, 3455 insertions(+), 1311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
new file mode 100644
index 0000000..c3b1cf3
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Interface class defining methods to initialize and finalize the context used by the transformation functions.
+ */
+@InterfaceStability.Unstable
+public interface ContextManager {
+ /**
+ * The initialization method to create shared context for the whole task in Samza. Defaults to NO-OP.
+ *
+ * @param config the configuration object for the task
+ * @param context the {@link TaskContext} object
+ * @return User-defined task-wide context object
+ */
+ default TaskContext initTaskContext(Config config, TaskContext context) {
+ return context;
+ }
+
+ /**
+ * The finalize method to allow users to close resources initialized in the {@link #initTaskContext} method. Defaults to NO-OP.
+ *
+ */
+ default void finalizeTaskContext() { }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 6a2f95b..87a9fd3 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -19,7 +19,6 @@
package org.apache.samza.operators;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
@@ -29,73 +28,93 @@ import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import java.util.Collection;
+import java.util.function.Function;
/**
- * Represents a stream of {@link MessageEnvelope}s.
+ * Represents a stream of messages.
* <p>
* A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API.
*
- * @param <M> type of {@link MessageEnvelope}s in this stream
+ * @param <M> type of messages in this stream
*/
@InterfaceStability.Unstable
-public interface MessageStream<M extends MessageEnvelope> {
+public interface MessageStream<M> {
/**
- * Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
+ * Applies the provided 1:1 {@link Function} to messages in this {@link MessageStream} and returns the
* transformed {@link MessageStream}.
*
- * @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope}
- * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
+ * @param mapFn the function to transform a message to another message
+ * @param <TM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
- <TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn);
+ <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn);
/**
- * Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream}
- * to n {@link MessageEnvelope}s in the transformed {@link MessageStream}
+ * Applies the provided 1:n {@link Function} to transform a message in this {@link MessageStream}
+ * to n messages in the transformed {@link MessageStream}
*
- * @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s
- * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
+ * @param flatMapFn the function to transform a message to zero or more messages
+ * @param <TM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
- <TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
+ <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
/**
- * Applies the provided {@link FilterFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
+ * Applies the provided {@link Function} to messages in this {@link MessageStream} and returns the
* transformed {@link MessageStream}.
* <p>
- * The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream}
+ * The {@link Function} is a predicate which determines whether a message in this {@link MessageStream}
* should be retained in the transformed {@link MessageStream}.
*
- * @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream}
+ * @param filterFn the predicate to filter messages from this {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
MessageStream<M> filter(FilterFunction<M> filterFn);
/**
- * Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output
- * {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}.
+ * Allows sending messages in this {@link MessageStream} to an output using the provided {@link SinkFunction}.
*
- * @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems
+ * NOTE: the output may not be a {@link org.apache.samza.system.SystemStream}. It can be an external database, etc.
+ *
+ * @param sinkFn the function to send messages in this stream to output
*/
void sink(SinkFunction<M> sinkFn);
/**
- * Groups and processes the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window}
+ * Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}.
+ *
+ * NOTE: the {@code stream} has to be a {@link MessageStream}.
+ *
+ * @param stream the output {@link MessageStream}
+ */
+ void sendTo(OutputStream<M> stream);
+
+ /**
+ * Allows sending messages to an intermediate {@link MessageStream}.
+ *
+ * NOTE: the {@code stream} has to be a {@link MessageStream}.
+ *
+ * @param stream the intermediate {@link MessageStream} to send the message to
+ * @return the intermediate {@link MessageStream} to consume the messages sent
+ */
+ MessageStream<M> sendThrough(OutputStream<M> stream);
+
+ /**
+ * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
* (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of
* {@link WindowPane}s.
* <p>
* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
*
- * @param window the window to group and process {@link MessageEnvelope}s from this {@link MessageStream}
- * @param <K> the type of key in the {@link MessageEnvelope} in this {@link MessageStream}. If a key is specified,
+ * @param window the window to group and process messages from this {@link MessageStream}
+ * @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
* panes are emitted per-key.
* @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
- * @param <WM> the type of {@link WindowPane} in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
- <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(Window<M, K, WV, WM> window);
+ <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);
/**
* Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}.
@@ -103,23 +122,32 @@ public interface MessageStream<M extends MessageEnvelope> {
* We currently only support 2-way joins.
*
* @param otherStream the other {@link MessageStream} to be joined with
- * @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream}
+ * @param joinFn the function to join messages from this and the other {@link MessageStream}
* @param <K> the type of join key
- * @param <OM> the type of {@link MessageEnvelope}s in the other stream
- * @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn}
+ * @param <OM> the type of messages in the other stream
+ * @param <RM> the type of messages resulting from the {@code joinFn}
* @return the joined {@link MessageStream}
*/
- <K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream,
- JoinFunction<M, OM, RM> joinFn);
+ <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn);
/**
* Merge all {@code otherStreams} with this {@link MessageStream}.
* <p>
- * The merging streams must have the same {@link MessageEnvelope} type {@code M}.
+ * The merging streams must have the same messages of type {@code M}.
*
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
* @return the merged {@link MessageStream}
*/
MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);
-
+
+ /**
+ * Send the input message to an output {@link org.apache.samza.system.SystemStream} and consume it as input {@link MessageStream} again.
+ *
+ * Note: this is a transform function only used in the logical DAG. In a physical DAG, this is either translated to a NOOP function, or a {@code MessageStream#sendThrough} function.
+ *
+ * @param parKeyExtractor a {@link Function} that extracts the partition key from a message in this {@link MessageStream}
+ * @param <K> the type of partition key
+ * @return a {@link MessageStream} object after the re-partition
+ */
+ <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
new file mode 100644
index 0000000..179f0e7
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.SinkFunction;
+
+
+/**
+ * The interface class defining the specific {@link SinkFunction} for a system {@link OutputStream}.
+ *
+ * @param <M> The type of message to be sent to this output stream
+ */
+@InterfaceStability.Unstable
+public interface OutputStream<M> {
+
+ /**
+ * Returns the specific {@link SinkFunction} for this output stream. The {@link OutputStream} is created
+ * via {@link StreamGraph#createOutStream(StreamSpec, Serde, Serde)} or {@link StreamGraph#createIntStream(StreamSpec, Serde, Serde)}.
+ * Hence, the proper types of serdes for key and value are instantiated and are used in the {@link SinkFunction} returned.
+ *
+ * @return The pre-defined {@link SinkFunction} to apply proper serdes before sending the message to the output stream.
+ */
+ SinkFunction<M> getSinkFunction();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
new file mode 100644
index 0000000..9e6644b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.serializers.Serde;
+
+import java.util.Map;
+
+
+/**
+ * Job-level programming interface to create an operator DAG and run in various different runtime environments.
+ */
+@InterfaceStability.Unstable
+public interface StreamGraph {
+ /**
+ * Method to add an input {@link MessageStream} from the system
+ *
+ * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the input {@link MessageStream}
+ * @param keySerde the serde used to serialize/deserialize the message key from the input {@link MessageStream}
+ * @param msgSerde the serde used to serialize/deserialize the message body from the input {@link MessageStream}
+ * @param <K> the type of key in the input message
+ * @param <V> the type of message in the input message
+ * @param <M> the type of {@link MessageEnvelope} in the input {@link MessageStream}
+ * @return the input {@link MessageStream} object
+ */
+ <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+ /**
+ * Method to add an {@link OutputStream} to the system
+ *
+ * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the output {@link MessageStream}
+ * @param keySerde the serde used to serialize/deserialize the message key from the output {@link MessageStream}
+ * @param msgSerde the serde used to serialize/deserialize the message body from the output {@link MessageStream}
+ * @param <K> the type of key in the output message
+ * @param <V> the type of message in the output message
+ * @param <M> the type of {@link MessageEnvelope} in the output {@link MessageStream}
+ * @return the {@link OutputStream} object
+ */
+ <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+ /**
+ * Method to add an intermediate {@link MessageStream} from the system
+ *
+ * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the intermediate {@link MessageStream}
+ * @param keySerde the serde used to serialize/deserialize the message key from the intermediate {@link MessageStream}
+ * @param msgSerde the serde used to serialize/deserialize the message body from the intermediate {@link MessageStream}
+ * @param <K> the type of key in the intermediate message
+ * @param <V> the type of message in the intermediate message
+ * @param <M> the type of {@link MessageEnvelope} in the intermediate {@link MessageStream}
+ * @return the intermediate stream, as an {@link OutputStream} object
+ */
+ <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+ /**
+ * Method to get the input {@link MessageStream}s
+ *
+ * @return the map of all input {@link MessageStream}s
+ */
+ Map<StreamSpec, MessageStream> getInStreams();
+
+ /**
+ * Method to get the {@link OutputStream}s
+ *
+ * @return the map of all {@link OutputStream}s
+ */
+ Map<StreamSpec, OutputStream> getOutStreams();
+
+ /**
+ * Method to set the {@link ContextManager} for this {@link StreamGraph}
+ *
+ * @param manager the {@link ContextManager} object
+ * @return this {@link StreamGraph} object
+ */
+ StreamGraph withContextManager(ContextManager manager);
+
+ String GRAPH_CONFIG = "job.stream.graph.impl.class";
+ String DEFAULT_GRAPH_IMPL_CLASS = "org.apache.samza.operators.StreamGraphImpl";
+
+ /**
+ * Static method to instantiate the {@link StreamGraph} implementation class named by {@code GRAPH_CONFIG} (or the default).
+ *
+ * @param config the {@link Config} object for this job
+ * @return the {@link StreamGraph} object created
+ * @throws ConfigException if the class cannot be loaded or instantiated, or does not implement {@link StreamGraph}
+ */
+ static StreamGraph fromConfig(Config config) {
+ String graphClass = config.get(GRAPH_CONFIG, DEFAULT_GRAPH_IMPL_CLASS);
+ try {
+ Class<?> clazz = Class.forName(graphClass);
+ if (StreamGraph.class.isAssignableFrom(clazz)) {
+ return (StreamGraph) clazz.newInstance();
+ }
+ } catch (Exception e) {
+ throw new ConfigException(String.format("Problem in loading StreamGraphImpl class %s", graphClass), e);
+ }
+ throw new ConfigException(String.format("Class %s does not implement interface StreamGraph properly", graphClass));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
new file mode 100644
index 0000000..c292363
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+
+/**
+ * This interface defines a factory class that user will implement to create user-defined operator DAG in a {@link StreamGraph} object.
+ */
+@InterfaceStability.Unstable
+public interface StreamGraphFactory {
+ /**
+ * Users are required to implement this abstract method to initialize the processing logic of the application, in terms
+ * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
+ *
+ * @param config the {@link Config} of the application
+ * @return the {@link StreamGraph} object which contains user-defined processing logic of the application
+ */
+ StreamGraph create(Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
deleted file mode 100644
index 16cf27a..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Map;
-
-
-/**
- * A {@link StreamOperatorTask} is the basic interface to implement for processing {@link MessageStream}s.
- * Implementations can describe the transformation steps for each {@link MessageStream} in the
- * {@link #transform} method using {@link MessageStream} APIs.
- * <p>
- * Implementations may be augmented by implementing {@link org.apache.samza.task.InitableTask},
- * {@link org.apache.samza.task.WindowableTask} and {@link org.apache.samza.task.ClosableTask} interfaces,
- * but should not implement {@link org.apache.samza.task.StreamTask} or {@link org.apache.samza.task.AsyncStreamTask}
- * interfaces.
- */
-@InterfaceStability.Unstable
-public interface StreamOperatorTask {
-
- /**
- * Describe the transformation steps for each {@link MessageStream}s for this task using the
- * {@link MessageStream} APIs. Each {@link MessageStream} corresponds to one {@link SystemStreamPartition}
- * in the input system.
- *
- * @param messageStreams the {@link MessageStream}s that receive {@link IncomingSystemMessageEnvelope}s
- * from their corresponding {@link org.apache.samza.system.SystemStreamPartition}
- */
- void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams);
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
new file mode 100644
index 0000000..c8a5e8d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.SystemStream;
+
+import java.util.Properties;
+
+
+/**
+ * This interface defines the specification of a {@link SystemStream}. It will be used by the {@link org.apache.samza.system.SystemAdmin}
+ * to create a {@link SystemStream}
+ */
+@InterfaceStability.Unstable
+public interface StreamSpec {
+ /**
+ * Get the {@link SystemStream}
+ *
+ * @return {@link SystemStream} object
+ */
+ SystemStream getSystemStream();
+
+ /**
+ * Get the physical properties of the {@link SystemStream}
+ *
+ * @return the properties of this stream
+ */
+ Properties getProperties();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
deleted file mode 100644
index a65809c..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition}
- * and its {@link Offset} within the {@link SystemStreamPartition}.
- * <p>
- * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
- */
-public class IncomingSystemMessageEnvelope implements MessageEnvelope<Object, Object> {
-
- private final IncomingMessageEnvelope ime;
-
- /**
- * Creates an {@code IncomingSystemMessageEnvelope} from the {@link IncomingMessageEnvelope}.
- *
- * @param ime the {@link IncomingMessageEnvelope} from the input system.
- */
- public IncomingSystemMessageEnvelope(IncomingMessageEnvelope ime) {
- this.ime = ime;
- }
-
- @Override
- public Object getKey() {
- return this.ime.getKey();
- }
-
- @Override
- public Object getMessage() {
- return this.ime.getMessage();
- }
-
- public Offset getOffset() {
- // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
- // assuming incoming message envelope carries long value as offset (i.e. Kafka case)
- return new LongOffset(this.ime.getOffset());
- }
-
- public SystemStreamPartition getSystemStreamPartition() {
- return this.ime.getSystemStreamPartition();
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
new file mode 100644
index 0000000..306145b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition}
+ * and its {@link Offset} within the {@link SystemStreamPartition}.
+ * <p>
+ * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
+ */
+public class InputMessageEnvelope implements MessageEnvelope<Object, Object> {
+
+ private final IncomingMessageEnvelope ime;
+
+ /**
+ * Creates an {@code InputMessageEnvelope} from the {@link IncomingMessageEnvelope}.
+ *
+ * @param ime the {@link IncomingMessageEnvelope} from the input system.
+ */
+ public InputMessageEnvelope(IncomingMessageEnvelope ime) {
+ this.ime = ime;
+ }
+
+ @Override
+ public Object getKey() {
+ return this.ime.getKey();
+ }
+
+ @Override
+ public Object getMessage() {
+ return this.ime.getMessage();
+ }
+
+ public Offset getOffset() {
+ // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
+ // assuming incoming message envelope carries long value as offset (i.e. Kafka case)
+ return new LongOffset(this.ime.getOffset());
+ }
+
+ public SystemStreamPartition getSystemStreamPartition() {
+ return this.ime.getSystemStreamPartition();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
index ad64231..703a44c 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
@@ -23,7 +23,7 @@ import org.apache.samza.annotation.InterfaceStability;
/**
- * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s.
+ * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s
*/
@InterfaceStability.Unstable
public interface MessageEnvelope<K, M> {
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index e611cd0..73c5c9d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -19,22 +19,29 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
/**
- * A function that specifies whether a {@link MessageEnvelope} should be retained for further processing or filtered out.
- * @param <M> type of the input {@link MessageEnvelope}
+ * A function that specifies whether a message should be retained for further processing or filtered out.
+ * @param <M> type of the input message
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface FilterFunction<M extends MessageEnvelope> {
+public interface FilterFunction<M> extends InitFunction {
/**
- * Returns a boolean indicating whether this {@link MessageEnvelope} should be retained or filtered out.
- * @param message the {@link MessageEnvelope} to be checked
- * @return true if {@link MessageEnvelope} should be retained
+ * Returns a boolean indicating whether this message should be retained or filtered out.
+ * @param message the input message to be checked
+ * @return true if {@code message} should be retained
*/
boolean apply(M message);
+ /**
+ * Init method to initialize the context for this {@link FilterFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
index dbc0bd9..f8458f2 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -19,26 +19,33 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
import java.util.Collection;
/**
- * A function that transforms a {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s,
+ * A function that transforms an input message into a collection of 0 or more messages,
* possibly of a different type.
- * @param <M> type of the input {@link MessageEnvelope}
- * @param <OM> type of the transformed {@link MessageEnvelope}s
+ * @param <M> type of the input message
+ * @param <OM> type of the transformed messages
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface FlatMapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> {
+public interface FlatMapFunction<M, OM> extends InitFunction {
/**
- * Transforms the provided {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s.
- * @param message the {@link MessageEnvelope} to be transformed
- * @return a collection of 0 or more transformed {@link MessageEnvelope}s
+ * Transforms the provided message into a collection of 0 or more messages.
+ * @param message the input message to be transformed
+ * @return a collection of 0 or more transformed messages
*/
Collection<OM> apply(M message);
+ /**
+ * Init method to initialize the context for this {@link FlatMapFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
new file mode 100644
index 0000000..eec56df
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Interface defined to initialize the context of message transformation functions.
+ */
+@InterfaceStability.Unstable
+public interface InitFunction {
+ /**
+ * Interface method to initialize the context for a specific message transformation function.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ void init(Config config, TaskContext context);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
index 8cb1fce..afc92ee 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -19,26 +19,50 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
/**
- * A function that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces
- * a joined {@link MessageEnvelope}.
- * @param <M> type of the input {@link MessageEnvelope}
- * @param <JM> type of the {@link MessageEnvelope} to join with
- * @param <RM> type of the joined {@link MessageEnvelope}
+ * A function that joins messages from two {@link org.apache.samza.operators.MessageStream}s and produces
+ * a joined message.
+ * @param <K> type of the join key
+ * @param <M> type of the input message
+ * @param <JM> type of the message to join with
+ * @param <RM> type of the joined message
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface JoinFunction<M extends MessageEnvelope, JM extends MessageEnvelope, RM extends MessageEnvelope> {
+public interface JoinFunction<K, M, JM, RM> extends InitFunction {
/**
- * Join the provided {@link MessageEnvelope}s and produces the joined {@link MessageEnvelope}.
- * @param message the input {@link MessageEnvelope}
- * @param otherMessage the {@link MessageEnvelope} to join with
- * @return the joined {@link MessageEnvelope}
+ * Joins the provided input messages and produces the joined message.
+ * @param message the input message
+ * @param otherMessage the message to join with
+ * @return the joined message
*/
RM apply(M message, JM otherMessage);
+ /**
+ * Method to get the join key in the messages from the first input stream
+ *
+ * @param message the input message from the first input stream
+ * @return the join key
+ */
+ K getFirstKey(M message);
+
+ /**
+ * Method to get the join key in the messages from the second input stream
+ *
+ * @param message the input message from the second input stream
+ * @return the join key
+ */
+ K getSecondKey(JM message);
+
+ /**
+ * Init method to initialize the context for this {@link JoinFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
new file mode 100644
index 0000000..b651b3d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+/**
+ * A specific {@link JoinFunction} that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces
+ * a joined message.
+ *
+ * @param <K> type of the join key
+ * @param <M> type of the input {@link MessageEnvelope}
+ * @param <JM> type of the {@link MessageEnvelope} to join with
+ * @param <RM> type of the joined message
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface KeyValueJoinFunction<K, M extends MessageEnvelope<K, ?>, JM extends MessageEnvelope<K, ?>, RM> extends JoinFunction<K, M, JM, RM> {
+
+ default K getFirstKey(M message) {
+ return message.getKey();
+ }
+
+ default K getSecondKey(JM message) {
+ return message.getKey();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index 04919a7..a051914 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -19,23 +19,30 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
/**
- * A function that transforms a {@link MessageEnvelope} into another {@link MessageEnvelope}, possibly of a different type.
- * @param <M> type of the input {@link MessageEnvelope}
- * @param <OM> type of the transformed {@link MessageEnvelope}
+ * A function that transforms an input message into another message, possibly of a different type.
+ * @param <M> type of the input message
+ * @param <OM> type of the transformed message
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface MapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> {
+public interface MapFunction<M, OM> extends InitFunction {
/**
- * Transforms the provided {@link MessageEnvelope} into another {@link MessageEnvelope}
- * @param message the {@link MessageEnvelope} to be transformed
- * @return the transformed {@link MessageEnvelope}
+ * Transforms the provided message into another message
+ * @param message the input message to be transformed
+ * @return the transformed message
*/
OM apply(M message);
+ /**
+ * Init method to initialize the context for this {@link MapFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index 505da92..1050771 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -19,28 +19,35 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
/**
- * A function that allows sending a {@link MessageEnvelope} to an output system.
- * @param <M> type of the input {@link MessageEnvelope}
+ * A function that allows sending a message to an output system.
+ * @param <M> type of the input message
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface SinkFunction<M extends MessageEnvelope> {
+public interface SinkFunction<M> extends InitFunction {
/**
- * Allows sending the provided {@link MessageEnvelope} to an output {@link org.apache.samza.system.SystemStream} using
+ * Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using
* the provided {@link MessageCollector}. Also provides access to the {@link TaskCoordinator} to request commits
* or shut the container down.
*
- * @param message the {@link MessageEnvelope} to be sent to an output {@link org.apache.samza.system.SystemStream}
- * @param messageCollector the {@link MessageCollector} to use to send the {@link MessageEnvelope}
+ * @param message the input message to be sent to an output {@link org.apache.samza.system.SystemStream}
+ * @param messageCollector the {@link MessageCollector} to use to send the {@link org.apache.samza.operators.data.MessageEnvelope}
* @param taskCoordinator the {@link TaskCoordinator} to request commits or shutdown
*/
void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator);
+ /**
+ * Init method to initialize the context for this {@link SinkFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
index 3ca4e9a..6e134df 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -18,13 +18,12 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
import java.util.List;
/**
* A {@link Trigger} fires as soon as any of its individual triggers has fired.
*/
-public class AnyTrigger<M extends MessageEnvelope> implements Trigger {
+public class AnyTrigger<M> implements Trigger {
private final List<Trigger> triggers;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
index ba14928..1cf930c 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
@@ -18,13 +18,11 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
-
/**
* A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane}
* reaches the specified count.
*/
-public class CountTrigger<M extends MessageEnvelope> implements Trigger {
+public class CountTrigger<M> implements Trigger {
private final long count;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
index ae9564d..7f78eb8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
@@ -18,12 +18,10 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
-
/**
* A {@link Trigger} that repeats its underlying trigger forever.
*/
-class RepeatingTrigger<M extends MessageEnvelope> implements Trigger<M> {
+class RepeatingTrigger<M> implements Trigger<M> {
private final Trigger<M> trigger;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
index 13fc3cd..4de60a2 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
@@ -19,7 +19,6 @@
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
import java.time.Duration;
@@ -27,7 +26,7 @@ import java.time.Duration;
* A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in
* the window pane.
*/
-public class TimeSinceFirstMessageTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeSinceFirstMessageTrigger<M> implements Trigger {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
index 0150d86..6b09625 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
@@ -18,14 +18,12 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
-
import java.time.Duration;
/*
* A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration.
*/
-public class TimeSinceLastMessageTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeSinceLastMessageTrigger<M> implements Trigger {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
index ed7fef7..c5875aa 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
@@ -18,14 +18,12 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
-
import java.time.Duration;
/*
* A {@link Trigger} that fires after the specified duration in processing time.
*/
-public class TimeTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeTrigger<M> implements Trigger {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
index 6dc4f43..be0a877 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
@@ -20,15 +20,16 @@
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.annotation.InterfaceStability;
/**
* Marker interface for all triggers. The firing of a trigger indicates the completion of a window pane.
*
* <p> Use the {@link Triggers} APIs to create a {@link Trigger}.
*
- * @param <M> the type of the incoming {@link MessageEnvelope}
+ * @param <M> the type of the incoming message
*/
-public interface Trigger<M extends MessageEnvelope> {
+@InterfaceStability.Unstable
+public interface Trigger<M> {
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
index f27cfd8..97fb7b7 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
@@ -19,7 +19,6 @@
package org.apache.samza.operators.triggers;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
import java.time.Duration;
import java.util.ArrayList;
@@ -35,61 +34,63 @@ import java.util.List;
* <pre> {@code
* MessageStream<> windowedStream = stream.window(Windows.tumblingWindow(Duration.of(10, TimeUnit.SECONDS))
* .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.of(4, TimeUnit.SECONDS))))))
- * .accumulateFiredPanes());
+ * .setAccumulationMode(AccumulationMode.ACCUMULATING));
* }</pre>
*
- * @param <M> the type of input {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream}
*/
@InterfaceStability.Unstable
-public final class Triggers<M extends MessageEnvelope> {
+public final class Triggers {
private Triggers() { }
/**
- * Creates a {@link Trigger} that fires when the number of {@link MessageEnvelope}s in the pane
+ * Creates a {@link Trigger} that fires when the number of messages in the pane
* reaches the specified count.
*
- * @param count the number of {@link MessageEnvelope}s to fire the trigger after
+ * @param count the number of messages to fire the trigger after
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static Trigger count(long count) {
- return new CountTrigger(count);
+ public static <M> Trigger<M> count(long count) {
+ return new CountTrigger<M>(count);
}
/**
- * Creates a trigger that fires after the specified duration has passed since the first {@link MessageEnvelope} in
+ * Creates a trigger that fires after the specified duration has passed since the first message in
* the pane.
*
* @param duration the duration since the first element
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static Trigger timeSinceFirstMessage(Duration duration) {
- return new TimeSinceFirstMessageTrigger(duration);
+ public static <M> Trigger<M> timeSinceFirstMessage(Duration duration) {
+ return new TimeSinceFirstMessageTrigger<M>(duration);
}
/**
- * Creates a trigger that fires when there is no new {@link MessageEnvelope} for the specified duration in the pane.
+ * Creates a trigger that fires when there is no new message for the specified duration in the pane.
*
* @param duration the duration since the last element
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static Trigger timeSinceLastMessage(Duration duration) {
- return new TimeSinceLastMessageTrigger(duration);
+ public static <M> Trigger<M> timeSinceLastMessage(Duration duration) {
+ return new TimeSinceLastMessageTrigger<M>(duration);
}
/**
* Creates a trigger that fires when any of the provided triggers fire.
*
- * @param <M> the type of input {@link MessageEnvelope} in the window
* @param triggers the individual triggers
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static <M extends MessageEnvelope> Trigger any(Trigger<M>... triggers) {
- List<Trigger> triggerList = new ArrayList<>();
+ public static <M> Trigger<M> any(Trigger<M>... triggers) {
+ List<Trigger<M>> triggerList = new ArrayList<>();
for (Trigger trigger : triggers) {
triggerList.add(trigger);
}
- return new AnyTrigger(Collections.unmodifiableList(triggerList));
+ return new AnyTrigger<M>(Collections.unmodifiableList(triggerList));
}
/**
@@ -98,11 +99,11 @@ public final class Triggers<M extends MessageEnvelope> {
* <p>Creating a {@link RepeatingTrigger} from an {@link AnyTrigger} is equivalent to creating an {@link AnyTrigger} from
* its individual {@link RepeatingTrigger}s.
*
- * @param <M> the type of input {@link MessageEnvelope} in the window
* @param trigger the individual trigger to repeat
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static <M extends MessageEnvelope> Trigger repeat(Trigger<M> trigger) {
+ public static <M> Trigger<M> repeat(Trigger<M> trigger) {
return new RepeatingTrigger<>(trigger);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 6aae940..8aa665a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -19,18 +19,17 @@
package org.apache.samza.operators.windows;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.triggers.Trigger;
/**
- * Groups incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite
+ * Groups incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite
* windows for processing.
*
* <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
* that determine when results from the {@link Window} are emitted.
*
- * <p> Each emitted result contains one or more {@link MessageEnvelope}s in the window and is called a {@link WindowPane}.
- * A pane can include all {@link MessageEnvelope}s collected for the window so far or only the new {@link MessageEnvelope}s
+ * <p> Each emitted result contains one or more messages in the window and is called a {@link WindowPane}.
+ * A pane can include all messages collected for the window so far or only the new messages
* since the last emitted pane. (as determined by the {@link AccumulationMode})
*
* <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
@@ -66,13 +65,12 @@ import org.apache.samza.operators.triggers.Trigger;
* <p> Use the {@link Windows} APIs to create various windows and the {@link org.apache.samza.operators.triggers.Triggers}
* APIs to create triggers.
*
- * @param <M> the type of the input {@link MessageEnvelope}
- * @param <K> the type of the key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}.
+ * @param <M> the type of the input message
+ * @param <K> the type of the key in the message in this {@link org.apache.samza.operators.MessageStream}.
* @param <WV> the type of the value in the {@link WindowPane}.
- * @param <WM> the type of the output.
*/
@InterfaceStability.Unstable
-public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<K, WV>> {
+public interface Window<M, K, WV> {
/**
* Set the early triggers for this {@link Window}.
@@ -81,7 +79,7 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
* @param trigger the early trigger
* @return the {@link Window} function with the early trigger
*/
- Window<M, K, WV, WM> setEarlyTrigger(Trigger<M> trigger);
+ Window<M, K, WV> setEarlyTrigger(Trigger<M> trigger);
/**
* Set the late triggers for this {@link Window}.
@@ -90,7 +88,7 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
* @param trigger the late trigger
* @return the {@link Window} function with the late trigger
*/
- Window<M, K, WV, WM> setLateTrigger(Trigger<M> trigger);
+ Window<M, K, WV> setLateTrigger(Trigger<M> trigger);
/**
* Specify how a {@link Window} should process its previously emitted {@link WindowPane}s.
@@ -106,6 +104,6 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
* @param mode the accumulation mode
* @return the {@link Window} function with the specified {@link AccumulationMode}.
*/
- Window<M, K, WV, WM> setAccumulationMode(AccumulationMode mode);
+ Window<M, K, WV> setAccumulationMode(AccumulationMode mode);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
index 7edf3e1..14bd5ab 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
@@ -21,7 +21,7 @@ package org.apache.samza.operators.windows;
/**
* Key for a {@link WindowPane} emitted from a {@link Window}.
*
- * @param <K> the type of the key in the incoming {@link org.apache.samza.operators.data.MessageEnvelope}.
+ * @param <K> the type of the key in the incoming message.
* Windows that are not keyed have a {@link Void} key type.
*
*/
@@ -29,18 +29,27 @@ public class WindowKey<K> {
private final K key;
- private final String windowId;
+ private final String paneId;
public WindowKey(K key, String windowId) {
this.key = key;
- this.windowId = windowId;
+ this.paneId = windowId;
}
public K getKey() {
return key;
}
- public String getWindowId() {
- return windowId;
+ public String getPaneId() {
+ return paneId;
+ }
+
+ @Override
+ public String toString() {
+ // Void has no instances, so a non-keyed window has a null key: print the pane id alone.
+ if (key == null) {
+ return String.valueOf(paneId);
+ }
+ return String.format("%s:%s", key, paneId);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
index 0388048..3b66bd1 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
@@ -18,16 +18,13 @@
*/
package org.apache.samza.operators.windows;
-import org.apache.samza.operators.data.MessageEnvelope;
-
-
/**
* Specifies the result emitted from a {@link Window}.
*
* @param <K> the type of key in the window pane
* @param <V> the type of value in the window pane.
*/
-public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V> {
+public final class WindowPane<K, V> {
private final WindowKey<K> key;
@@ -41,11 +38,11 @@ public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V>
this.mode = mode;
}
- @Override public V getMessage() {
+ public V getMessage() {
return this.value;
}
- @Override public WindowKey<K> getKey() {
+ public WindowKey<K> getKey() {
return this.key;
}