You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/10/04 23:23:05 UTC
[5/5] samza git commit: SAMZA-914: Creating the fluent programming
APIs w/ operators
SAMZA-914: Creating the fluent programming APIs w/ operators
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fbdd76da
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fbdd76da
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fbdd76da
Branch: refs/heads/samza-sql
Commit: fbdd76daada63e52575815737b9e5c1d4e787e71
Parents: f29c614
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Oct 4 14:56:14 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Oct 4 14:56:14 2016 -0700
----------------------------------------------------------------------
build.gradle | 6 +-
gradle/dependency-versions.gradle | 4 +-
.../org/apache/samza/config/TestConfig.java | 7 +-
samza-operator/README.md | 17 +
.../samza/operators/api/MessageStream.java | 162 +++++++
.../samza/operators/api/MessageStreams.java | 80 ++++
.../samza/operators/api/TriggerBuilder.java | 314 +++++++++++++
.../apache/samza/operators/api/WindowState.java | 77 +++
.../org/apache/samza/operators/api/Windows.java | 195 ++++++++
.../apache/samza/operators/api/data/Data.java | 57 +++
.../api/data/IncomingSystemMessage.java | 76 +++
.../operators/api/data/InputSystemMessage.java | 43 ++
.../samza/operators/api/data/LongOffset.java | 75 +++
.../samza/operators/api/data/Message.java | 58 +++
.../apache/samza/operators/api/data/Offset.java | 27 ++
.../apache/samza/operators/api/data/Schema.java | 58 +++
.../samza/operators/api/internal/Operators.java | 468 +++++++++++++++++++
.../samza/operators/api/internal/Trigger.java | 95 ++++
.../samza/operators/api/internal/WindowFn.java | 60 +++
.../operators/api/internal/WindowOutput.java | 55 +++
.../samza/operators/impl/ChainedOperators.java | 73 +++
.../samza/operators/impl/OperatorImpl.java | 92 ++++
.../samza/operators/impl/ProcessorContext.java | 53 +++
.../operators/impl/SimpleOperatorImpl.java | 49 ++
.../samza/operators/impl/SinkOperatorImpl.java | 41 ++
.../samza/operators/impl/StateStoreImpl.java | 56 +++
.../operators/impl/data/avro/AvroData.java | 262 +++++++++++
.../operators/impl/data/avro/AvroSchema.java | 296 ++++++++++++
.../impl/data/serializers/SqlAvroSerde.java | 108 +++++
.../data/serializers/SqlAvroSerdeFactory.java | 40 ++
.../impl/data/serializers/SqlStringSerde.java | 44 ++
.../data/serializers/SqlStringSerdeFactory.java | 33 ++
.../operators/impl/data/string/StringData.java | 101 ++++
.../impl/data/string/StringSchema.java | 73 +++
.../impl/window/SessionWindowImpl.java | 66 +++
.../samza/task/StreamOperatorAdaptorTask.java | 85 ++++
.../apache/samza/task/StreamOperatorTask.java | 42 ++
.../apache/samza/operators/api/TestMessage.java | 47 ++
.../samza/operators/api/TestMessageStreams.java | 35 ++
.../samza/operators/api/TestTriggerBuilder.java | 211 +++++++++
.../apache/samza/operators/api/TestWindows.java | 106 +++++
.../api/data/TestIncomingSystemMessage.java | 53 +++
.../operators/api/data/TestLongOffset.java | 76 +++
.../operators/api/internal/TestOperators.java | 129 +++++
.../operators/api/internal/TestTrigger.java | 62 +++
.../api/internal/TestWindowOutput.java | 36 ++
.../samza/operators/impl/TestOperatorImpl.java | 70 +++
.../samza/operators/impl/TestOutputMessage.java | 47 ++
.../operators/impl/TestProcessorContext.java | 40 ++
.../operators/impl/TestSimpleOperatorImpl.java | 54 +++
.../operators/impl/TestSinkOperatorImpl.java | 45 ++
.../operators/impl/TestStateStoreImpl.java | 69 +++
.../impl/data/serializers/SqlAvroSerdeTest.java | 102 ++++
.../impl/window/TestSessionWindowImpl.java | 132 ++++++
.../samza/task/BroadcastOperatorTask.java | 110 +++++
.../samza/task/InputJsonSystemMessage.java | 63 +++
.../org/apache/samza/task/JoinOperatorTask.java | 79 ++++
.../task/TestStreamOperatorAdaptorTask.java | 79 ++++
.../samza/task/TestStreamOperatorTasks.java | 105 +++++
.../apache/samza/task/WindowOperatorTask.java | 70 +++
.../calcite/schema/TestAvroSchemaConverter.java | 3 +-
samza-sql-core/README.md | 17 -
.../org/apache/samza/sql/api/data/Data.java | 54 ---
.../apache/samza/sql/api/data/EntityName.java | 140 ------
.../org/apache/samza/sql/api/data/Relation.java | 40 --
.../org/apache/samza/sql/api/data/Schema.java | 55 ---
.../org/apache/samza/sql/api/data/Stream.java | 40 --
.../org/apache/samza/sql/api/data/Table.java | 38 --
.../org/apache/samza/sql/api/data/Tuple.java | 75 ---
.../samza/sql/api/operators/Operator.java | 73 ---
.../sql/api/operators/OperatorCallback.java | 70 ---
.../samza/sql/api/operators/OperatorRouter.java | 54 ---
.../samza/sql/api/operators/OperatorSpec.java | 58 ---
.../samza/sql/api/operators/SimpleOperator.java | 34 --
.../sql/api/operators/SqlOperatorFactory.java | 37 --
.../samza/sql/data/IncomingMessageTuple.java | 94 ----
.../apache/samza/sql/data/avro/AvroData.java | 262 -----------
.../apache/samza/sql/data/avro/AvroSchema.java | 296 ------------
.../sql/data/serializers/SqlAvroSerde.java | 109 -----
.../data/serializers/SqlAvroSerdeFactory.java | 40 --
.../sql/data/serializers/SqlStringSerde.java | 45 --
.../data/serializers/SqlStringSerdeFactory.java | 33 --
.../samza/sql/data/string/StringData.java | 101 ----
.../samza/sql/data/string/StringSchema.java | 73 ---
.../operators/factory/NoopOperatorCallback.java | 50 --
.../factory/SimpleOperatorFactoryImpl.java | 51 --
.../operators/factory/SimpleOperatorImpl.java | 136 ------
.../operators/factory/SimpleOperatorSpec.java | 106 -----
.../sql/operators/factory/SimpleRouter.java | 136 ------
.../sql/operators/join/StreamStreamJoin.java | 117 -----
.../operators/join/StreamStreamJoinSpec.java | 38 --
.../sql/operators/partition/PartitionOp.java | 120 -----
.../sql/operators/partition/PartitionSpec.java | 91 ----
.../sql/operators/window/BoundedTimeWindow.java | 161 -------
.../samza/sql/operators/window/WindowSpec.java | 67 ---
.../samza/sql/operators/window/WindowState.java | 44 --
.../sql/window/storage/OrderedStoreKey.java | 26 --
.../org/apache/samza/system/sql/LongOffset.java | 66 ---
.../org/apache/samza/system/sql/Offset.java | 27 --
.../samza/task/sql/RouterMessageCollector.java | 56 ---
.../samza/task/sql/SimpleMessageCollector.java | 114 -----
.../sql/data/serializers/SqlAvroSerdeTest.java | 102 ----
.../task/sql/RandomWindowOperatorTask.java | 96 ----
.../apache/samza/task/sql/StreamSqlTask.java | 104 -----
.../samza/task/sql/UserCallbacksSqlTask.java | 150 ------
settings.gradle | 2 +-
106 files changed, 5265 insertions(+), 3704 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 8fe4c46..aeefd1c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -336,7 +336,7 @@ project(":samza-yarn_$scalaVersion") {
}
if (JavaVersion.current().isJava8Compatible()) {
- project(":samza-sql-core") {
+ project(":samza-operator") {
apply plugin: 'java'
sourceCompatibility = 1.8
@@ -345,7 +345,9 @@ if (JavaVersion.current().isJava8Compatible()) {
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
compile "commons-collections:commons-collections:$commonsCollectionVersion"
+ compile "org.apache.commons:commons-lang3:$commonsLang3Version"
compile "org.apache.avro:avro:$avroVersion"
+ compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
}
@@ -363,7 +365,7 @@ if (JavaVersion.current().isJava8Compatible()) {
sourceCompatibility = 1.8
dependencies {
- compile project(":samza-sql-core")
+ compile project(":samza-operator")
compile "org.apache.calcite:calcite-core:$calciteVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index c52960c..845a3fd 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -36,7 +36,9 @@
guavaVersion = "17.0"
commonsCodecVersion = "1.9"
commonsCollectionVersion = "3.2.1"
- avroVersion = "1.7.7"
+ avroVersion = "1.4.0"
calciteVersion = "1.2.0-incubating"
httpClientVersion="4.4.1"
+ reactiveStreamVersion="1.0.0"
+ commonsLang3Version="3.4"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
index 5d066c5..7d9d56c 100644
--- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
+++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
@@ -19,12 +19,13 @@
package org.apache.samza.config;
-import static org.junit.Assert.*;
+import org.junit.Test;
-import java.util.Map;
import java.util.HashMap;
+import java.util.Map;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class TestConfig {
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/README.md
----------------------------------------------------------------------
diff --git a/samza-operator/README.md b/samza-operator/README.md
new file mode 100644
index 0000000..15d2092
--- /dev/null
+++ b/samza-operator/README.md
@@ -0,0 +1,17 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+samza-operator is an experimental module that is under development (SAMZA-552).
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
new file mode 100644
index 0000000..a01cee9
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.api.internal.Operators;
+import org.apache.samza.operators.api.Windows.Window;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines either the input or output streams to/from the operators. Users use the API methods defined here to
+ * directly program the stream processing stages that processes a stream and generate another one.
+ *
+ * @param <M> Type of message in this stream
+ */
+public class MessageStream<M extends Message> {
+
+  /**
+   * Public API methods start here
+   */
+
+  /**
+   * Defines a function API that takes three input parameters w/ types {@code A}, {@code B}, and {@code C} and w/o a return value
+   *
+   * @param <A> the type of input {@code a}
+   * @param <B> the type of input {@code b}
+   * @param <C> the type of input {@code c}
+   */
+  @FunctionalInterface
+  public interface VoidFunction3 <A, B, C> {
+    public void apply(A a, B b, C c);
+  }
+
+  /**
+   * Method to apply a map function (1:1) on a {@link MessageStream}. An input {@link Message} for which
+   * {@code mapper} returns {@code null} produces no output.
+   *
+   * @param mapper the mapper function to map one input {@link Message} to one output {@link Message}
+   * @param <OM> the type of the output {@link Message} in the output {@link MessageStream}
+   * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
+   */
+  public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) {
+    return Operators.<M, OM>getStreamOperator(m -> {
+      // Plain list instead of double-brace initialization: the anonymous ArrayList subclass captured
+      // this MessageStream instance and generated an extra synthetic class per call site.
+      Collection<OM> output = new ArrayList<>();
+      OM r = mapper.apply(m);
+      if (r != null) {
+        output.add(r);
+      }
+      return output;
+    }).getOutputStream();
+  }
+
+  /**
+   * Method to apply a flatMap function (1:n) on a {@link MessageStream}
+   *
+   * @param flatMapper the flat mapper function to map one input {@link Message} to zero or more output {@link Message}s
+   * @param <OM> the type of the output {@link Message} in the output {@link MessageStream}
+   * @return the output {@link MessageStream} by applying the flatMap function on the input {@link MessageStream}
+   */
+  public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) {
+    return Operators.getStreamOperator(flatMapper).getOutputStream();
+  }
+
+  /**
+   * Method to apply a filter function on a {@link MessageStream}. Only input {@link Message}s for which
+   * {@code filter} returns {@code true} are kept.
+   *
+   * @param filter the filter function to filter input {@link Message}s from the input {@link MessageStream}
+   * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream}
+   */
+  public MessageStream<M> filter(Function<M, Boolean> filter) {
+    return Operators.<M, M>getStreamOperator(t -> {
+      // Plain list instead of double-brace initialization (see map()).
+      Collection<M> output = new ArrayList<>();
+      if (filter.apply(t)) {
+        output.add(t);
+      }
+      return output;
+    }).getOutputStream();
+  }
+
+  /**
+   * Method to send the {@link Message}s of this {@link MessageStream} to an external output system (e.g. a
+   * {@link SystemStream}) through a user-defined sink function. This method returns nothing, so no further
+   * operators can be chained from its result.
+   *
+   * @param sink the user-defined sink function to send the input {@link Message}s to the external output systems
+   */
+  public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
+    Operators.getSinkOperator(sink);
+  }
+
+  /**
+   * Method to perform a window function (i.e. a group-by, aggregate function) on a {@link MessageStream}
+   *
+   * @param window the window function to group and aggregate the input {@link Message}s from the input {@link MessageStream}
+   * @param <WK> the type of key in the output {@link Message} from the {@link Window} function
+   * @param <WV> the type of output value from the {@link Window} function
+   * @param <WS> the type of window state kept in the {@link Window} function. NOTE(review): {@code WS} does not appear
+   *             in the parameter or return types, so it cannot be inferred from the arguments
+   * @param <WM> the type of {@link WindowOutput} message from the {@link Window} function
+   * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream}
+   */
+  public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Window<M, WK, WV, WM> window) {
+    return Operators.getWindowOperator(Windows.getInternalWindowFn(window)).getOutputStream();
+  }
+
+  /**
+   * Method to add an input {@link MessageStream} to a join function. Note that we currently only support 2-way joins.
+   *
+   * @param other the other stream to be joined w/
+   * @param merger the common function to merge messages from this {@link MessageStream} and {@code other}
+   * @param <K> the type of join key
+   * @param <JM> the type of message in the {@link Message} from the other join stream
+   * @param <RM> the type of message in the {@link Message} from the join function
+   * @return the output {@link MessageStream} from the join function {@code merger}
+   */
+  public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> join(MessageStream<JM> other,
+      BiFunction<M, JM, RM> merger) {
+    MessageStream<RM> outputStream = new MessageStream<>();
+
+    // One partial join per side; the second swaps argument order so merger always sees (this-message, other-message).
+    BiFunction<M, JM, RM> parJoin1 = merger::apply;
+    BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m);
+
+    // TODO: need to add default store functions for the two partial join functions
+
+    Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream);
+    Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream);
+    return outputStream;
+  }
+
+  /**
+   * Method to merge all {@code others} streams w/ this {@link MessageStream}. The merging streams must have the same type {@code M}.
+   * The {@code others} collection itself is not modified.
+   *
+   * @param others other streams to be merged w/ this one
+   * @return the merged output stream
+   */
+  public MessageStream<M> merge(Collection<MessageStream<M>> others) {
+    MessageStream<M> outputStream = new MessageStream<>();
+
+    // Copy rather than calling others.add(this): mutating the caller's collection is a surprising side effect
+    // and fails outright for unmodifiable collections.
+    Collection<MessageStream<M>> mergingStreams = new ArrayList<>(others);
+    mergingStreams.add(this);
+    // NOTE(review): the individual stream is not passed to getMergeOperator; one merge operator is requested per
+    // merging stream, all targeting outputStream — confirm this is the intended wiring.
+    mergingStreams.forEach(other -> Operators.getMergeOperator(outputStream));
+    return outputStream;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
new file mode 100644
index 0000000..59dd91c
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.data.IncomingSystemMessage;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * This class defines all methods to create a {@link MessageStream} object. Users can use this to create an {@link MessageStream}
+ * from a specific input source.
+ *
+ */
+
+public final class MessageStreams {
+
+  /**
+   * Not instantiable: this class only hosts static factory methods.
+   */
+  private MessageStreams() {}
+
+  /**
+   * Public static API methods start here
+   */
+
+  /**
+   * Creates a {@link MessageStream} that reads from one input system stream partition.
+   *
+   * @param ssp the input {@link SystemStreamPartition}
+   * @return a new {@link SystemMessageStream} bound to {@code ssp}
+   */
+  public static SystemMessageStream input(SystemStreamPartition ssp) {
+    return new SystemMessageStream(ssp);
+  }
+
+  /**
+   * A {@link MessageStream} of {@link IncomingSystemMessage}s bound to a single {@link SystemStreamPartition}.
+   * Its constructor is private; instances are created via {@link MessageStreams#input(SystemStreamPartition)}.
+   */
+  public static final class SystemMessageStream extends MessageStream<IncomingSystemMessage> {
+
+    /** The input partition this stream is bound to. */
+    private final SystemStreamPartition systemStreamPartition;
+
+    /**
+     * Binds a new stream to the given input partition.
+     *
+     * @param partition the input {@link SystemStreamPartition}
+     */
+    private SystemMessageStream(SystemStreamPartition partition) {
+      systemStreamPartition = partition;
+    }
+
+    /**
+     * Returns the input partition this stream was created for.
+     *
+     * @return the input {@link SystemStreamPartition}
+     */
+    public SystemStreamPartition getSystemStreamPartition() {
+      return systemStreamPartition;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java b/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
new file mode 100644
index 0000000..fc3ea37
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Trigger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines a builder of {@link Trigger} object for a {@link Windows.Window}. The triggers are categorized into
+ * three types:
+ *
+ * <p>
+ * early trigger: defines the condition when the first output from the window function is sent.
+ * late trigger: defines the condition when the updated output after the first output is sent.
+ * timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers
+ * </p>
+ *
+ * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction of the each individual trigger (i.e. OR).
+ *
+ * NOTE: Programmers should not use classes defined in {@link org.apache.samza.operators.api.internal} to create triggers
+ *
+ *
+ * @param <M> the type of input {@link Message} to the {@link Windows.Window}
+ * @param <V> the type of output value from the {@link Windows.Window}
+ */
+public final class TriggerBuilder<M extends Message, V> {
+
+ /**
+ * Predicate helper to OR multiple trigger conditions
+ */
+ static class PredicateHelper {
+ static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) {
+ return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s);
+ }
+
+ static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) {
+ return s -> lhs.apply(s) || rhs.apply(s);
+ }
+ }
+
+ /**
+ * The early trigger condition that determines the first output from the {@link Windows.Window}
+ */
+ private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null;
+
+ /**
+ * The late trigger condition that determines the late output(s) from the {@link Windows.Window}
+ */
+ private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null;
+
+ /**
+ * The system timer based trigger conditions that guarantees the {@link Windows.Window} proceeds forward
+ */
+ private Function<WindowState<V>, Boolean> timerTrigger = null;
+
+ /**
+ * The state updater function to be applied after the first output is triggered
+ */
+ private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity();
+
+ /**
+ * The state updater function to be applied after the late output is triggered
+ */
+ private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity();
+
+ /**
+ * Helper method to add a trigger condition
+ *
+ * @param currentTrigger current trigger condition
+ * @param newTrigger new trigger condition
+ * @return combined trigger condition that is {@code currentTrigger} OR {@code newTrigger}
+ */
+ private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger,
+ BiFunction<M, WindowState<V>, Boolean> newTrigger) {
+ if (currentTrigger == null) {
+ return newTrigger;
+ }
+
+ return PredicateHelper.or(currentTrigger, newTrigger);
+ }
+
+ /**
+ * Helper method to add a system timer trigger
+ *
+ * @param currentTimer current timer condition
+ * @param newTimer new timer condition
+ * @return combined timer condition that is {@code currentTimer} OR {@code newTimer}
+ */
+ private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer,
+ Function<WindowState<V>, Boolean> newTimer) {
+ if (currentTimer == null) {
+ return newTimer;
+ }
+
+ return PredicateHelper.or(currentTimer, newTimer);
+ }
+
+ /**
+ * default constructor to prevent instantiation
+ */
+ private TriggerBuilder() {}
+
+ /**
+ * Constructor that set the size limit as the early trigger for a window
+ *
+ * @param sizeLimit the number of messages in a window that would trigger the first output
+ */
+ private TriggerBuilder(long sizeLimit) {
+ this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit;
+ }
+
+ /**
+ * Constructor that set the event time length as the early trigger
+ *
+ * @param eventTimeFunction the function that calculate the event time in nano-second from the input {@link Message}
+ * @param wndLenMs the window length in event time in milli-second
+ */
+ private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) {
+ this.earlyTrigger = (m, s) ->
+ TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(),
+ eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs;
+ }
+
+ /**
+ * Constructor that set the special token message as the early trigger
+ *
+ * @param tokenFunc the function that checks whether an input {@link Message} is a token message that triggers window output
+ */
+ private TriggerBuilder(Function<M, Boolean> tokenFunc) {
+ this.earlyTrigger = (m, s) -> tokenFunc.apply(m);
+ }
+
+ /**
+ * Build method that creates an {@link Trigger} object based on the trigger conditions set in {@link TriggerBuilder}
+ * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object
+ *
+ * @return the final {@link Trigger} object
+ */
+ Trigger<M, WindowState<V>> build() {
+ return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater);
+ }
+
+ /**
+ * Public API methods start here
+ */
+
+
+ /**
+ * API method to allow users to set an update method to update the output value after the first window output is triggered
+ * by the early trigger condition
+ *
+ * @param onTriggerFunc the method to update the output value after the early trigger
+ * @return the {@link TriggerBuilder} object
+ */
+ public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) {
+ this.earlyTriggerUpdater = s -> { s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); return s; };
+ return this;
+ }
+
+ /**
+ * API method to allow users to set an update method to update the output value after a late window output is triggered
+ * by the late trigger condition
+ *
+ * @param onTriggerFunc the method to update the output value after the late trigger
+ * @return the {@link TriggerBuilder} object
+ */
+ public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) {
+ this.lateTriggerUpdater = s -> { s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); return s; };
+ return this;
+ }
+
+ /**
+ * API method to allow users to add a system timer trigger based on timeout after the last message received in the window
+ *
+ * @param timeoutMs the timeout in ms after the last message received in the window
+ * @return the {@link TriggerBuilder} object
+ */
+ public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) {
+ this.timerTrigger = this.addTimerTrigger(this.timerTrigger,
+ s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
+ return this;
+ }
+
+ /**
+ * API method to allow users to add a system timer trigger based on the timeout after the first message received in the window
+ *
+ * @param timeoutMs the timeout in ms after the first message received in the window
+ * @return the {@link TriggerBuilder} object
+ */
+ public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) {
+ this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s ->
+ TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
+ return this;
+ }
+
+ /**
+ * API method allow users to add a late trigger based on the window size limit
+ *
+ * @param sizeLimit limit on the number of messages in window
+ * @return the {@link TriggerBuilder} object
+ */
+ public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) {
+ this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit);
+ return this;
+ }
+
+ /**
+ * API method to allow users to define a customized late trigger function based on input message and the window state
+ *
+ * @param lateTrigger the late trigger condition based on input {@link Message} and the current {@link WindowState}
+ * @return the {@link TriggerBuilder} object
+ */
+ public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) {
+ this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger);
+ return this;
+ }
+
+ /**
+ * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit
+ *
+ * @param sizeLimit window size limit
+ * @param <M> the type of input {@link Message}
+ * @param <V> the type of {@link Windows.Window} output value
+ * @return the {@link TriggerBuilder} object
+ */
+ public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) {
+ return new TriggerBuilder<M, V>(sizeLimit);
+ }
+
+ /**
+ * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window
+ *
+ *
+ * @param eventTimeFunc the function to get the event time from the input message
+ * @param eventTimeWndSizeMs the event time window size in Ms
+ * @param <M> the type of input {@link Message}
+ * @param <V> the type of {@link Windows.Window} output value
+ * @return the {@link TriggerBuilder} object
+ */
+ public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) {
+ return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs);
+ }
+
+ /**
+ * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token messages
+ *
+ * @param tokenFunc the function to determine whether an input message is a window token or not
+ * @param <M> the type of input {@link Message}
+ * @param <V> the type of {@link Windows.Window} output value
+ * @return the {@link TriggerBuilder} object
+ */
+ public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) {
+ return new TriggerBuilder<M, V>(tokenFunc);
+ }
+
+ /**
+ * Static API method to allow customized early trigger condition based on input {@link Message} and the corresponding {@link WindowState}
+ *
+ * @param earlyTrigger the user defined early trigger condition
+ * @param <M> the input message type
+ * @param <V> the output value from the window
+ * @return the {@link TriggerBuilder} object
+ */
+ public static <M extends Message, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) {
+ TriggerBuilder<M, V> newTriggers = new TriggerBuilder<M, V>();
+ newTriggers.earlyTrigger = newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger);
+ return newTriggers;
+ }
+
+ /**
+ * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last message received in the window
+ *
+ * @param timeoutMs timeout in ms after the last message received
+ * @param <M> the type of input {@link Message}
+ * @param <V> the type of {@link Windows.Window} output value
+ * @return the {@link TriggerBuilder} object
+ */
+ public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) {
+ return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs);
+ }
+
+ /**
+ * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first message received in the window
+ *
+ * @param timeoutMs timeout in ms after the first message received
+ * @param <M> the type of input {@link Message}
+ * @param <V> the type of {@link Windows.Window} output value
+ * @return the {@link TriggerBuilder} object
+ */
+ public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) {
+ return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
new file mode 100644
index 0000000..402cc42
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+/**
+ * Per-window state kept by a window operator in its window state store. Programmers may plug in a customized window
+ * state to be stored in window state stores by implementing this interface.
+ *
+ * <p>All time values exposed here are expressed in nanoseconds.
+ *
+ * @param <WV> the type of the window's output value
+ */
+public interface WindowState<WV> {
+
+  /**
+   * Returns the system time at which the window received its first message.
+   *
+   * @return the receive time of the first message in the window, in nanoseconds
+   */
+  long getFirstMessageTimeNs();
+
+  /**
+   * Returns the system time at which the window received its most recent message.
+   *
+   * @return the receive time of the last message in the window, in nanoseconds
+   */
+  long getLastMessageTimeNs();
+
+  /**
+   * Returns the smallest event time among the messages in the window.
+   *
+   * @return the earliest event time in the window, in nanoseconds
+   */
+  long getEarliestEventTimeNs();
+
+  /**
+   * Returns the largest event time among the messages in the window.
+   *
+   * @return the latest event time in the window, in nanoseconds
+   */
+  long getLatestEventTimeNs();
+
+  /**
+   * Returns how many messages the window has received.
+   *
+   * @return the number of messages in the window
+   */
+  long getNumberMessages();
+
+  /**
+   * Returns the window's current output value.
+   *
+   * @return the output value associated with this window
+   */
+  WV getOutputValue();
+
+  /**
+   * Replaces the window's output value.
+   *
+   * @param value the new output value for this window
+   */
+  void setOutputValue(WV value);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java b/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
new file mode 100644
index 0000000..e557b34
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators;
+import org.apache.samza.operators.api.internal.Trigger;
+import org.apache.samza.operators.api.internal.WindowFn;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.storage.kv.Entry;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be
+ * used by the user (i.e. programmers) to create {@link Window} function directly.
+ *
+ * <p>The built-in aggregators created by {@link #intoSessions(Function)}, {@link #intoSessions(Function, Function)}
+ * and {@link #intoSessionCounter(Function)} accept a {@code null} current output value and start a fresh
+ * accumulation in that case.
+ */
+public final class Windows {
+
+  /**
+   * private constructor to prevent instantiation
+   */
+  private Windows() {}
+
+  /**
+   * This class defines a session window function class
+   *
+   * @param <M> the type of input {@link Message}
+   * @param <WK> the type of session key in the session window
+   * @param <WV> the type of output value in each session window
+   */
+  static class SessionWindow<M extends Message, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> {
+
+    /**
+     * Constructor. Made private s.t. it can only be instantiated via the static API methods in {@link Windows}
+     *
+     * @param sessionKeyFunction function to get the session key from the input {@link Message}
+     * @param aggregator function to calculate the output value based on the input {@link Message} and current output value
+     */
+    private SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) {
+      this.wndKeyFunction = sessionKeyFunction;
+      this.aggregator = aggregator;
+    }
+
+    /**
+     * function to calculate the window key from input message
+     */
+    private final Function<M, WK> wndKeyFunction;
+
+    /**
+     * function to calculate the output value from the input message and the current output value
+     */
+    private final BiFunction<M, WV, WV> aggregator;
+
+    /**
+     * trigger condition that determines when to send out the output value in a {@link WindowOutput} message
+     */
+    private Trigger<M, WindowState<WV>> trigger = null;
+
+    //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link Message} type for {@link Window}
+    private Operators.StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null;
+
+    /**
+     * Public API methods start here
+     */
+
+    /**
+     * Public API method to set the trigger conditions for the window operator
+     *
+     * @param wndTrigger {@link TriggerBuilder} holding the trigger conditions for this {@link SessionWindow}. It is
+     *                   built right away, and the resulting {@link Trigger} replaces any previously set trigger.
+     * @return this window operator w/ the defined triggers
+     */
+    @Override
+    public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) {
+      this.trigger = wndTrigger.build();
+      return this;
+    }
+
+    private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
+      // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers;
+      return null;
+    }
+
+    /**
+     * Exposes this window's transform function, store functions and trigger through the internal {@link WindowFn}
+     * interface. The returned object reads this window's fields on every call, so it reflects later
+     * {@link #setTriggers} calls.
+     */
+    private WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() {
+      return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() {
+
+        @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
+          return SessionWindow.this.getTransformFunc();
+        }
+
+        @Override public Operators.StoreFunctions<M, WK, WindowState<WV>> getStoreFuncs() {
+          return SessionWindow.this.storeFunctions;
+        }
+
+        @Override public Trigger<M, WindowState<WV>> getTrigger() {
+          return SessionWindow.this.trigger;
+        }
+      };
+    }
+  }
+
+  /**
+   * Internal helper that unwraps a user-facing {@link Window} into the {@link WindowFn} used by the operator
+   * implementation. Only {@link SessionWindow} instances are currently supported.
+   *
+   * @param window the {@link Window} created via one of the public static API methods in {@link Windows}
+   * @param <M> the type of input {@link Message}
+   * @param <WK> the type of key to the {@link Window}
+   * @param <WV> the type of output value in the {@link WindowOutput}
+   * @param <WS> the type of {@link WindowState} stored for each window
+   * @param <WM> the type of message in the window output stream
+   * @return the internal {@link WindowFn} backing {@code window}
+   * @throws IllegalArgumentException if {@code window} is not a {@link SessionWindow}
+   */
+  @SuppressWarnings("unchecked")
+  static <M extends Message, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn(
+      Window<M, WK, WV, WM> window) {
+    if (window instanceof SessionWindow) {
+      // Unchecked: a SessionWindow always uses WindowState<WV> as state and WindowOutput<WK, WV> as output, so these
+      // casts are only sound when callers instantiate WS and WM with exactly those types.
+      SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window;
+      return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn();
+    }
+    throw new IllegalArgumentException("Input window type not supported.");
+  }
+
+  /**
+   * Public static API methods start here
+   *
+   */
+
+  /**
+   * The public programming interface class for window function
+   *
+   * @param <M> the type of input {@link Message}
+   * @param <WK> the type of key to the {@link Window}
+   * @param <WV> the type of output value in the {@link WindowOutput}
+   * @param <WM> the type of message in the window output stream
+   */
+  public interface Window<M extends Message, WK, WV, WM extends WindowOutput<WK, WV>> {
+
+    /**
+     * Set the triggers for this {@link Window}
+     *
+     * @param wndTrigger trigger conditions set by the programmers
+     * @return the {@link Window} function w/ the trigger {@code wndTrigger}
+     */
+    Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger);
+  }
+
+  /**
+   * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input messages
+   *
+   * <p>If the current output value handed to the aggregator is {@code null} (presumably the first message of a
+   * session), a new {@link ArrayList} is started; otherwise the message is appended to the existing collection.
+   *
+   * @param sessionKeyFunction function to calculate session window key
+   * @param <M> type of input {@link Message}
+   * @param <WK> type of the session window key
+   * @return the {@link Window} function for the session
+   */
+  public static <M extends Message, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
+      Collection<M> msgs = c == null ? new ArrayList<>() : c;
+      msgs.add(m);
+      return msgs;
+    });
+  }
+
+  /**
+   * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input messages
+   *
+   * <p>If the current output value handed to the aggregator is {@code null} (presumably the first message of a
+   * session), a new {@link ArrayList} is started; otherwise the extracted info is appended to the existing collection.
+   *
+   * @param sessionKeyFunction function to calculate session window key
+   * @param sessionInfoExtractor function to retrieve session info of type {@code SI} from the input message of type {@code M}
+   * @param <M> type of the input {@link Message}
+   * @param <WK> type of the session window key
+   * @param <SI> type of the session information retrieved from each input message of type {@code M}
+   * @return the {@link Window} function for the session
+   */
+  public static <M extends Message, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction,
+      Function<M, SI> sessionInfoExtractor) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
+      Collection<SI> infos = c == null ? new ArrayList<>() : c;
+      infos.add(sessionInfoExtractor.apply(m));
+      return infos;
+    });
+  }
+
+  /**
+   * Static API method to create a {@link SessionWindow} as a counter of input messages
+   *
+   * <p>A {@code null} current count (presumably the first message of a session) is treated as zero, so the first
+   * message yields a count of 1.
+   *
+   * @param sessionKeyFunction function to calculate session window key
+   * @param <M> type of the input {@link Message}
+   * @param <WK> type of the session window key
+   * @return the {@link Window} function for the session
+   */
+  public static <M extends Message, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> c == null ? 1 : c + 1);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
new file mode 100644
index 0000000..69a3bee
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.api.data;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * A generic data interface that allows to implement data access / deserialization w/ {@link Schema}.
+ *
+ * <p>NOTE(review): this interface does not say which accessors are valid for a given instance; presumably that is
+ * determined by {@link #schema()} — TODO confirm with implementations.
+ */
+public interface Data {
+
+  // --- description of this data ---
+
+  /** @return the {@link Schema} associated with this data */
+  Schema schema();
+
+  // --- untyped access ---
+
+  /** @return this data as a plain {@link Object} */
+  Object value();
+
+  // --- scalar accessors, one per Java return type ---
+
+  int intValue();
+
+  long longValue();
+
+  float floatValue();
+
+  double doubleValue();
+
+  boolean booleanValue();
+
+  String strValue();
+
+  byte[] bytesValue();
+
+  // --- container accessors ---
+
+  List<Object> arrayValue();
+
+  Map<Object, Object> mapValue();
+
+  // --- nested access (presumably into array elements / struct fields — TODO confirm) ---
+
+  Data getElement(int index);
+
+  Data getFieldData(String fldName);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
new file mode 100644
index 0000000..ba74618
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/data/IncomingSystemMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * This class implements a {@link Message} that encapsulates an {@link IncomingMessageEnvelope} from the system.
+ *
+ * <p>NOTE: {@link #getTimestamp()} returns the {@link System#nanoTime()} reading taken when this wrapper was
+ * constructed. That value is only meaningful for measuring elapsed time against other {@code nanoTime()} readings in
+ * the same JVM; it is not a wall-clock / epoch timestamp.
+ */
+public class IncomingSystemMessage implements Message<Object, Object>, InputSystemMessage<Offset> {
+  /**
+   * Incoming message envelope
+   */
+  private final IncomingMessageEnvelope imsg;
+
+  /**
+   * The receive time of this incoming message, as a {@link System#nanoTime()} reading
+   */
+  private final long recvTimeNano;
+
+  /**
+   * Ctor to create an {@code IncomingSystemMessage} from an {@link IncomingMessageEnvelope}
+   *
+   * @param imsg The incoming system message; must not be {@code null}
+   * @throws NullPointerException if {@code imsg} is {@code null}
+   */
+  public IncomingSystemMessage(IncomingMessageEnvelope imsg) {
+    // Fail fast here instead of with an NPE on the first accessor call, far away from where the null came from.
+    if (imsg == null) {
+      throw new NullPointerException("imsg must not be null");
+    }
+    this.imsg = imsg;
+    this.recvTimeNano = System.nanoTime();
+  }
+
+  @Override
+  public Object getMessage() {
+    return this.imsg.getMessage();
+  }
+
+  @Override
+  public Object getKey() {
+    return this.imsg.getKey();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>For this class the value is the {@link System#nanoTime()} reading captured when this object was constructed.
+   */
+  @Override
+  public long getTimestamp() {
+    return this.recvTimeNano;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>A new {@link LongOffset} is created from the envelope's offset string on every call.
+   *
+   * @throws NumberFormatException if the envelope's offset cannot be parsed as a {@code long}
+   */
+  @Override
+  public Offset getOffset() {
+    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
+    // assuming incoming message carries long value as offset (i.e. Kafka case)
+    return new LongOffset(this.imsg.getOffset());
+  }
+
+  @Override
+  public SystemStreamPartition getSystemStreamPartition() {
+    return this.imsg.getSystemStreamPartition();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
new file mode 100644
index 0000000..c786025
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/data/InputSystemMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.data;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * This interface defines additional methods a message from a system input should implement, including the methods to
+ * get the {@link SystemStreamPartition} and the {@link Offset} of the input system message.
+ *
+ * @param <O> the type of {@link Offset} used by the input system
+ */
+public interface InputSystemMessage<O extends Offset> {
+
+  /**
+   * Get the input message's {@link SystemStreamPartition}
+   *
+   * @return the {@link SystemStreamPartition} this message is coming from
+   */
+  SystemStreamPartition getSystemStreamPartition();
+
+  /**
+   * Get the offset of the message in the input stream. Together with {@link #getSystemStreamPartition()}, this should
+   * be used to uniquely identify a message in an input stream.
+   *
+   * @return The offset of the message in the input stream.
+   */
+  O getOffset();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
new file mode 100644
index 0000000..f059b33
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/data/LongOffset.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.api.data;
+
+/**
+ * An implementation of {@link org.apache.samza.operators.api.data.Offset}, w/ {@code long} value as the offset.
+ *
+ * <p>The wrapped value never changes after construction. {@link #equals(Object)}, {@link #hashCode()} and
+ * {@link #compareTo(Offset)} are consistent: two {@code LongOffset}s are equal iff they wrap the same {@code long}.
+ */
+public class LongOffset implements Offset {
+
+  /**
+   * The offset value
+   */
+  private final long offset;
+
+  private LongOffset(long offset) {
+    this.offset = offset;
+  }
+
+  /**
+   * Creates a {@code LongOffset} by parsing the decimal string representation of an offset.
+   *
+   * @param offset the decimal string representation of the offset
+   * @throws NumberFormatException if {@code offset} is {@code null} or cannot be parsed as a {@code long}
+   */
+  public LongOffset(String offset) {
+    this.offset = Long.parseLong(offset);
+  }
+
+  /**
+   * Compares this offset with another {@link LongOffset} by numeric value.
+   *
+   * @param o the offset to compare with
+   * @return a negative, zero or positive value as this offset is less than, equal to, or greater than {@code o}
+   * @throws NullPointerException if {@code o} is {@code null}
+   * @throws IllegalArgumentException if {@code o} is not a {@link LongOffset}
+   */
+  @Override
+  public int compareTo(Offset o) {
+    if (o == null) {
+      // Comparable contract: comparing against null throws NullPointerException
+      throw new NullPointerException("Cannot compare LongOffset to null");
+    }
+    if (!(o instanceof LongOffset)) {
+      throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName());
+    }
+    return Long.compare(this.offset, ((LongOffset) o).offset);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof LongOffset)) {
+      return false;
+    }
+    return this.offset == ((LongOffset) other).offset;
+  }
+
+  /**
+   * Must be overridden together with {@link #equals(Object)} so equal offsets hash alike (e.g. as {@code HashMap} keys).
+   */
+  @Override
+  public int hashCode() {
+    return Long.hashCode(this.offset);
+  }
+
+  /**
+   * @return the decimal string form of the offset, which {@link #LongOffset(String)} parses back into an equal offset
+   */
+  @Override
+  public String toString() {
+    return Long.toString(this.offset);
+  }
+
+  /**
+   * Helper method to get the minimum offset
+   *
+   * @return The minimum offset
+   */
+  public static LongOffset getMinOffset() {
+    return new LongOffset(Long.MIN_VALUE);
+  }
+
+  /**
+   * Helper method to get the maximum offset
+   *
+   * @return The maximum offset
+   */
+  public static LongOffset getMaxOffset() {
+    return new LongOffset(Long.MAX_VALUE);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
new file mode 100644
index 0000000..9b53b45
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.api.data;
+
+/**
+ * This class defines the generic interface of {@link Message}, which is a entry in the input/output stream.
+ *
+ * <p>The {@link Message} models the basic operatible unit in streaming SQL processes in Samza.
+ *
+ */
+public interface Message<K, M> {
+
+ /**
+ * Access method to get the corresponding message body in {@link Message}
+ *
+ * @return Message object in this {@link Message}
+ */
+ M getMessage();
+
+ /**
+ * Method to indicate whether this {@link Message} indicates deletion of a message w/ the message key
+ *
+ * @return A boolean value indicates whether the current message is a delete or insert message
+ */
+ default boolean isDelete() { return false; };
+
+ /**
+ * Access method to the key of the message
+ *
+ * @return The key of the message
+ */
+ K getKey();
+
+ /**
+ * Get the message creation timestamp of the message.
+ *
+ * @return The message's timestamp in nano seconds.
+ */
+ long getTimestamp();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
new file mode 100644
index 0000000..0fac2c0
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Offset.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.api.data;
+
+/**
+ * Generic position of a message in a stream. It extends {@link java.lang.Comparable} so that offsets can be ordered;
+ * each implementation decides which other {@code Offset} types it accepts in {@code compareTo}.
+ */
+public interface Offset extends Comparable<Offset> {
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
new file mode 100644
index 0000000..dc3f8f4
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.api.data;
+
+import java.util.Map;
+
+
+/**
+ * This defines an interface for generic schema access methods.
+ */
+public interface Schema {
+
+  /**
+   * The kinds of data a {@link Schema} can describe.
+   */
+  enum Type {
+    INTEGER,
+    LONG,
+    FLOAT,
+    DOUBLE,
+    BOOLEAN,
+    STRING,
+    BYTES,
+    STRUCT,
+    ARRAY,
+    MAP
+  }
+
+  /**
+   * Returns the kind of data this schema describes.
+   *
+   * @return the {@link Type} of this schema
+   */
+  Type getType();
+
+  /**
+   * Returns the schema of the contained elements.
+   *
+   * @return the element schema (presumably only meaningful when {@link #getType()} is {@link Type#ARRAY} — TODO confirm)
+   */
+  Schema getElementType();
+
+  /**
+   * Returns the schema of the contained values.
+   *
+   * @return the value schema (presumably only meaningful when {@link #getType()} is {@link Type#MAP} — TODO confirm)
+   */
+  Schema getValueType();
+
+  /**
+   * Returns the field schemas keyed by field name.
+   *
+   * @return the fields (presumably only meaningful when {@link #getType()} is {@link Type#STRUCT} — TODO confirm)
+   */
+  Map<String, Schema> getFields();
+
+  /**
+   * Returns the schema of a single named field.
+   *
+   * @param fldName the name of the field to look up
+   * @return the schema of the field named {@code fldName}
+   */
+  Schema getFieldType(String fldName);
+
+  /**
+   * Presumably reads a raw object into a {@link Data} view described by this schema — TODO confirm.
+   *
+   * @param object the raw object to read
+   * @return the resulting {@link Data}
+   */
+  Data read(Object object);
+
+  /**
+   * Presumably converts {@code inputData} into data conforming to this schema — TODO confirm.
+   *
+   * @param inputData the data to transform
+   * @return the transformed {@link Data}
+   */
+  Data transform(Data inputData);
+
+  /**
+   * Compares this schema with another {@link Schema}.
+   *
+   * <p>NOTE(review): this method overloads, rather than overrides, {@link Object#equals(Object)}. It is only chosen
+   * when the argument's static type is {@link Schema}; hash-based collections and most library code call
+   * {@code equals(Object)} instead. Implementations should also override {@code equals(Object)} and
+   * {@code hashCode()} consistently with this method.
+   *
+   * @param other the schema to compare with
+   * @return {@code true} if the two schemas are considered equal
+   */
+  boolean equals(Schema other);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
new file mode 100644
index 0000000..f220285
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.api.internal;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines all basic stream operator classes used by internal implementation only. All classes defined in
+ * this file hold only {@code final} fields and expose no mutators.
+ *
+ * NOTE: Programmers should not use the operators defined in this class directly. All {@code Operator} objects
+ * should be initiated via {@link MessageStream} API methods.
+ */
+public final class Operators {
+  /**
+   * Private constructor to prevent instantiation of the {@link Operators} class.
+   */
+  private Operators() {}
+
+  /**
+   * Generates the ID of a newly created stateful operator.
+   *
+   * @return a random UUID string, different on every call
+   */
+  private static String getOperatorId() {
+    // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Private interface for stream operator functions. The interface class defines the output of the stream operator function.
+   *
+   * @param <OM> the type of output {@link Message}
+   */
+  private interface Operator<OM extends Message> {
+    /**
+     * @return the {@link MessageStream} this operator emits to
+     */
+    MessageStream<OM> getOutputStream();
+  }
+
+  /**
+   * Linear stream operator function that takes 1 input {@link Message} and outputs a collection of output {@link Message}s.
+   *
+   * @param <M> the type of input {@link Message}
+   * @param <OM> the type of output {@link Message}
+   */
+  public static class StreamOperator<M extends Message, OM extends Message> implements Operator<OM> {
+    /**
+     * The output {@link MessageStream}
+     */
+    private final MessageStream<OM> outputStream;
+
+    /**
+     * The transformation function
+     */
+    private final Function<M, Collection<OM>> txfmFunction;
+
+    /**
+     * Constructor of {@link StreamOperator} that creates a new output {@link MessageStream}. Private s.t. it can only
+     * be created within {@link Operators}.
+     *
+     * @param transformFn the transformation function to be applied that transforms 1 input {@link Message} into a collection
+     *                    of output {@link Message}s
+     */
+    private StreamOperator(Function<M, Collection<OM>> transformFn) {
+      this(transformFn, new MessageStream<>());
+    }
+
+    /**
+     * Constructor of {@link StreamOperator} which allows the caller to supply the output {@link MessageStream}, e.g. the
+     * shared output stream used by {@link Operators#getMergeOperator(MessageStream)}.
+     *
+     * @param transformFn the transformation function
+     * @param outputStream the output {@link MessageStream}
+     */
+    private StreamOperator(Function<M, Collection<OM>> transformFn, MessageStream<OM> outputStream) {
+      this.outputStream = outputStream;
+      this.txfmFunction = transformFn;
+    }
+
+    @Override
+    public MessageStream<OM> getOutputStream() {
+      return this.outputStream;
+    }
+
+    /**
+     * Method to get the transformation function.
+     *
+     * @return the {@code txfmFunction}
+     */
+    public Function<M, Collection<OM>> getFunction() {
+      return this.txfmFunction;
+    }
+  }
+
+  /**
+   * A sink operator function that allows customized code to send the output to external system. This is the terminal
+   * operator that does not have any output {@link MessageStream} that allows further processing in the same
+   * {@link org.apache.samza.task.StreamOperatorTask}.
+   *
+   * <p>NOTE(review): {@code Operator} and {@code MessageStream} are used as raw types here because a sink has no output
+   * message type. They are kept raw so that existing callers of {@link #getOutputStream()} keep compiling.
+   *
+   * @param <M> the type of input {@link Message}
+   */
+  public static class SinkOperator<M extends Message> implements Operator {
+
+    /**
+     * The user-defined sink function
+     */
+    private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink;
+
+    /**
+     * Default constructor for {@link SinkOperator}. Private s.t. it can only be created within {@link Operators}.
+     *
+     * @param sink the user-defined sink function
+     */
+    private SinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
+      this.sink = sink;
+    }
+
+    /**
+     * A sink is terminal and therefore has no output stream.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public MessageStream getOutputStream() {
+      return null;
+    }
+
+    /**
+     * Method to get the user-defined function that implements the {@link SinkOperator}
+     *
+     * @return a {@link MessageStream.VoidFunction3} function that allows the caller to pass in an input message, {@link MessageCollector}
+     *         and {@link TaskCoordinator} to the sink function
+     */
+    public MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> getFunction() {
+      return this.sink;
+    }
+  }
+
+  /**
+   * The store functions that are used by {@link WindowOperator} and {@link PartialJoinOperator} to store and retrieve
+   * buffered messages and partial aggregation results
+   *
+   * @param <M> the type of input {@link Message} the functions are applied to
+   * @param <SK> the type of key used to store the operator states
+   * @param <SS> the type of operator state. e.g. could be the partial aggregation result for a window, or a buffered
+   *             input message from the join stream for a join
+   */
+  public static class StoreFunctions<M extends Message, SK, SS> {
+    /**
+     * Function to define the key to query in the operator state store, according to the incoming {@link Message}.
+     * This method only supports finding the unique key for the incoming message, which supports use case of non-overlapping
+     * windows and unique-key-based join.
+     *
+     * TODO: for windows that overlaps (i.e. sliding windows and hopping windows) and non-unique-key-based join, the query
+     * to the state store is usually a range scan. We need to add a rangeKeyFinder function to map from a single input
+     * message to a range of keys in the store.
+     */
+    private final Function<M, SK> storeKeyFinder;
+
+    /**
+     * Function to update the store entry based on the current state and the incoming {@link Message}
+     *
+     * TODO: this is assuming a 1:1 mapping from the input message to the store entry. When implementing sliding/hopping
+     * windows and non-unique-key-based join, we may need to include the corresponding state key, in addition to the
+     * state value.
+     */
+    private final BiFunction<M, SS, SS> stateUpdater;
+
+    /**
+     * Constructor of state store functions.
+     *
+     * @param keyFinder function computing the store key from an input {@link Message}
+     * @param stateUpdater function computing the new state from an input {@link Message} and the current state. It may be
+     *                     {@code null} for a read-only store, as used for the join side of a {@link PartialJoinOperator}.
+     */
+    private StoreFunctions(Function<M, SK> keyFinder,
+        BiFunction<M, SS, SS> stateUpdater) {
+      this.storeKeyFinder = keyFinder;
+      this.stateUpdater = stateUpdater;
+    }
+
+    /**
+     * Method to get the {@code storeKeyFinder} function
+     *
+     * @return the function to calculate the key from an input {@link Message}
+     */
+    public Function<M, SK> getStoreKeyFinder() {
+      return this.storeKeyFinder;
+    }
+
+    /**
+     * Method to get the {@code stateUpdater} function
+     *
+     * @return the function to update the corresponding state according to an input {@link Message}, or {@code null}
+     *         for a read-only store
+     */
+    public BiFunction<M, SS, SS> getStateUpdater() {
+      return this.stateUpdater;
+    }
+  }
+
+  /**
+   * Defines a window operator function that takes one {@link MessageStream} as an input, accumulates the window state, and generates
+   * an output {@link MessageStream} w/ output type {@code WM} which extends {@link WindowOutput}
+   *
+   * @param <M> the type of input {@link Message}
+   * @param <WK> the type of key in the output {@link Message} from the {@link WindowOperator} function
+   * @param <WS> the type of window state in the {@link WindowOperator} function
+   * @param <WM> the type of window output {@link Message}
+   */
+  public static class WindowOperator<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements Operator<WM> {
+    /**
+     * The output {@link MessageStream}
+     */
+    private final MessageStream<WM> outputStream;
+
+    /**
+     * The main window transformation function that takes {@link Message}s from one input stream, aggregates w/ the window
+     * state(s) from the window state store, and generates output {@link Message}s to the output stream.
+     */
+    private final BiFunction<M, Entry<WK, WS>, WM> txfmFunction;
+
+    /**
+     * The state store functions for the {@link WindowOperator}
+     */
+    private final StoreFunctions<M, WK, WS> storeFunctions;
+
+    /**
+     * The window trigger function
+     */
+    private final Trigger<M, WS> trigger;
+
+    /**
+     * The unique ID of stateful operators
+     */
+    private final String opId;
+
+    /**
+     * Constructor for {@link WindowOperator}. Private s.t. it can only be created within {@link Operators}.
+     * The transformation function, store functions and trigger are read from {@code windowFn} once, at construction.
+     *
+     * @param windowFn description of the window function
+     * @param operatorId auto-generated unique ID of the operator
+     */
+    private WindowOperator(WindowFn<M, WK, WS, WM> windowFn, String operatorId) {
+      this.outputStream = new MessageStream<>();
+      this.txfmFunction = windowFn.getTransformFunc();
+      this.storeFunctions = windowFn.getStoreFuncs();
+      this.trigger = windowFn.getTrigger();
+      this.opId = operatorId;
+    }
+
+    /**
+     * @return the operator ID
+     */
+    @Override
+    public String toString() {
+      return this.opId;
+    }
+
+    @Override
+    public MessageStream<WM> getOutputStream() {
+      return this.outputStream;
+    }
+
+    /**
+     * Method to get the window's {@link StoreFunctions}.
+     *
+     * @return the window operator's {@code storeFunctions}
+     */
+    public StoreFunctions<M, WK, WS> getStoreFunctions() {
+      return this.storeFunctions;
+    }
+
+    /**
+     * Method to get the window operator's main function
+     *
+     * @return the window operator's {@code txfmFunction}
+     */
+    public BiFunction<M, Entry<WK, WS>, WM> getFunction() {
+      return this.txfmFunction;
+    }
+
+    /**
+     * Method to get the trigger functions
+     *
+     * @return the {@link Trigger} for this {@link WindowOperator}
+     */
+    public Trigger<M, WS> getTrigger() {
+      return this.trigger;
+    }
+
+    /**
+     * Method to generate the window operator's state store name, of the form
+     * {@code input-<inputStream.toString()>-wndop-<operator ID>}.
+     *
+     * @param inputStream the input {@link MessageStream} to this state store
+     * @return the persistent store name of the window operator
+     */
+    public String getStoreName(MessageStream<M> inputStream) {
+      //TODO: need to get the persistent name of ds and the operator in a serialized form
+      return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString());
+    }
+  }
+
+  /**
+   * The partial join operator that takes {@link Message}s from one input stream and joins them w/ buffered {@link Message}s from
+   * another stream, generating join output to {@code joinOutput}
+   *
+   * @param <M> the type of input {@link Message}
+   * @param <K> the type of join key
+   * @param <JM> the type of message of {@link Message} in the other join stream
+   * @param <RM> the type of message of {@link Message} in the join output stream
+   */
+  public static class PartialJoinOperator<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> implements Operator<RM> {
+
+    /**
+     * The output {@link MessageStream} of the join results
+     */
+    private final MessageStream<RM> joinOutput;
+
+    /**
+     * The main transformation function of {@link PartialJoinOperator} that takes a type {@code M} input message,
+     * joins it w/ a stream of buffered {@link Message}s from another stream w/ type {@code JM}, and generates joined type {@code RM}.
+     */
+    private final BiFunction<M, JM, RM> txfmFunction;
+
+    /**
+     * The message store functions that read the buffered messages from the other stream in the join
+     */
+    private final StoreFunctions<JM, K, JM> joinStoreFunctions;
+
+    /**
+     * The message store functions that save the buffered messages of this {@link MessageStream} in the join
+     */
+    private final StoreFunctions<M, K, M> selfStoreFunctions;
+
+    /**
+     * The unique ID for the stateful operator
+     */
+    private final String opId;
+
+    /**
+     * Default constructor to create a {@link PartialJoinOperator} object
+     *
+     * @param partialJoin partial join function that takes type {@code M} of input {@link Message} and joins w/ type
+     *                    {@code JM} of buffered {@link Message} from another stream
+     * @param joinOutput the output {@link MessageStream} of the join results
+     * @param opId auto-generated unique ID of the operator
+     */
+    private PartialJoinOperator(BiFunction<M, JM, RM> partialJoin, MessageStream<RM> joinOutput, String opId) {
+      this.joinOutput = joinOutput;
+      this.txfmFunction = partialJoin;
+      // Read-only join store, no creator/updater functions specified
+      this.joinStoreFunctions = new StoreFunctions<>(m -> m.getKey(), null);
+      // Buffered message store for this input stream: keyed by the message key, the new state is the message itself
+      this.selfStoreFunctions = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m);
+      this.opId = opId;
+    }
+
+    /**
+     * @return the operator ID
+     */
+    @Override
+    public String toString() {
+      return this.opId;
+    }
+
+    @Override
+    public MessageStream<RM> getOutputStream() {
+      return this.joinOutput;
+    }
+
+    /**
+     * Method to get {@code joinStoreFunctions}
+     *
+     * @return {@code joinStoreFunctions}
+     */
+    public StoreFunctions<JM, K, JM> getJoinStoreFunctions() {
+      return this.joinStoreFunctions;
+    }
+
+    /**
+     * Method to get {@code selfStoreFunctions}
+     *
+     * @return {@code selfStoreFunctions}
+     */
+    public StoreFunctions<M, K, M> getSelfStoreFunctions() {
+      return this.selfStoreFunctions;
+    }
+
+    /**
+     * Method to get {@code txfmFunction}
+     *
+     * @return {@code txfmFunction}
+     */
+    public BiFunction<M, JM, RM> getFunction() {
+      return this.txfmFunction;
+    }
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator}
+   *
+   * @param transformFn the corresponding transformation function
+   * @param <M> type of input {@link Message}
+   * @param <OM> type of output {@link Message}
+   * @return the {@link StreamOperator}, with a newly created output {@link MessageStream}
+   */
+  public static <M extends Message, OM extends Message> StreamOperator<M, OM> getStreamOperator(Function<M, Collection<OM>> transformFn) {
+    return new StreamOperator<>(transformFn);
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link SinkOperator}
+   *
+   * @param sinkFn the sink function
+   * @param <M> type of input {@link Message}
+   * @return the {@link SinkOperator}
+   */
+  public static <M extends Message> SinkOperator<M> getSinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFn) {
+    return new SinkOperator<>(sinkFn);
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator}.
+   * Each call assigns a newly generated operator ID.
+   *
+   * @param windowFn the {@link WindowFn} function
+   * @param <M> type of input {@link Message}
+   * @param <WK> type of window key
+   * @param <WS> type of {@link WindowState}
+   * @param <WM> type of output {@link WindowOutput}
+   * @return the {@link WindowOperator}
+   */
+  public static <M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperator<M, WK, WS, WM> getWindowOperator(
+      WindowFn<M, WK, WS, WM> windowFn) {
+    return new WindowOperator<>(windowFn, Operators.getOperatorId());
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link PartialJoinOperator}.
+   * Each call assigns a newly generated operator ID.
+   *
+   * @param joiner the partial join function that joins an input {@link Message} of type {@code M} with a buffered
+   *               {@link Message} of type {@code JM} from the other stream
+   * @param joinOutput the output {@link MessageStream} of the join results
+   * @param <M> type of input {@link Message}
+   * @param <K> type of join key
+   * @param <JM> the type of message in the {@link Message} from the other join stream
+   * @param <RM> the type of message in the {@link Message} from the join function
+   * @return the {@link PartialJoinOperator}
+   */
+  public static <M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> PartialJoinOperator<M, K, JM, RM> getPartialJoinOperator(
+      BiFunction<M, JM, RM> joiner, MessageStream<RM> joinOutput) {
+    return new PartialJoinOperator<>(joiner, joinOutput, Operators.getOperatorId());
+  }
+
+  /**
+   * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} as a merger function.
+   * The returned operator forwards every input message unchanged to {@code mergeOutput}.
+   *
+   * @param mergeOutput the common output {@link MessageStream} from the merger
+   * @param <M> the type of input {@link Message}
+   * @return the {@link StreamOperator} for merge
+   */
+  public static <M extends Message> StreamOperator<M, M> getMergeOperator(MessageStream<M> mergeOutput) {
+    return new StreamOperator<M, M>(t -> {
+        // Build a plain, mutable single-element list explicitly. Double-brace initialization would declare an
+        // anonymous ArrayList subclass and instantiate it for every message.
+        Collection<M> single = new ArrayList<>(1);
+        single.add(t);
+        return single;
+      },
+      mergeOutput);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
new file mode 100644
index 0000000..33a0134
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.internal;
+
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.data.Message;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Defines the trigger functions for {@link Operators.WindowOperator}. Instances hold only {@code final} references and
+ * are created through {@link #createTrigger}.
+ *
+ * <p>NOTE(review): this class currently exposes no accessors for the stored functions.
+ *
+ * @param <M> the type of message from the input stream
+ * @param <S> the type of state variable in the window's state store
+ */
+public final class Trigger<M extends Message, S extends WindowState> {
+
+  /**
+   * System timer based trigger condition. This is the only guarantee that the {@link Operators.WindowOperator} will proceed forward
+   */
+  private final Function<S, Boolean> timerTrigger;
+
+  /**
+   * Early trigger condition that determines when to send the first output from the {@link Operators.WindowOperator}
+   */
+  private final BiFunction<M, S, Boolean> earlyTrigger;
+
+  /**
+   * Late trigger condition that determines when to send the updated output after the first one from a {@link Operators.WindowOperator}
+   */
+  private final BiFunction<M, S, Boolean> lateTrigger;
+
+  /**
+   * The function to update the window state when the first output is triggered
+   */
+  private final Function<S, S> earlyTriggerUpdater;
+
+  /**
+   * The function to update the window state when the late output is triggered
+   */
+  private final Function<S, S> lateTriggerUpdater;
+
+  /**
+   * Private constructor; instances are created through {@link #createTrigger}. The arguments are stored as given,
+   * and no {@code null} checks are performed.
+   *
+   * @param timerTrigger system timer trigger condition
+   * @param earlyTrigger early trigger condition
+   * @param lateTrigger late trigger condition
+   * @param earlyTriggerUpdater early trigger state updater
+   * @param lateTriggerUpdater late trigger state updater
+   */
+  private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger,
+      Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) {
+    this.timerTrigger = timerTrigger;
+    this.earlyTrigger = earlyTrigger;
+    this.lateTrigger = lateTrigger;
+    this.earlyTriggerUpdater = earlyTriggerUpdater;
+    this.lateTriggerUpdater = lateTriggerUpdater;
+  }
+
+  /**
+   * Static method to create a {@link Trigger} object
+   *
+   * @param timerTrigger system timer trigger condition
+   * @param earlyTrigger early trigger condition
+   * @param lateTrigger late trigger condition
+   * @param earlyTriggerUpdater early trigger state updater
+   * @param lateTriggerUpdater late trigger state updater
+   * @param <M> the type of input {@link Message}
+   * @param <S> the type of window state extends {@link WindowState}
+   * @return a new {@link Trigger} holding the given functions
+   */
+  public static <M extends Message, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger,
+      BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater,
+      Function<S, S> lateTriggerUpdater) {
+    // Use the diamond rather than the raw constructor, so the result is type-checked instead of unchecked-converted.
+    return new Trigger<>(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
new file mode 100644
index 0000000..1fd88e7
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.internal;
+
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.storage.kv.Entry;
+
+import java.util.function.BiFunction;
+
+
+/**
+ * Internal description of a window function. It bundles the three pieces an {@link Operators.WindowOperator} is built
+ * from: a transformation function, the state store functions and a trigger.
+ *
+ * <p>Programmers SHOULD NOT use this interface directly. It exists for the internal representation and implementation
+ * classes in operators.
+ *
+ * @param <M> type of the input stream {@link Message} for the window
+ * @param <WK> type of the window key in the output {@link Message}
+ * @param <WS> type of the {@link WindowState} variable kept in the state store
+ * @param <WM> type of the message in the output stream, a {@link WindowOutput} keyed by {@code WK}
+ */
+public interface WindowFn<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> {
+
+  /**
+   * Returns the transformation function of this {@link WindowFn}.
+   *
+   * @return a function that combines an input message of type {@code M} with a window state entry keyed by
+   *         {@code WK} and produces a {@link WindowOutput} of type {@code WM}
+   */
+  BiFunction<M, Entry<WK, WS>, WM> getTransformFunc();
+
+  /**
+   * Returns the state store functions of this {@link WindowFn}.
+   *
+   * @return the key-finder and state-updater functions used to access the window state store
+   */
+  Operators.StoreFunctions<M, WK, WS> getStoreFuncs();
+
+  /**
+   * Returns the trigger conditions of this {@link WindowFn}.
+   *
+   * @return the {@link Trigger} deciding when this window function emits output
+   */
+  Trigger<M, WS> getTrigger();
+
+}