You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/05/09 02:24:19 UTC
samza git commit: SAMZA-1268: Javadoc cleanup for public APIs for
0.13 release
Repository: samza
Updated Branches:
refs/heads/master 2d47ee804 -> 634e568cd
SAMZA-1268: Javadoc cleanup for public APIs for 0.13 release
This PR cleans up javadocs for Fluent API classes in the samza-api module.
Also updates the TaskContext (existing) and ContextManager (new) interfaces to add support for setting a pass-through user-defined context.
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #169 from prateekm/api-docs-cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/634e568c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/634e568c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/634e568c
Branch: refs/heads/master
Commit: 634e568cd38851cfeb376c965056fd7f3f23bc5b
Parents: 2d47ee8
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Mon May 8 19:24:03 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Mon May 8 19:24:03 2017 -0700
----------------------------------------------------------------------
checkstyle/checkstyle-suppressions.xml | 2 +
.../samza/application/StreamApplication.java | 71 +++++++-----
.../apache/samza/operators/ContextManager.java | 24 ++--
.../apache/samza/operators/MessageStream.java | 54 +++++----
.../org/apache/samza/operators/StreamGraph.java | 24 ++--
.../operators/functions/FilterFunction.java | 6 +-
.../operators/functions/FlatMapFunction.java | 5 +-
.../operators/functions/FoldLeftFunction.java | 9 +-
.../operators/functions/InitableFunction.java | 8 +-
.../samza/operators/functions/JoinFunction.java | 15 +--
.../samza/operators/functions/MapFunction.java | 5 +-
.../samza/operators/functions/SinkFunction.java | 3 +-
.../apache/samza/operators/windows/Window.java | 44 ++++---
.../apache/samza/operators/windows/Windows.java | 114 +++++++++++--------
.../java/org/apache/samza/task/TaskContext.java | 15 ++-
.../apache/samza/operators/StreamGraphImpl.java | 2 +-
.../apache/samza/task/StreamOperatorTask.java | 12 +-
.../apache/samza/container/TaskInstance.scala | 9 ++
.../samza/operators/TestStreamGraphImpl.java | 28 -----
19 files changed, 247 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/checkstyle/checkstyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle-suppressions.xml b/checkstyle/checkstyle-suppressions.xml
index a88b341..d227929 100644
--- a/checkstyle/checkstyle-suppressions.xml
+++ b/checkstyle/checkstyle-suppressions.xml
@@ -30,5 +30,7 @@
<suppress checks="ConstantName"
files="ApplicationStatus.java"
lines="26-29"/>
+ <suppress checks="UnusedImports"
+ files="StreamApplication.java"/>
</suppressions>
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
index a26c5af..0d77295 100644
--- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
@@ -20,54 +20,69 @@ package org.apache.samza.application;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-
+import org.apache.samza.operators.functions.InitableFunction;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
/**
- * This interface defines a template for stream application that user will implement to initialize operator DAG in {@link StreamGraph}.
- *
+ * Describes and initializes the transforms for processing message streams and generating results.
* <p>
- * User program implements {@link StreamApplication#init(StreamGraph, Config)} method to initialize the transformation logic
- * from all input streams to output streams. A simple user code example is shown below:
- * </p>
- *
+ * The following example removes page views older than 1 hour from the input stream:
* <pre>{@code
- * public class PageViewCounterExample implements StreamApplication {
- * // max timeout is 60 seconds
- * private static final MAX_TIMEOUT = 60000;
- *
+ * public class PageViewCounter implements StreamApplication {
* public void init(StreamGraph graph, Config config) {
- * MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
- * OutputStream<String, PageViewEvent, PageViewEvent> pageViewEventFilteredStream = graph
- * .getOutputStream("pageViewEventFiltered", m -> m.memberId, m -> m);
+ * MessageStream<PageViewEvent> pageViewEvents =
+ * graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m);
+ * OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents =
+ * graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m);
*
* pageViewEvents
- * .filter(m -> !(m.getMessage().getEventTime() < System.currentTimeMillis() - MAX_TIMEOUT))
- * .sendTo(pageViewEventFilteredStream);
+ * .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
+ * .sendTo(recentPageViewEvents);
* }
- *
- * // local execution mode
+ * }
+ * }</pre>
+ * <p>
+ * The example above can be run using an ApplicationRunner:
+ * <pre>{@code
* public static void main(String[] args) {
* CommandLine cmdLine = new CommandLine();
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * PageViewCounterExample userApp = new PageViewCounterExample();
- * ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
- * localRunner.run(userApp);
+ * PageViewCounter app = new PageViewCounter();
+ * LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ * runner.run(app);
+ * runner.waitForFinish();
* }
- *
- * }
* }</pre>
- *
+ * <p>
+ * Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution.
+ * A new StreamApplication instance will be created and initialized when planning the execution, as well as for each
+ * {@link StreamTask} instance used for processing incoming messages. Execution is synchronous and thread-safe
+ * within each {@link StreamTask}.
*/
@InterfaceStability.Unstable
public interface StreamApplication {
/**
- * Users are required to implement this abstract method to initialize the processing logic of the application, in terms
- * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
+ * Describes and initializes the transforms for processing message streams and generating results.
+ * <p>
+ * The {@link StreamGraph} provides access to input and output streams. Input {@link MessageStream}s can be
+ * transformed into other {@link MessageStream}s or sent to an {@link OutputStream} using the {@link MessageStream}
+ * operators.
+ * <p>
+ * Most operators accept custom functions for doing the transformations. These functions are {@link InitableFunction}s
+ * and are provided the {@link Config} and {@link TaskContext} during their own initialization. The config and the
+ * context can be used, for example, to create custom metrics or access durable state stores.
+ * <p>
+ * A shared context between {@link InitableFunction}s for different operators within a task instance can be set
+ * up by providing a {@link ContextManager} using {@link StreamGraph#withContextManager}.
*
- * @param graph an empty {@link StreamGraph} object to be initialized
- * @param config the {@link Config} of the application
+ * @param graph the {@link StreamGraph} to get input/output streams from
+ * @param config the configuration for the application
*/
void init(StreamGraph graph, Config config);
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
index c3b1cf3..5f2c020 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
@@ -24,24 +24,26 @@ import org.apache.samza.task.TaskContext;
/**
- * Interface class defining methods to initialize and finalize the context used by the transformation functions.
+ * Manages custom context that is shared across multiple operator functions in a task.
*/
@InterfaceStability.Unstable
public interface ContextManager {
+
/**
- * The initialization method to create shared context for the whole task in Samza. Default to NO-OP
+ * Allows initializing and setting a custom context that is shared across multiple operator functions in a task.
+ * <p>
+ * This method is invoked before any {@link org.apache.samza.operators.functions.InitableFunction}s are initialized.
+ * Use {@link TaskContext#setUserContext(Object)} to set the context here and {@link TaskContext#getUserContext()} to
+ * get it in InitableFunctions.
*
- * @param config the configuration object for the task
- * @param context the {@link TaskContext} object
- * @return User-defined task-wide context object
+ * @param config the {@link Config} for the application
+ * @param context the {@link TaskContext} for this task
*/
- default TaskContext initTaskContext(Config config, TaskContext context) {
- return context;
- }
+ void init(Config config, TaskContext context);
/**
- * The finalize method to allow users to close resource initialized in {@link #initTaskContext} method. Default to NO-OP.
- *
+ * Allows closing the custom context that is shared across multiple operator functions in a task.
*/
- default void finalizeTaskContext() { }
+ void close();
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 91ef44c..7e7a537 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -33,11 +33,11 @@ import java.util.function.Function;
/**
- * Represents a stream of messages.
+ * A stream of messages that can be transformed into another {@link MessageStream}.
* <p>
- * A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API.
+ * A {@link MessageStream} corresponding to an input stream can be obtained using {@link StreamGraph#getInputStream}.
*
- * @param <M> type of messages in this stream
+ * @param <M> the type of messages in this stream
*/
@InterfaceStability.Unstable
public interface MessageStream<M> {
@@ -47,46 +47,49 @@ public interface MessageStream<M> {
* transformed {@link MessageStream}.
*
* @param mapFn the function to transform a message to another message
- * @param <TM> the type of messages in the transformed {@link MessageStream}
+ * @param <OM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
- <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn);
+ <OM> MessageStream<OM> map(MapFunction<? super M, ? extends OM> mapFn);
/**
* Applies the provided 1:n function to transform a message in this {@link MessageStream}
* to n messages in the transformed {@link MessageStream}
*
* @param flatMapFn the function to transform a message to zero or more messages
- * @param <TM> the type of messages in the transformed {@link MessageStream}
+ * @param <OM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
- <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn);
+ <OM> MessageStream<OM> flatMap(FlatMapFunction<? super M, ? extends OM> flatMapFn);
/**
* Applies the provided function to messages in this {@link MessageStream} and returns the
- * transformed {@link MessageStream}.
+ * filtered {@link MessageStream}.
* <p>
* The {@link Function} is a predicate which determines whether a message in this {@link MessageStream}
- * should be retained in the transformed {@link MessageStream}.
+ * should be retained in the filtered {@link MessageStream}.
*
- * @param filterFn the predicate to filter messages from this {@link MessageStream}
+ * @param filterFn the predicate to filter messages from this {@link MessageStream}.
* @return the transformed {@link MessageStream}
*/
MessageStream<M> filter(FilterFunction<? super M> filterFn);
/**
* Allows sending messages in this {@link MessageStream} to an output system using the provided {@link SinkFunction}.
- *
- * NOTE: If the output is for a {@link org.apache.samza.system.SystemStream}, use
- * {@link #sendTo(OutputStream)} instead. This transform should only be used to output to
- * non-stream systems (e.g., an external database).
+ * <p>
+ * Offers more control over processing and sending messages than {@link #sendTo(OutputStream)} since
+ * the {@link SinkFunction} has access to the {@link org.apache.samza.task.MessageCollector} and
+ * {@link org.apache.samza.task.TaskCoordinator}.
+ * <p>
+ * This can also be used to send output to a system (e.g. a database) that doesn't have a corresponding
+ * Samza SystemProducer implementation.
*
* @param sinkFn the function to send messages in this stream to an external system
*/
void sink(SinkFunction<? super M> sinkFn);
/**
- * Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}.
+ * Allows sending messages in this {@link MessageStream} to an {@link OutputStream}.
*
* @param outputStream the output stream to send messages to
* @param <K> the type of key in the outgoing message
@@ -100,6 +103,8 @@ public interface MessageStream<M> {
* {@link WindowPane}s.
* <p>
* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
+ * <p>
+ * <b>Note:</b> As of version 0.13.0, messages in windows are kept in memory and may be lost in case of failures.
*
* @param window the window to group and process messages from this {@link MessageStream}
* @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
@@ -110,23 +115,27 @@ public interface MessageStream<M> {
<K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);
/**
- * Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}.
+ * Joins this {@link MessageStream} with another {@link MessageStream} using the provided
+ * pairwise {@link JoinFunction}.
* <p>
- * Messages in each stream are retained (currently, in memory) for the provided {@code ttl} and join results are
+ * Messages in each stream are retained for the provided {@code ttl} and join results are
* emitted as matches are found.
+ * <p>
+ * <b>Note:</b> As of version 0.13.0, messages in joins are kept in memory and may be lost in case of failures.
*
* @param otherStream the other {@link MessageStream} to be joined with
* @param joinFn the function to join messages from this and the other {@link MessageStream}
* @param ttl the ttl for messages in each stream
* @param <K> the type of join key
- * @param <OM> the type of messages in the other stream
- * @param <TM> the type of messages resulting from the {@code joinFn}
+ * @param <JM> the type of messages in the other stream
+ * @param <OM> the type of messages resulting from the {@code joinFn}
* @return the joined {@link MessageStream}
*/
- <K, OM, TM> MessageStream<TM> join(MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFn, Duration ttl);
+ <K, JM, OM> MessageStream<OM> join(MessageStream<JM> otherStream,
+ JoinFunction<? extends K, ? super M, ? super JM, ? extends OM> joinFn, Duration ttl);
/**
- * Merge all {@code otherStreams} with this {@link MessageStream}.
+ * Merges all {@code otherStreams} with this {@link MessageStream}.
*
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
* @return the merged {@link MessageStream}
@@ -136,6 +145,9 @@ public interface MessageStream<M> {
/**
* Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes
* them as an input {@link MessageStream} again. Uses keys returned by the {@code keyExtractor} as the partition key.
+ * <p>
+ * <b>Note</b>: Repartitioned streams are created automatically in the default system. The key and message Serdes
+ * configured for the default system must be able to serialize and deserialize types K and M respectively.
*
* @param keyExtractor the {@link Function} to extract the output message key and partition key from
* the input message
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index d299068..ea6721b 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -24,16 +24,17 @@ import java.util.function.BiFunction;
import java.util.function.Function;
/**
- * Provides APIs for accessing {@link MessageStream}s to be used to create the DAG of transforms.
+ * Provides access to {@link MessageStream}s and {@link OutputStream}s used to describe the processing logic.
*/
@InterfaceStability.Unstable
public interface StreamGraph {
/**
- * Gets the input {@link MessageStream} corresponding to the logical {@code streamId}. Multiple invocations of
- * this method with the same {@code streamId} will throw an {@link IllegalStateException}
+ * Gets the input {@link MessageStream} corresponding to the {@code streamId}.
+ * <p>
+ * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
*
- * @param streamId the unique logical ID for the stream
+ * @param streamId the unique ID for the stream
* @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
* in the input {@link MessageStream}
* @param <K> the type of key in the incoming message
@@ -45,10 +46,11 @@ public interface StreamGraph {
<K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder);
/**
- * Gets the {@link OutputStream} corresponding to the logical {@code streamId}. Multiple invocations of
- * this method with the same {@code streamId} will throw an {@link IllegalStateException}
+ * Gets the {@link OutputStream} corresponding to the {@code streamId}.
+ * <p>
+ * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
*
- * @param streamId the unique logical ID for the stream
+ * @param streamId the unique ID for the stream
* @param keyExtractor the {@link Function} to extract the outgoing key from the output message
* @param msgExtractor the {@link Function} to extract the outgoing message from the output message
* @param <K> the type of key in the outgoing message
@@ -62,12 +64,12 @@ public interface StreamGraph {
/**
* Sets the {@link ContextManager} for this {@link StreamGraph}.
- *
- * The provided {@code contextManager} will be initialized before the transformation functions
- * and can be used to setup shared context between them.
+ * <p>
+ * The provided {@link ContextManager} can be used to setup shared context between the operator functions
+ * within a task instance.
*
* @param contextManager the {@link ContextManager} to use for the {@link StreamGraph}
- * @return the {@link StreamGraph} with the {@code contextManager} as its {@link ContextManager}
+ * @return the {@link StreamGraph} with {@code contextManager} set as its {@link ContextManager}
*/
StreamGraph withContextManager(ContextManager contextManager);
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index 143bae0..cd49d1b 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -22,7 +22,8 @@ import org.apache.samza.annotation.InterfaceStability;
/**
- * A function that specifies whether a message should be retained for further processing or filtered out.
+ * Specifies whether a message should be retained for further processing.
+ *
* @param <M> type of the input message
*/
@InterfaceStability.Unstable
@@ -31,7 +32,8 @@ public interface FilterFunction<M> extends InitableFunction {
/**
* Returns a boolean indicating whether this message should be retained or filtered out.
- * @param message the input message to be checked. This object should not be mutated.
+ *
+ * @param message the input message to be checked
* @return true if {@code message} should be retained
*/
boolean apply(M message);
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
index bbbddeb..e6c4958 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -24,8 +24,8 @@ import java.util.Collection;
/**
- * A function that transforms an input message into a collection of 0 or more messages,
- * possibly of a different type.
+ * Transforms an input message into a collection of 0 or more messages, possibly of a different type.
+ *
* @param <M> type of the input message
* @param <OM> type of the transformed messages
*/
@@ -35,6 +35,7 @@ public interface FlatMapFunction<M, OM> extends InitableFunction {
/**
* Transforms the provided message into a collection of 0 or more messages.
+ *
* @param message the input message to be transformed
* @return a collection of 0 or more transformed messages
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
index 58e88fd..25728fc 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
@@ -20,16 +20,15 @@
package org.apache.samza.operators.functions;
/**
- * A fold function that incrementally combines and aggregates values for a window.
+ * Incrementally updates the window value as messages are added to the window.
*/
public interface FoldLeftFunction<M, WV> extends InitableFunction {
/**
- * Incrementally combine and aggregate values for the window. Guaranteed to be invoked for every
- * message added to the window.
+ * Incrementally updates the window value as messages are added to the window.
*
- * @param message the incoming message that is added to the window. This object should not be mutated.
- * @param oldValue the previous value
+ * @param message the message being added to the window
+ * @param oldValue the previous value associated with the window
* @return the new value
*/
WV apply(M message, WV oldValue);
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
index 2f738da..4f9fad7 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
@@ -24,16 +24,16 @@ import org.apache.samza.task.TaskContext;
/**
- * interface defined to initalize the context of message transformation functions
+ * A function that can be initialized before execution.
*/
@InterfaceStability.Unstable
public interface InitableFunction {
/**
- * Interface method to initialize the context for a specific message transformation function.
+ * Initializes the function before any messages are processed.
*
- * @param config the {@link Config} object for this task
- * @param context the {@link TaskContext} object for this task
+ * @param config the {@link Config} for the application
+ * @param context the {@link TaskContext} for this task
*/
default void init(Config config, TaskContext context) { }
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
index fc38177..f30a47d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -22,8 +22,8 @@ import org.apache.samza.annotation.InterfaceStability;
/**
- * A function that joins messages from two {@link org.apache.samza.operators.MessageStream}s and produces
- * a joined message.
+ * Joins incoming messages in two streams by key.
+ *
* @param <K> type of the join key
* @param <M> type of the input message
* @param <JM> type of the message to join with
@@ -33,7 +33,8 @@ import org.apache.samza.annotation.InterfaceStability;
public interface JoinFunction<K, M, JM, RM> extends InitableFunction {
/**
- * Join the provided input messages and produces the joined messages.
+ * Joins the provided messages and returns the joined message.
+ *
* @param message the input message
* @param otherMessage the message to join with
* @return the joined message
@@ -41,17 +42,17 @@ public interface JoinFunction<K, M, JM, RM> extends InitableFunction {
RM apply(M message, JM otherMessage);
/**
- * Method to get the join key in the messages from the first input stream
+ * Gets the join key for messages in the first input stream.
*
- * @param message the input message from the first input stream
+ * @param message the message in the first input stream
* @return the join key
*/
K getFirstKey(M message);
/**
- * Method to get the join key in the messages from the second input stream
+ * Gets the join key for messages in the second input stream.
*
- * @param message the input message from the second input stream
+ * @param message the message in the second input stream
* @return the join key
*/
K getSecondKey(JM message);
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index b09fb99..240039f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -22,7 +22,8 @@ import org.apache.samza.annotation.InterfaceStability;
/**
- * A function that transforms an input message into another message, possibly of a different type.
+ * Transforms an input message into another message, possibly of a different type.
+ *
* @param <M> type of the input message
* @param <OM> type of the transformed message
*/
@@ -33,7 +34,7 @@ public interface MapFunction<M, OM> extends InitableFunction {
/**
* Transforms the provided message into another message.
*
- * @param message the input message to be transformed. This object should not be mutated.
+ * @param message the input message to be transformed
* @return the transformed message
*/
OM apply(M message);
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index 1d140ee..83aa0a1 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -24,7 +24,8 @@ import org.apache.samza.task.TaskCoordinator;
/**
- * A function that allows sending a message to an output system.
+ * Allows sending a message to an output system.
+ *
* @param <M> type of the input message
*/
@InterfaceStability.Unstable
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 9609292..321cc26 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -22,21 +22,20 @@ import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.triggers.Trigger;
/**
- * Groups incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite
- * windows for processing.
+ * Groups incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
*
- * <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
- * that determine when results from the {@link Window} are emitted.
+ * <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated
+ * {@link Trigger}s that determine when results from the {@link Window} are emitted.
*
* <p> Each emitted result contains one or more messages in the window and is called a {@link WindowPane}.
* A pane can include all messages collected for the window so far or only the new messages
* since the last emitted pane. (as determined by the {@link AccumulationMode})
*
- * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
- * has arrived or late triggers that allow handling of late data arrivals.
+ * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the
+ * window has arrived, or late triggers that allow handling late arrivals of data.
*
- * <p> A {@link Window} is defined as "keyed" when the incoming {@link org.apache.samza.operators.MessageStream} is first
- * partitioned based on the provided key, and windowing is applied on the partitioned stream.
+ * <p> A {@link Window} is said to be "keyed" when the incoming {@link org.apache.samza.operators.MessageStream}
+ * is first grouped based on the provided key, and windowing is applied on the grouped stream.
*
* window wk1 (with its triggers)
* +--------------------------------+
@@ -45,9 +44,9 @@ import org.apache.samza.operators.triggers.Trigger;
* | pane 1 |pane2 | pane3 |
* +-----------+--------+-----------+
*
- -----------------------------------
- *incoming message stream ------+
- -----------------------------------
+ * -----------------------------------
+ * incoming message stream ------+
+ * -----------------------------------
* window wk2
* +---------------------+---------+
* | pane 1| pane 2 | pane 3 |
@@ -62,19 +61,19 @@ import org.apache.samza.operators.triggers.Trigger;
* +----------+-----------+---------+
*
*
- * <p> Use the {@link Windows} APIs to create various windows and the {@link org.apache.samza.operators.triggers.Triggers}
- * APIs to create triggers.
+ * <p> Use {@link Windows} to create various windows and {@link org.apache.samza.operators.triggers.Triggers}
+ * to create their triggers.
*
* @param <M> the type of the input message
- * @param <K> the type of the key in the message in this {@link org.apache.samza.operators.MessageStream}.
- * @param <WV> the type of the value in the {@link WindowPane}.
+ * @param <K> the type of the key in the message
+ * @param <WV> the type of the value in the window
*/
@InterfaceStability.Unstable
public interface Window<M, K, WV> {
/**
* Set the early triggers for this {@link Window}.
- * <p>Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
+ * <p> Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
*
* @param trigger the early trigger
* @return the {@link Window} function with the early trigger
@@ -83,7 +82,7 @@ public interface Window<M, K, WV> {
/**
* Set the late triggers for this {@link Window}.
- * <p>Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
+ * <p> Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
*
* @param trigger the late trigger
* @return the {@link Window} function with the late trigger
@@ -92,17 +91,16 @@ public interface Window<M, K, WV> {
/**
* Specify how a {@link Window} should process its previously emitted {@link WindowPane}s.
- *
* <p> There are two types of {@link AccumulationMode}s:
* <ul>
- * <li> ACCUMULATING: Specifies that window panes should include all messages collected for the window (key) so far, even if they were
- * included in previously emitted window panes.
- * <li> DISCARDING: Specifies that window panes should only include messages collected for this window (key) since the last emitted
- * window pane.
+ * <li> ACCUMULATING: Specifies that window panes should include all messages collected for the window so far,
+ * even if they were included in previously emitted window panes.
+ * <li> DISCARDING: Specifies that window panes should only include messages collected for this window since
+ * the last emitted window pane.
* </ul>
*
* @param mode the accumulation mode
- * @return the {@link Window} function with the specified {@link AccumulationMode}.
+ * @return the {@link Window} function with {@code mode} set as its {@link AccumulationMode}.
*/
Window<M, K, WV> setAccumulationMode(AccumulationMode mode);
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index a0269cd..bb837f6 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -36,14 +36,14 @@ import java.util.function.Supplier;
/**
* APIs for creating different types of {@link Window}s.
*
- * Groups the incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
+ * Groups incoming messages in a {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
*
- * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
- * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more
- * messages in the window and is called a {@link WindowPane}.
+ * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated
+ * {@link Trigger}s that determine when results from the {@link Window} are emitted. Each emitted result contains one
+ * or more messages in the window and is called a {@link WindowPane}.
*
- * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
- * has arrived or late triggers that allow handling of late data arrivals.
+ * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data
+ * for the window has arrived, or late triggers that allow handling late arrivals of data.
*
* window wk1
* +--------------------------------+
@@ -52,9 +52,9 @@ import java.util.function.Supplier;
* | pane 1 |pane2 | pane3 |
* +-----------+--------+-----------+
*
- -----------------------------------
- *incoming message stream ------+
- -----------------------------------
+ * -----------------------------------
+ * incoming message stream ------+
+ * -----------------------------------
* window wk2
* +---------------------+---------+
* | pane 1| pane 2 | pane 3 |
@@ -72,20 +72,22 @@ import java.util.function.Supplier;
* <p> A {@link Window} can be one of the following types:
* <ul>
* <li>
- * Tumbling Windows: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals.
+ * Tumbling Window: A tumbling window defines a series of non-overlapping, fixed-size, contiguous intervals.
* <li>
- * Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
+ * Session Window: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
* A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
* The boundary for a session is defined by a {@code sessionGap}. All messages that that arrive within
* the gap are grouped into the same session.
- * <li>
- * Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}.
- * An early trigger must be specified when defining a global window.
* </ul>
*
- * <p> A {@link Window} is defined as "keyed" when the incoming messages are first grouped based on their key
- * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window
- * types.
+ * <p> A {@link Window} is said to be "keyed" when the incoming messages are first grouped based on their key
+ * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants
+ * of the window types above.
+ *
+ * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link Supplier}
+ * and an aggregating {@link FoldLeftFunction}. The initial value supplier is invoked every time a new window is
+ * created. The aggregating function is invoked for each incoming message for the window. If these are not provided,
+ * the emitted {@link WindowPane} will contain a collection of messages in the window.
*
* <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of
* finer granularity are not supported.
@@ -104,7 +106,8 @@ public final class Windows {
* <pre> {@code
* MessageStream<UserClick> stream = ...;
* Function<UserClick, String> keyFn = ...;
- * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
+ * Supplier<Integer> initialValue = () -> 0;
+ * FoldLeftFunction<UserClick, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
* MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
* Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
* }
@@ -112,18 +115,20 @@ public final class Windows {
*
* @param keyFn the function to extract the window key from a message
* @param interval the duration in processing time
- * @param initialValue the initial value to be used for aggregations
- * @param foldFn the function to aggregate messages in the {@link WindowPane}
+ * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
+ * @param aggregator the function to incrementally update the window value. Invoked when a new message
+ * arrives for the window.
* @param <M> the type of the input message
* @param <WV> the type of the {@link WindowPane} output value
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function.
*/
- public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval,
- Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) {
+ public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(
+ Function<? super M, ? extends K> keyFn, Duration interval,
+ Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator) {
Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
- return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
+ return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
(Function<M, K>) keyFn, null, WindowType.TUMBLING);
}
@@ -148,7 +153,8 @@ public final class Windows {
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
- public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval) {
+ public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(
+ Function<? super M, ? extends K> keyFn, Duration interval) {
FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
Supplier<Collection<M>> initialValue = ArrayList::new;
@@ -163,23 +169,25 @@ public final class Windows {
*
* <pre> {@code
* MessageStream<String> stream = ...;
- * BiFunction<String, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
+ * Supplier<Integer> initialValue = () -> 0;
+ * FoldLeftFunction<String, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
* MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
* Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
* }
* </pre>
*
- * @param duration the duration in processing time
- * @param initialValue the initial value to be used for aggregations
- * @param foldFn to aggregate messages in the {@link WindowPane}
+ * @param interval the duration in processing time
+ * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
+ * @param aggregator the function to incrementally update the window value. Invoked when a new message
+ * arrives for the window.
* @param <M> the type of the input message
* @param <WV> the type of the {@link WindowPane} output value
* @return the created {@link Window} function
*/
- public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<? extends WV> initialValue,
- FoldLeftFunction<? super M, WV> foldFn) {
- Trigger<M> defaultTrigger = new TimeTrigger<>(duration);
- return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
+ public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, Supplier<? extends WV> initialValue,
+ FoldLeftFunction<? super M, WV> aggregator) {
+ Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
+ return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
null, null, WindowType.TUMBLING);
}
@@ -191,10 +199,12 @@ public final class Windows {
*
* <pre> {@code
* MessageStream<Long> stream = ...;
- * Function<Collection<Long, Long>> percentile99 = ..
+ * Function<Collection<Long>, Long> percentile99 = ..
*
- * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
- * MessageStream<Long> windowedPercentiles = windowed.map(windowedOutput -> percentile99(windowedOutput.getMessage());
+ * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream =
+ * stream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
+ * MessageStream<Long> windowedPercentiles =
+ * windowedStream.map(windowPane -> percentile99.apply(windowPane.getMessage()));
* }
* </pre>
*
@@ -210,18 +220,19 @@ public final class Windows {
}
/**
- * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}
- * and applies the provided fold function to them.
+ * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided
+ * {@code sessionGap} and applies the provided aggregator to them.
*
* <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
- * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages that arrive within
- * the gap are grouped into the same session.
+ * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages
+ * that arrive within the gap are grouped into the same session.
*
* <p>The below example computes the maximum value per-key over a session window of gap 10 seconds.
*
* <pre> {@code
* MessageStream<UserClick> stream = ...;
- * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
+ * Supplier<Integer> initialValue = () -> 0;
+ * FoldLeftFunction<UserClick, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
* Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
* MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
* Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator));
@@ -230,22 +241,25 @@ public final class Windows {
*
* @param keyFn the function to extract the window key from a message
* @param sessionGap the timeout gap for defining the session
- * @param initialValue the initial value to be used for aggregations
- * @param foldFn the function to aggregate messages in the {@link WindowPane}
+ * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
+ * @param aggregator the function to incrementally update the window value. Invoked when a new message
+ * arrives for the window.
* @param <M> the type of the input message
* @param <K> the type of the key in the {@link Window}
* @param <WV> the type of the output value in the {@link WindowPane}
* @return the created {@link Window} function
*/
- public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap,
- Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) {
+ public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(
+ Function<? super M, ? extends K> keyFn, Duration sessionGap,
+ Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator) {
Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
- return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn, (Function<M, K>) keyFn,
- null, WindowType.SESSION);
+ return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
+ (Function<M, K>) keyFn, null, WindowType.SESSION);
}
/**
- * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}.
+ * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided
+ * {@code sessionGap}.
*
* <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The
* boundary for the session is defined by a {@code sessionGap}. All messages that that arrive within
@@ -255,7 +269,8 @@ public final class Windows {
*
* <pre> {@code
* MessageStream<UserClick> stream = ...;
- * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
+ * Supplier<Integer> initialValue = () -> 0;
+ * FoldLeftFunction<UserClick, Integer> maxAggregator = (m, c) -> Math.max(parseIntField(m), c);
* Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
* MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window(
* Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
@@ -268,7 +283,8 @@ public final class Windows {
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
- public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap) {
+ public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(
+ Function<? super M, ? extends K> keyFn, Duration sessionGap) {
FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index dc5742f..4ef3d30 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -56,11 +56,18 @@ public interface TaskContext {
void setStartingOffset(SystemStreamPartition ssp, String offset);
/**
- * Method to allow user to return customized context
+ * Sets the user-defined context.
*
- * @return user-defined task context object
+ * @param context the user-defined context to set
*/
- default Object getUserDefinedContext() {
+ default void setUserContext(Object context) { }
+
+ /**
+ * Gets the user-defined context.
+ *
+ * @return the user-defined context if set, else null
+ */
+ default Object getUserContext() {
return null;
- };
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 31a75ce..1f1d282 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -56,7 +56,7 @@ public class StreamGraphImpl implements StreamGraph {
private final ApplicationRunner runner;
private final Config config;
- private ContextManager contextManager = new ContextManager() { };
+ private ContextManager contextManager = null;
public StreamGraphImpl(ApplicationRunner runner, Config config) {
// TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphImpl once Systems
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 4720298..b18cf06 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -86,13 +86,15 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
// initialize the user-implemented stream application.
this.streamApplication.init(streamGraph, config);
- // get the user-implemented context manager and initialize the task-specific context.
+ // get the user-implemented context manager and initialize it
this.contextManager = streamGraph.getContextManager();
- TaskContext initializedTaskContext = this.contextManager.initTaskContext(config, context);
+ if (this.contextManager != null) {
+ this.contextManager.init(config, context);
+ }
// create the operator impl DAG corresponding to the logical operator spec DAG
OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
- operatorImplGraph.init(streamGraph, config, initializedTaskContext);
+ operatorImplGraph.init(streamGraph, config, context);
this.operatorImplGraph = operatorImplGraph;
// TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
@@ -135,6 +137,8 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
@Override
public void close() throws Exception {
- this.contextManager.finalizeTaskContext();
+ if (this.contextManager != null) {
+ this.contextManager.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c04776a..84e993b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -64,6 +64,7 @@ class TaskInstance(
val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
val context = new TaskContext {
+ var userContext: Object = null;
def getMetricsRegistry = metrics.registry
def getSystemStreamPartitions = systemStreamPartitions.asJava
def getStore(storeName: String) = if (storageManager != null) {
@@ -80,6 +81,14 @@ class TaskInstance(
val startingOffsets = offsetManager.startingOffsets
offsetManager.startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
}
+
+ override def setUserContext(context: Object): Unit = {
+ userContext = context
+ }
+
+ override def getUserContext: Object = {
+ userContext
+ }
}
// store the (ssp -> if this ssp is catched up) mapping. "catched up"
// means the same ssp in other taskInstances have the same offset as
http://git-wip-us.apache.org/repos/asf/samza/blob/634e568c/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 77a8960..666bbb8 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -28,7 +28,6 @@ import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
import org.apache.samza.operators.stream.OutputStreamInternalImpl;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
-import org.apache.samza.task.TaskContext;
import org.junit.Test;
import java.util.function.BiFunction;
@@ -137,33 +136,6 @@ public class TestStreamGraphImpl {
}
@Test
- public void testWithContextManager() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
- // ensure that default is noop
- TaskContext mockContext = mock(TaskContext.class);
- assertEquals(graph.getContextManager().initTaskContext(mockConfig, mockContext), mockContext);
-
- ContextManager testContextManager = new ContextManager() {
- @Override
- public TaskContext initTaskContext(Config config, TaskContext context) {
- return null;
- }
-
- @Override
- public void finalizeTaskContext() {
-
- }
- };
-
- graph.withContextManager(testContextManager);
- assertEquals(graph.getContextManager(), testContextManager);
- }
-
- @Test
public void testGetIntermediateStream() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);