You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/05/25 16:36:53 UTC
[10/10] samza git commit: SAMZA-1659: Serializable OperatorSpec
SAMZA-1659: Serializable OperatorSpec
This change makes the user-supplied functions serializable, thereby making the full user-defined DAG serializable.
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Reviewers: Jagadish <jv...@linkedin.com>, Prateek Maheshwari <pm...@linkedin.com>
Closes #475 from nickpan47/serializable-opspec-only-Jan-24-18 and squashes the following commits:
db0dea73 [Yi Pan (Data Infrastructure)] SAMZA-1659: fix intermittent TestZkLocalApplicationRunner failure due to StreamProcessor#stop()
34716d42 [Yi Pan (Data Infrastructure)] SAMZA-1659: fix a comment on OperatorSpec#isClone()
37d4e6ae [Yi Pan (Data Infrastructure)] SAMZA-1659: addressing latest round of review comments
68674a14 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
d3a7826c [Yi Pan (Data Infrastructure)] SAMZA-1659: addressing review comments
f83e8dd0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
acca418b [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
842a73d6 [Yi Pan (Data Infrastructure)] SAMZA-1659: making user-defined functions in high-level API serializable
ad85a2cb [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
c1567116 [Yi Pan (Data Infrastructure)] SAMZA-1659: Before re-merge with master. Still need to fix unit tests (moving OperatorSpec clone tests to OperatorSpecGraph.clone)
f2563f8e [Yi Pan (Data Infrastructure)] SAMZA-1659: serialize the whole DAG instead of each individual OperatorSpec.
24d33496 [Yi Pan (Data Infrastructure)] SAMZA-1659: updated according to review comments. Need to merge again with master.
3f643f8b [Yi Pan (Data Infrastructure)] SAMZA-1659: serialiable OperatorSpec
ed7d8c0e [Yi Pan (Data Infrastructure)] Fixed some javadoc and test files
94de218b [Yi Pan (Data Infrastructure)] Remove public access from StreamGraphImpl#getIntermediateStream(String, Serde)
8f4e9dd4 [Yi Pan (Data Infrastructure)] Serialization of StreamGraph in a wrapper class SerializedStreamGraph
f3bb1958 [Yi Pan (Data Infrastructure)] Fix some comments
c15246f5 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
e981967d [Yi Pan (Data Infrastructure)] WIP: fixing unit test for SamzaSQL translators w/ serialization of operator functions
40583051 [Yi Pan (Data Infrastructure)] WIP: update the serialization of user functions after the merge
18ba924f [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
93951c5f [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
54a28801 [Yi Pan (Data Infrastructure)] WIP: broadcast, sendtotable, and streamtotablejoin serialization and unit tests
45eb1fb0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
7c8d1591 [Yi Pan (Data Infrastructure)] WIP: working on unit tests for trigger, broadcast, join, table, and SQL UDF function serialization
b973b105 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
aca42308 [Yi Pan (Data Infrastructure)] WIP: Serialize OperatorSpec only w/o StreamApplication interface change. Passed all build and tests.
0ebebfc3 [Yi Pan (Data Infrastructure)] WIP: serialization only change
1670aff0 [Yi Pan (Data Infrastructure)] WIP: class-loading of user program logic and main() method based user program logic are both included in ThreadJobFactory/ProcessJobFactory/YarnJobFactory. ThreadJobFactory test suite to be fixed.
4102aa8c [Yi Pan (Data Infrastructure)] WIP: continued working on potential offspring integration
dc7da87e [Yi Pan (Data Infrastructure)] WIP: unit tests for serialization
475a46bc [Yi Pan (Data Infrastructure)] WIP: fixed TestZkLocalApplicationRunner. Debugging issues w/ TestRepartitionWindowApp (i.e. missing changelog creation step when directly running LocalApplicationRunner)
6a14b2af [Yi Pan (Data Infrastructure)] WIP: fixed unit test failure for Windows
d4640329 [Yi Pan (Data Infrastructure)] WIP: fixing unit tests after merge
bf1ce907 [Yi Pan (Data Infrastructure)] WIP: removing StreamDescriptor first
50201728 [Yi Pan (Data Infrastructure)] Merge branch 'experiment-new-api-v2' into new-api-v2-0.14
dde1ab14 [Yi Pan (Data Infrastructure)] WIP: first end-to-end test
d7df6ed0 [Yi Pan (Data Infrastructure)] WIP: added all unit test for OperatorSpec#copy methods.
6fc6d4c0 [Yi Pan (Data Infrastructure)] WIP: experiment code to implement an end-to-end working example for new APIs
525d8bc1 [Yi Pan (Data Infrastructure)] Merge branch '0.14.0' into new-api-v2
e6fb96e5 [Yi Pan (Data Infrastructure)] WIP: merged all application types into StreamApplications
f227380f [Yi Pan (Data Infrastructure)] WIP: update the app runner classes
256155ad [Yi Pan (Data Infrastructure)] WIP: new API user code examples
4a6a58dc [Yi Pan (Data Infrastructure)] WIP: updated w/ low-level task API and global var ingestion/metrics reporter
3c50629e [Yi Pan (Data Infrastructure)] WIP: adding support for low-level task APIs
51541e13 [Yi Pan (Data Infrastructure)] WIP: cleanup StreamDescriptor
0bc7ee7b [Yi Pan (Data Infrastructure)] WIP: update the user code example on new APIs
cd528c1c [Yi Pan (Data Infrastructure)] WIP: updated spec and user DAG API
b898e6c0 [Yi Pan (Data Infrastructure)] WIP: new-api-v2
91f364f1 [Yi Pan (Data Infrastructure)] WIP: proto-type of input/output stream/system specs
ae3dc6ff [Yi Pan (Data Infrastructure)] WIP: new api revision
8bb97520 [Yi Pan (Data Infrastructure)] WIP: proto-type of input/output stream/system specs
5573a069 [Yi Pan (Data Infrastructure)] WIP: new api revision
aeb45730 [Xinyu Liu] SAMZA-1321: Propagate end-of-stream and watermark messages
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/53d7f262
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/53d7f262
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/53d7f262
Branch: refs/heads/master
Commit: 53d7f2625145f560eb6ccc49d48dc176f244f9b3
Parents: bc4a0c2
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Fri May 25 09:37:55 2018 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Committed: Fri May 25 09:37:55 2018 -0700
----------------------------------------------------------------------
build.gradle | 5 +-
.../samza/application/StreamApplication.java | 7 +-
.../java/org/apache/samza/config/MapConfig.java | 7 +-
.../apache/samza/operators/MessageStream.java | 19 +-
.../operators/functions/ClosableFunction.java | 3 +
.../operators/functions/FilterFunction.java | 3 +-
.../operators/functions/FlatMapFunction.java | 3 +-
.../operators/functions/FoldLeftFunction.java | 16 +-
.../samza/operators/functions/JoinFunction.java | 3 +-
.../samza/operators/functions/MapFunction.java | 3 +-
.../samza/operators/functions/SinkFunction.java | 3 +-
.../functions/StreamTableJoinFunction.java | 3 +-
.../operators/functions/SupplierFunction.java | 38 ++
.../samza/operators/triggers/AnyTrigger.java | 10 +-
.../samza/operators/triggers/Trigger.java | 3 +-
.../apache/samza/operators/windows/Window.java | 3 +-
.../apache/samza/operators/windows/Windows.java | 53 +-
.../windows/internal/WindowInternal.java | 32 +-
.../samza/serializers/SerializableSerde.java | 2 +-
.../org/apache/samza/system/StreamSpec.java | 3 +-
.../samza/system/SystemStreamPartition.java | 4 +-
.../java/org/apache/samza/table/TableSpec.java | 12 +-
.../samza/operators/windows/TestWindowPane.java | 2 +-
.../samza/execution/ExecutionPlanner.java | 28 +-
.../org/apache/samza/execution/JobGraph.java | 14 +-
.../samza/execution/JobGraphJsonGenerator.java | 4 +-
.../org/apache/samza/execution/JobNode.java | 26 +-
.../samza/operators/MessageStreamImpl.java | 79 ++-
.../samza/operators/OperatorSpecGraph.java | 132 ++++
.../apache/samza/operators/StreamGraphImpl.java | 328 ----------
.../apache/samza/operators/StreamGraphSpec.java | 299 +++++++++
.../org/apache/samza/operators/TableImpl.java | 3 +-
.../operators/impl/BroadcastOperatorImpl.java | 2 +-
.../samza/operators/impl/OperatorImpl.java | 2 +-
.../samza/operators/impl/OperatorImplGraph.java | 73 ++-
.../operators/impl/OutputOperatorImpl.java | 5 +-
.../operators/impl/PartitionByOperatorImpl.java | 16 +-
.../operators/impl/StreamOperatorImpl.java | 3 +-
.../operators/impl/WindowOperatorImpl.java | 23 +-
.../operators/spec/FilterOperatorSpec.java | 74 +++
.../operators/spec/FlatMapOperatorSpec.java | 47 ++
.../samza/operators/spec/InputOperatorSpec.java | 13 +-
.../samza/operators/spec/JoinOperatorSpec.java | 17 +-
.../samza/operators/spec/MapOperatorSpec.java | 77 +++
.../samza/operators/spec/MergeOperatorSpec.java | 51 ++
.../samza/operators/spec/OperatorSpec.java | 23 +-
.../samza/operators/spec/OperatorSpecs.java | 73 +--
.../samza/operators/spec/OutputStreamImpl.java | 17 +-
.../operators/spec/PartitionByOperatorSpec.java | 23 +-
.../operators/spec/SendToTableOperatorSpec.java | 9 +-
.../operators/spec/StreamOperatorSpec.java | 23 +-
.../operators/spec/WindowOperatorSpec.java | 11 +
.../stream/IntermediateMessageStreamImpl.java | 4 +-
.../samza/operators/triggers/Cancellable.java | 2 +-
.../samza/operators/triggers/TriggerImpl.java | 6 +-
.../runtime/AbstractApplicationRunner.java | 21 +-
.../samza/runtime/LocalApplicationRunner.java | 28 +-
.../samza/runtime/LocalContainerRunner.java | 11 +-
.../apache/samza/task/StreamOperatorTask.java | 48 +-
.../org/apache/samza/task/TaskFactoryUtil.java | 26 +-
.../apache/samza/container/TaskInstance.scala | 1 -
.../samza/job/local/ThreadJobFactory.scala | 11 +-
.../apache/samza/example/BroadcastExample.java | 71 ---
.../samza/example/KeyValueStoreExample.java | 131 ----
.../org/apache/samza/example/MergeExample.java | 60 --
.../samza/example/OrderShipmentJoinExample.java | 115 ----
.../samza/example/PageViewCounterExample.java | 95 ---
.../samza/example/RepartitionExample.java | 90 ---
.../org/apache/samza/example/WindowExample.java | 81 ---
.../samza/execution/TestExecutionPlanner.java | 117 ++--
.../apache/samza/execution/TestJobGraph.java | 68 +--
.../execution/TestJobGraphJsonGenerator.java | 32 +-
.../org/apache/samza/execution/TestJobNode.java | 14 +-
.../samza/operators/TestJoinOperator.java | 152 ++---
.../samza/operators/TestMessageStreamImpl.java | 55 +-
.../samza/operators/TestOperatorSpecGraph.java | 185 ++++++
.../samza/operators/TestStreamGraphImpl.java | 601 -------------------
.../samza/operators/TestStreamGraphSpec.java | 601 +++++++++++++++++++
.../data/TestOutputMessageEnvelope.java | 14 +
.../operators/impl/TestOperatorImplGraph.java | 298 ++++++---
.../operators/impl/TestStreamOperatorImpl.java | 4 +-
.../operators/impl/TestWindowOperator.java | 263 ++++----
.../operators/spec/OperatorSpecTestUtils.java | 141 +++++
.../samza/operators/spec/TestOperatorSpec.java | 465 ++++++++++++++
.../spec/TestPartitionByOperatorSpec.java | 165 +++++
.../operators/spec/TestWindowOperatorSpec.java | 306 +++++++++-
.../runtime/TestAbstractApplicationRunner.java | 36 +-
.../runtime/TestLocalApplicationRunner.java | 21 +-
.../apache/samza/task/IdentityStreamTask.java | 55 ++
.../apache/samza/task/TestTaskFactoryUtil.java | 64 +-
.../testUtils/InvalidStreamApplication.java | 25 -
.../samza/system/kafka/TestKafkaStreamSpec.java | 3 +-
.../samza/sql/data/SamzaSqlCompositeKey.java | 1 +
.../sql/data/SamzaSqlExecutionContext.java | 20 +-
.../samza/sql/translator/FilterTranslator.java | 47 +-
.../translator/LogicalAggregateTranslator.java | 24 +-
.../samza/sql/translator/ProjectTranslator.java | 60 +-
.../samza/sql/translator/QueryTranslator.java | 46 +-
.../SamzaSqlRelMessageJoinFunction.java | 12 +-
.../samza/sql/translator/ScanTranslator.java | 28 +-
.../samza/sql/translator/TranslatorContext.java | 79 ++-
.../apache/samza/sql/TestQueryTranslator.java | 510 ----------------
.../sql/TestSamzaSqlApplicationConfig.java | 95 ---
.../sql/TestSamzaSqlApplicationRunner.java | 56 --
.../samza/sql/TestSamzaSqlFileParser.java | 58 --
.../samza/sql/TestSamzaSqlQueryParser.java | 76 ---
.../samza/sql/TestSamzaSqlRelMessage.java | 46 --
.../sql/TestSamzaSqlRelMessageJoinFunction.java | 119 ----
.../samza/sql/data/TestSamzaSqlRelMessage.java | 46 ++
.../runner/TestSamzaSqlApplicationConfig.java | 95 +++
.../runner/TestSamzaSqlApplicationRunner.java | 56 ++
.../sql/testutil/TestSamzaSqlFileParser.java | 58 ++
.../sql/testutil/TestSamzaSqlQueryParser.java | 75 +++
.../sql/translator/TestFilterTranslator.java | 136 +++++
.../sql/translator/TestJoinTranslator.java | 191 ++++++
.../sql/translator/TestProjectTranslator.java | 289 +++++++++
.../sql/translator/TestQueryTranslator.java | 596 ++++++++++++++++++
.../TestSamzaSqlRelMessageJoinFunction.java | 118 ++++
.../sql/translator/TranslatorTestBase.java | 72 +++
.../example/AppWithGlobalConfigExample.java | 86 +++
.../apache/samza/example/BroadcastExample.java | 70 +++
.../samza/example/KeyValueStoreExample.java | 138 +++++
.../org/apache/samza/example/MergeExample.java | 62 ++
.../samza/example/OrderShipmentJoinExample.java | 121 ++++
.../samza/example/PageViewCounterExample.java | 100 +++
.../samza/example/RepartitionExample.java | 96 +++
.../org/apache/samza/example/WindowExample.java | 86 +++
.../samza/test/framework/StreamAssert.java | 14 +
.../EndOfStreamIntegrationTest.java | 8 +-
.../WatermarkIntegrationTest.java | 7 +-
.../test/operator/RepartitionJoinWindowApp.java | 54 +-
.../test/operator/RepartitionWindowApp.java | 72 +++
.../samza/test/operator/SessionWindowApp.java | 21 +-
.../operator/TestRepartitionJoinWindowApp.java | 2 +-
.../test/operator/TestRepartitionWindowApp.java | 90 +++
.../samza/test/operator/TumblingWindowApp.java | 20 +-
.../samza/test/operator/data/PageView.java | 3 +-
.../test/processor/SharedContextFactories.java | 117 ++++
.../test/processor/TestStreamApplication.java | 148 +++++
.../processor/TestZkLocalApplicationRunner.java | 211 +++----
.../apache/samza/test/table/TestLocalTable.java | 243 +++++---
.../samza/test/table/TestRemoteTable.java | 37 +-
.../apache/samza/test/timer/TestTimerApp.java | 5 +-
143 files changed, 7227 insertions(+), 3811 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6872354..a94fcfa 100644
--- a/build.gradle
+++ b/build.gradle
@@ -325,6 +325,9 @@ project(':samza-sql') {
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
+ testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
+ testCompile "org.powermock:powermock-core:$powerMockVersion"
+ testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
}
@@ -756,10 +759,10 @@ project(":samza-test_$scalaVersion") {
compile project(":samza-kv-inmemory_$scalaVersion")
compile project(":samza-kv-rocksdb_$scalaVersion")
compile project(":samza-core_$scalaVersion")
+ compile project(":samza-kafka_$scalaVersion")
compile project(":samza-sql")
runtime project(":samza-log4j")
runtime project(":samza-yarn_$scalaVersion")
- runtime project(":samza-kafka_$scalaVersion")
runtime project(":samza-hdfs_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
index f615207..0b2142b 100644
--- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
@@ -61,9 +61,10 @@ import org.apache.samza.task.TaskContext;
*
* <p>
* Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution.
- * A new StreamApplication instance will be created and initialized when planning the execution, as well as for each
- * {@link StreamTask} instance used for processing incoming messages. Execution is synchronous and thread-safe within
- * each {@link StreamTask}.
+ * A new StreamApplication instance will be created and initialized with a user-defined {@link StreamGraph}
+ * when planning the execution. The {@link StreamGraph} and the functions implemented for transforms are required to
+ * be serializable. The execution planner will generate a serialized DAG which will be deserialized in each {@link StreamTask}
+ * instance used for processing incoming messages. Execution is synchronous and thread-safe within each {@link StreamTask}.
*
* <p>
* Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction},
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
index 0b1ed98..5af2535 100644
--- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
@@ -43,8 +43,11 @@ public class MapConfig extends Config {
public MapConfig(List<Map<String, String>> maps) {
this.map = new HashMap<>();
- for (Map<String, String> m: maps)
- this.map.putAll(m);
+ for (Map<String, String> m: maps) {
+ if (m != null) {
+ this.map.putAll(m);
+ }
+ }
}
public MapConfig(Map<String, String>... maps) {
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 98f0784..7797f9a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -21,7 +21,6 @@ package org.apache.samza.operators;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.function.Function;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.functions.FilterFunction;
@@ -237,34 +236,34 @@ public interface MessageStream<M> {
* <p>
* Unlike {@link #sendTo}, messages with a null key are all sent to partition 0.
*
- * @param keyExtractor the {@link Function} to extract the message and partition key from the input message.
+ * @param keyExtractor the {@link MapFunction} to extract the message and partition key from the input message.
* Messages with a null key are all sent to partition 0.
- * @param valueExtractor the {@link Function} to extract the value from the input message
+ * @param valueExtractor the {@link MapFunction} to extract the value from the input message
* @param serde the {@link KVSerde} to use for (de)serializing the key and value.
* @param id the unique id of this operator in this application
* @param <K> the type of output key
* @param <V> the type of output value
* @return the repartitioned {@link MessageStream}
*/
- <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
- Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id);
+ <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+ MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id);
/**
- * Same as calling {@link #partitionBy(Function, Function, KVSerde, String)} with a null KVSerde.
+ * Same as calling {@link #partitionBy(MapFunction, MapFunction, KVSerde, String)} with a null KVSerde.
* <p>
* Uses the default serde provided via {@link StreamGraph#setDefaultSerde}, which must be a KVSerde. If the default
* serde is not a {@link KVSerde}, a runtime exception will be thrown. If no default serde has been provided
* <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, NoOpSerde>} is used.
*
- * @param keyExtractor the {@link Function} to extract the message and partition key from the input message
- * @param valueExtractor the {@link Function} to extract the value from the input message
+ * @param keyExtractor the {@link MapFunction} to extract the message and partition key from the input message
+ * @param valueExtractor the {@link MapFunction} to extract the value from the input message
* @param id the unique id of this operator in this application
* @param <K> the type of output key
* @param <V> the type of output value
* @return the repartitioned {@link MessageStream}
*/
- <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
- Function<? super M, ? extends V> valueExtractor, String id);
+ <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+ MapFunction<? super M, ? extends V> valueExtractor, String id);
/**
* Sends messages in this {@link MessageStream} to a {@link Table}. The type of input message is expected
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
index ea83ba4..faf9fc5 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
@@ -33,5 +33,8 @@ import org.apache.samza.annotation.InterfaceStability;
*/
@InterfaceStability.Unstable
public interface ClosableFunction {
+ /**
+ * Frees any resource acquired by the operators in {@link InitableFunction}
+ */
default void close() {}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index 31bbbd8..ce68e0f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.functions;
+import java.io.Serializable;
import org.apache.samza.annotation.InterfaceStability;
@@ -28,7 +29,7 @@ import org.apache.samza.annotation.InterfaceStability;
*/
@InterfaceStability.Unstable
@FunctionalInterface
-public interface FilterFunction<M> extends InitableFunction, ClosableFunction {
+public interface FilterFunction<M> extends InitableFunction, ClosableFunction, Serializable {
/**
* Returns a boolean indicating whether this message should be retained or filtered out.
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
index 7e9253e..63d7061 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.functions;
+import java.io.Serializable;
import org.apache.samza.annotation.InterfaceStability;
import java.util.Collection;
@@ -31,7 +32,7 @@ import java.util.Collection;
*/
@InterfaceStability.Unstable
@FunctionalInterface
-public interface FlatMapFunction<M, OM> extends InitableFunction, ClosableFunction {
+public interface FlatMapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {
/**
* Transforms the provided message into a collection of 0 or more messages.
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
index 78250e3..d6ba205 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
@@ -19,16 +19,22 @@
package org.apache.samza.operators.functions;
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+
+
/**
- * Incrementally updates the window value as messages are added to the window.
+ * Incrementally updates the aggregated value as messages are added. Main usage is in {@link org.apache.samza.operators.windows.Window} operator.
*/
-public interface FoldLeftFunction<M, WV> extends InitableFunction, ClosableFunction {
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface FoldLeftFunction<M, WV> extends InitableFunction, ClosableFunction, Serializable {
/**
- * Incrementally updates the window value as messages are added to the window.
+ * Incrementally updates the aggregated value as messages are added.
*
- * @param message the message being added to the window
- * @param oldValue the previous value associated with the window
+ * @param message the message being added to the aggregated value
+ * @param oldValue the previous value
* @return the new value
*/
WV apply(M message, WV oldValue);
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
index 954083d..94a998d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.functions;
+import java.io.Serializable;
import org.apache.samza.annotation.InterfaceStability;
@@ -30,7 +31,7 @@ import org.apache.samza.annotation.InterfaceStability;
* @param <RM> type of the joined message
*/
@InterfaceStability.Unstable
-public interface JoinFunction<K, M, JM, RM> extends InitableFunction, ClosableFunction {
+public interface JoinFunction<K, M, JM, RM> extends InitableFunction, ClosableFunction, Serializable {
/**
* Joins the provided messages and returns the joined message.
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index a8c139f..fad9cf8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.functions;
+import java.io.Serializable;
import org.apache.samza.annotation.InterfaceStability;
@@ -29,7 +30,7 @@ import org.apache.samza.annotation.InterfaceStability;
*/
@InterfaceStability.Unstable
@FunctionalInterface
-public interface MapFunction<M, OM> extends InitableFunction, ClosableFunction {
+public interface MapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {
/**
* Transforms the provided message into another message.
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index e290d7d..2b44125 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.functions;
+import java.io.Serializable;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
@@ -30,7 +31,7 @@ import org.apache.samza.task.TaskCoordinator;
*/
@InterfaceStability.Unstable
@FunctionalInterface
-public interface SinkFunction<M> extends InitableFunction, ClosableFunction {
+public interface SinkFunction<M> extends InitableFunction, ClosableFunction, Serializable {
/**
* Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
index 6afcf67..356e07f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.functions;
+import java.io.Serializable;
import org.apache.samza.annotation.InterfaceStability;
@@ -30,7 +31,7 @@ import org.apache.samza.annotation.InterfaceStability;
* @param <JM> type of join results
*/
@InterfaceStability.Unstable
-public interface StreamTableJoinFunction<K, M, R, JM> extends InitableFunction, ClosableFunction {
+public interface StreamTableJoinFunction<K, M, R, JM> extends InitableFunction, ClosableFunction, Serializable {
/**
* Joins the provided messages and table record, returns the joined message.
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
new file mode 100644
index 0000000..155fb0e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * A supplier that returns a new value at each invocation.
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface SupplierFunction<T> extends InitableFunction, ClosableFunction, Serializable {
+
+ /**
+ * Returns a value of type T
+ *
+ * @return a value of type T
+ */
+ T get();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
index f52b57b..6bdf406 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -18,20 +18,24 @@
*/
package org.apache.samza.operators.triggers;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+
/**
* A {@link Trigger} fires as soon as any of its individual triggers has fired.
*/
public class AnyTrigger<M> implements Trigger<M> {
- private final List<Trigger<M>> triggers;
+ private final ArrayList<Trigger<M>> triggers;
AnyTrigger(List<Trigger<M>> triggers) {
- this.triggers = triggers;
+ this.triggers = new ArrayList<>();
+ this.triggers.addAll(triggers);
}
public List<Trigger<M>> getTriggers() {
- return triggers;
+ return Collections.unmodifiableList(triggers);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
index be0a877..f224fa2 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
@@ -20,6 +20,7 @@
package org.apache.samza.operators.triggers;
+import java.io.Serializable;
import org.apache.samza.annotation.InterfaceStability;
/**
@@ -30,6 +31,6 @@ import org.apache.samza.annotation.InterfaceStability;
* @param <M> the type of the incoming message
*/
@InterfaceStability.Unstable
-public interface Trigger<M> {
+public interface Trigger<M> extends Serializable {
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 1c0fa53..7534fca 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.windows;
+import java.io.Serializable;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.triggers.Trigger;
@@ -70,7 +71,7 @@ import org.apache.samza.operators.triggers.Trigger;
* @param <WV> the type of the value in the window
*/
@InterfaceStability.Unstable
-public interface Window<M, K, WV> {
+public interface Window<M, K, WV> extends Serializable {
/**
* Set the early triggers for this {@link Window}.
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index 50391ff..4805a0e 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -21,6 +21,8 @@ package org.apache.samza.operators.windows;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
import org.apache.samza.operators.triggers.TimeTrigger;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
@@ -30,8 +32,6 @@ import org.apache.samza.serializers.Serde;
import java.time.Duration;
import java.util.Collection;
-import java.util.function.Function;
-import java.util.function.Supplier;
/**
* APIs for creating different types of {@link Window}s.
@@ -84,7 +84,7 @@ import java.util.function.Supplier;
* and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants
* of the window types above.
*
- * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link Supplier}
+ * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link SupplierFunction}
* and an aggregating {@link FoldLeftFunction}. The initial value supplier is invoked every time a new window is
* created. The aggregating function is invoked for each incoming message for the window. If these are not provided,
* the emitted {@link WindowPane} will contain a collection of messages in the window.
@@ -105,8 +105,8 @@ public final class Windows {
*
* <pre> {@code
* MessageStream<UserClick> stream = ...;
- * Function<UserClick, String> keyFn = ...;
- * Supplier<Integer> initialValue = () -> 0;
+ * MapFunction<UserClick, String> keyFn = ...;
+ * SupplierFunction<Integer> initialValue = () -> 0;
* FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
* MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
* Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
@@ -125,16 +125,15 @@ public final class Windows {
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function.
*/
- public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval,
- Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<K> keySerde,
+ public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(MapFunction<? super M, ? extends K> keyFn, Duration interval,
+ SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<K> keySerde,
Serde<WV> windowValueSerde) {
Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
- return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
- (Function<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, windowValueSerde, null);
+ return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
+ (MapFunction<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, windowValueSerde, null);
}
-
/**
* Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
* processing time based windows using the provided keyFn.
@@ -157,12 +156,12 @@ public final class Windows {
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
- public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval,
+ public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(MapFunction<M, K> keyFn, Duration interval,
Serde<K> keySerde, Serde<M> msgSerde) {
Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
- return new WindowInternal<>(defaultTrigger, null, null, keyFn, null,
- WindowType.TUMBLING, keySerde, null, msgSerde);
+ return new WindowInternal<>(defaultTrigger, null, null, keyFn, null, WindowType.TUMBLING,
+ keySerde, null, msgSerde);
}
/**
@@ -173,7 +172,7 @@ public final class Windows {
*
* <pre> {@code
* MessageStream<String> stream = ...;
- * Supplier<Integer> initialValue = () -> 0;
+ * SupplierFunction<Integer> initialValue = () -> 0;
* FoldLeftFunction<String, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
* MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
* Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
@@ -189,10 +188,10 @@ public final class Windows {
* @param <WV> the type of the {@link WindowPane} output value
* @return the created {@link Window} function
*/
- public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, Supplier<? extends WV> initialValue,
+ public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, SupplierFunction<? extends WV> initialValue,
FoldLeftFunction<? super M, WV> aggregator, Serde<WV> windowValueSerde) {
Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
- return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
+ return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
null, null, WindowType.TUMBLING, null, windowValueSerde, null);
}
@@ -221,9 +220,8 @@ public final class Windows {
*/
public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration, Serde<M> msgSerde) {
Trigger<M> defaultTrigger = new TimeTrigger<>(duration);
-
- return new WindowInternal<>(defaultTrigger, null, null, null,
- null, WindowType.TUMBLING, null, null, msgSerde);
+ return new WindowInternal<>(defaultTrigger, null, null, null, null,
+ WindowType.TUMBLING, null, null, msgSerde);
}
/**
@@ -238,7 +236,7 @@ public final class Windows {
*
* <pre> {@code
* MessageStream<UserClick> stream = ...;
- * Supplier<Integer> initialValue = () -> 0;
+ * SupplierFunction<Integer> initialValue = () -> 0;
* FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
* Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
* MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
@@ -258,12 +256,12 @@ public final class Windows {
* @param <WV> the type of the output value in the {@link WindowPane}
* @return the created {@link Window} function
*/
- public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn,
- Duration sessionGap, Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator,
+ public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(MapFunction<? super M, ? extends K> keyFn,
+ Duration sessionGap, SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator,
Serde<K> keySerde, Serde<WV> windowValueSerde) {
Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
- return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
- (Function<M, K>) keyFn, null, WindowType.SESSION, keySerde, windowValueSerde, null);
+ return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
+ (MapFunction<M, K>) keyFn, null, WindowType.SESSION, keySerde, windowValueSerde, null);
}
/**
@@ -278,7 +276,7 @@ public final class Windows {
*
* <pre> {@code
* MessageStream<UserClick> stream = ...;
- * Supplier<Integer> initialValue = () -> 0;
+ * SupplierFunction<Integer> initialValue = () -> 0;
* FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
* Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
* MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window(
@@ -294,11 +292,10 @@ public final class Windows {
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
- public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? extends K> keyFn,
+ public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(MapFunction<? super M, ? extends K> keyFn,
Duration sessionGap, Serde<K> keySerde, Serde<M> msgSerde) {
-
Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
- return new WindowInternal<>(defaultTrigger, null, null, (Function<M, K>) keyFn,
+ return new WindowInternal<>(defaultTrigger, null, null, (MapFunction<M, K>) keyFn,
null, WindowType.SESSION, keySerde, null, msgSerde);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
index bc71872..ff19aba 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
@@ -18,14 +18,14 @@
*/
package org.apache.samza.operators.windows.internal;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.serializers.Serde;
-import java.util.function.Function;
-import java.util.function.Supplier;
/**
* Internal representation of a {@link Window}. This specifies default, early and late triggers for the {@link Window}
@@ -45,7 +45,7 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
/**
* The supplier of initial value to be used for windowed aggregations
*/
- private final Supplier<WV> initializer;
+ private final SupplierFunction<WV> initializer;
/*
* The function that is applied each time a {@link MessageEnvelope} is added to this window.
@@ -55,28 +55,32 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
/*
* The function that extracts the key from a {@link MessageEnvelope}
*/
- private final Function<M, WK> keyExtractor;
+ private final MapFunction<M, WK> keyExtractor;
/*
* The function that extracts the event time from a {@link MessageEnvelope}
*/
- private final Function<M, Long> eventTimeExtractor;
+ private final MapFunction<M, Long> eventTimeExtractor;
/**
* The type of this window. Tumbling and Session windows are supported for now.
*/
private final WindowType windowType;
- private final Serde<WK> keySerde;
- private final Serde<WV> windowValSerde;
- private final Serde<M> msgSerde;
-
private Trigger<M> earlyTrigger;
private Trigger<M> lateTrigger;
private AccumulationMode mode;
- public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initializer, FoldLeftFunction<M, WV> foldLeftFunction,
- Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, WindowType windowType, Serde<WK> keySerde,
+ /**
+ * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the store configs, and deserialized
+ * once during startup in SamzaContainer. They are marked transient because they don't need to be deserialized here on a per-task basis.
+ */
+ private transient final Serde<WK> keySerde;
+ private transient final Serde<WV> windowValSerde;
+ private transient final Serde<M> msgSerde;
+
+ public WindowInternal(Trigger<M> defaultTrigger, SupplierFunction<WV> initializer, FoldLeftFunction<M, WV> foldLeftFunction,
+ MapFunction<M, WK> keyExtractor, MapFunction<M, Long> eventTimeExtractor, WindowType windowType, Serde<WK> keySerde,
Serde<WV> windowValueSerde, Serde<M> msgSerde) {
this.defaultTrigger = defaultTrigger;
this.initializer = initializer;
@@ -121,7 +125,7 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
return lateTrigger;
}
- public Supplier<WV> getInitializer() {
+ public SupplierFunction<WV> getInitializer() {
return initializer;
}
@@ -129,11 +133,11 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
return foldLeftFunction;
}
- public Function<M, WK> getKeyExtractor() {
+ public MapFunction<M, WK> getKeyExtractor() {
return keyExtractor;
}
- public Function<M, Long> getEventTimeExtractor() {
+ public MapFunction<M, Long> getEventTimeExtractor() {
return eventTimeExtractor;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
index d70746c..d49518c 100644
--- a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
+++ b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
@@ -68,7 +68,7 @@ public class SerializableSerde<T extends Serializable> implements Serde<T> {
ois = new ObjectInputStream(bis);
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
- throw new SamzaException("Error reading from input stream.");
+ throw new SamzaException("Error reading from input stream.", e);
} finally {
try {
if (ois != null) {
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index ce67d8d..cd86426 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -19,6 +19,7 @@
package org.apache.samza.system;
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -33,7 +34,7 @@ import java.util.Map;
*
* It is immutable by design.
*/
-public class StreamSpec {
+public class StreamSpec implements Serializable {
private static final int DEFAULT_PARTITION_COUNT = 1;
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
index 95cc266..e9ca9f7 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
@@ -60,7 +60,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
public Partition getPartition() {
return partition;
}
-
+
public SystemStream getSystemStream() {
return new SystemStream(system, stream);
}
@@ -69,7 +69,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy
public int hashCode() {
return hash;
}
-
+
private int computeHashCode() {
final int prime = 31;
int result = super.hashCode();
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
index 68043f9..ba57c2f 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.table;
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -41,12 +42,17 @@ import org.apache.samza.serializers.KVSerde;
* It is immutable by design.
*/
@InterfaceStability.Unstable
-public class TableSpec {
+public class TableSpec implements Serializable {
private final String id;
- private final KVSerde serde;
private final String tableProviderFactoryClassName;
- private final Map<String, String> config = new HashMap<>();
+
+ /**
+ * The following fields are serialized by the ExecutionPlanner when generating the configs for a table, and deserialized
+ * once during startup in SamzaContainer. They are marked transient because they don't need to be deserialized here on a per-task basis.
+ */
+ private transient final KVSerde serde;
+ private transient final Map<String, String> config = new HashMap<>();
/**
* Default constructor
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
index 4184c9d..19cce6f 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
@@ -27,7 +27,7 @@ import static org.junit.Assert.assertEquals;
public class TestWindowPane {
@Test
public void testConstructor() {
- WindowPane<String, Integer> wndOutput = new WindowPane(new WindowKey("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY);
+ WindowPane<String, Integer> wndOutput = new WindowPane<>(new WindowKey<>("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY);
assertEquals(wndOutput.getKey().getKey(), "testMsg");
assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index e2c122a..9d8bd5f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -34,7 +34,7 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.system.StreamSpec;
@@ -61,18 +61,18 @@ public class ExecutionPlanner {
this.streamManager = streamManager;
}
- public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
+ public ExecutionPlan plan(OperatorSpecGraph specGraph) throws Exception {
validateConfig();
// create physical job graph based on stream graph
- JobGraph jobGraph = createJobGraph(streamGraph);
+ JobGraph jobGraph = createJobGraph(specGraph);
// fetch the external streams partition info
updateExistingPartitions(jobGraph, streamManager);
if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
// figure out the partitions for internal streams
- calculatePartitions(streamGraph, jobGraph);
+ calculatePartitions(jobGraph);
}
return jobGraph;
@@ -91,12 +91,12 @@ public class ExecutionPlanner {
/**
* Create the physical graph from StreamGraph
*/
- /* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
- JobGraph jobGraph = new JobGraph(config);
- Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
- Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
+ /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
+ JobGraph jobGraph = new JobGraph(config, specGraph);
+ Set<StreamSpec> sourceStreams = new HashSet<>(specGraph.getInputOperators().keySet());
+ Set<StreamSpec> sinkStreams = new HashSet<>(specGraph.getOutputStreams().keySet());
Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
- Set<TableSpec> tables = new HashSet<>(streamGraph.getTables().keySet());
+ Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
intStreams.retainAll(sinkStreams);
sourceStreams.removeAll(intStreams);
sinkStreams.removeAll(intStreams);
@@ -104,7 +104,7 @@ public class ExecutionPlanner {
// For this phase, we have a single job node for the whole dag
String jobName = config.get(JobConfig.JOB_NAME());
String jobId = config.get(JobConfig.JOB_ID(), "1");
- JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId, streamGraph);
+ JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId);
// add sources
sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
@@ -126,9 +126,9 @@ public class ExecutionPlanner {
/**
* Figure out the number of partitions of all streams
*/
- /* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
+ /* package private */ void calculatePartitions(JobGraph jobGraph) {
// calculate the partitions for the input streams of join operators
- calculateJoinInputPartitions(streamGraph, jobGraph);
+ calculateJoinInputPartitions(jobGraph);
// calculate the partitions for the rest of intermediate streams
calculateIntStreamPartitions(jobGraph, config);
@@ -172,7 +172,7 @@ public class ExecutionPlanner {
/**
* Calculate the partitions for the input streams of join operators
*/
- /* package private */ static void calculateJoinInputPartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
+ /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph) {
// mapping from a source stream to all join specs reachable from it
Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
// reverse mapping of the above
@@ -182,7 +182,7 @@ public class ExecutionPlanner {
// The visited set keeps track of the join specs that have been already inserted in the queue before
Set<OperatorSpec> visited = new HashSet<>();
- streamGraph.getInputOperators().entrySet().forEach(entry -> {
+ jobGraph.getSpecGraph().getInputOperators().entrySet().forEach(entry -> {
StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
// Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index abd3ce7..843db85 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -34,7 +34,7 @@ import java.util.stream.Collectors;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.table.TableSpec;
import org.slf4j.Logger;
@@ -60,13 +60,15 @@ import org.slf4j.LoggerFactory;
private final Set<TableSpec> tables = new HashSet<>();
private final Config config;
private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
+ private final OperatorSpecGraph specGraph;
/**
* The JobGraph is only constructed by the {@link ExecutionPlanner}.
* @param config Config
*/
- JobGraph(Config config) {
+ JobGraph(Config config, OperatorSpecGraph specGraph) {
this.config = config;
+ this.specGraph = specGraph;
}
@Override
@@ -107,6 +109,10 @@ import org.slf4j.LoggerFactory;
return new ApplicationConfig(config);
}
+ public OperatorSpecGraph getSpecGraph() {
+ return specGraph;
+ }
+
/**
* Add a source stream to a {@link JobNode}
* @param input source stream
@@ -152,11 +158,11 @@ import org.slf4j.LoggerFactory;
* @param jobId id of the job
* @return
*/
- JobNode getOrCreateJobNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
+ JobNode getOrCreateJobNode(String jobName, String jobId) {
String nodeId = JobNode.createId(jobName, jobId);
JobNode node = nodes.get(nodeId);
if (node == null) {
- node = new JobNode(jobName, jobId, streamGraph, config);
+ node = new JobNode(jobName, jobId, specGraph, config);
nodes.put(nodeId, node);
}
return node;
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 48d2219..298042b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -170,7 +170,7 @@ import org.codehaus.jackson.map.ObjectMapper;
private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
OperatorGraphJson opGraph = new OperatorGraphJson();
opGraph.inputStreams = new ArrayList<>();
- jobNode.getStreamGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
+ jobNode.getSpecGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
StreamJson inputJson = new StreamJson();
opGraph.inputStreams.add(inputJson);
inputJson.streamId = streamSpec.getId();
@@ -181,7 +181,7 @@ import org.codehaus.jackson.map.ObjectMapper;
});
opGraph.outputStreams = new ArrayList<>();
- jobNode.getStreamGraph().getOutputStreams().keySet().forEach(streamSpec -> {
+ jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamSpec -> {
StreamJson outputJson = new StreamJson();
outputJson.streamId = streamSpec.getId();
opGraph.outputStreams.add(outputJson);
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 8abd463..db44d9f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -39,7 +39,7 @@ import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
@@ -73,22 +73,22 @@ public class JobNode {
private final String jobName;
private final String jobId;
private final String id;
- private final StreamGraphImpl streamGraph;
+ private final OperatorSpecGraph specGraph;
private final List<StreamEdge> inEdges = new ArrayList<>();
private final List<StreamEdge> outEdges = new ArrayList<>();
private final List<TableSpec> tables = new ArrayList<>();
private final Config config;
- JobNode(String jobName, String jobId, StreamGraphImpl streamGraph, Config config) {
+ JobNode(String jobName, String jobId, OperatorSpecGraph specGraph, Config config) {
this.jobName = jobName;
this.jobId = jobId;
this.id = createId(jobName, jobId);
- this.streamGraph = streamGraph;
+ this.specGraph = specGraph;
this.config = config;
}
- public StreamGraphImpl getStreamGraph() {
- return streamGraph;
+ public OperatorSpecGraph getSpecGraph() {
+ return this.specGraph;
}
public String getId() {
@@ -154,7 +154,7 @@ public class JobNode {
}
// set triggering interval if a window or join is defined
- if (streamGraph.hasWindowOrJoins()) {
+ if (specGraph.hasWindowOrJoins()) {
if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) {
long triggerInterval = computeTriggerInterval();
log.info("Using triggering interval: {} for jobName: {}", triggerInterval, jobName);
@@ -163,7 +163,7 @@ public class JobNode {
}
}
- streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+ specGraph.getAllOperatorSpecs().forEach(opSpec -> {
if (opSpec instanceof StatefulOperatorSpec) {
((StatefulOperatorSpec) opSpec).getStoreDescriptors()
.forEach(sd -> configs.putAll(sd.getStorageConfigs()));
@@ -228,14 +228,14 @@ public class JobNode {
// collect all key and msg serde instances for streams
Map<String, Serde> streamKeySerdes = new HashMap<>();
Map<String, Serde> streamMsgSerdes = new HashMap<>();
- Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators();
+ Map<StreamSpec, InputOperatorSpec> inputOperators = specGraph.getInputOperators();
inEdges.forEach(edge -> {
String streamId = edge.getStreamSpec().getId();
InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec());
streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
});
- Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams();
+ Map<StreamSpec, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
outEdges.forEach(edge -> {
String streamId = edge.getStreamSpec().getId();
OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec());
@@ -246,7 +246,7 @@ public class JobNode {
// collect all key and msg serde instances for stores
Map<String, Serde> storeKeySerdes = new HashMap<>();
Map<String, Serde> storeMsgSerdes = new HashMap<>();
- streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+ specGraph.getAllOperatorSpecs().forEach(opSpec -> {
if (opSpec instanceof StatefulOperatorSpec) {
((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> {
storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
@@ -320,8 +320,8 @@ public class JobNode {
* Computes the triggering interval to use during the execution of this {@link JobNode}
*/
private long computeTriggerInterval() {
- // Obtain the operator specs from the streamGraph
- Collection<OperatorSpec> operatorSpecs = streamGraph.getAllOperatorSpecs();
+ // Obtain the operator specs from the specGraph
+ Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs();
// Filter out window operators, and obtain a list of their triggering interval values
List<Long> windowTimerIntervals = operatorSpecs.stream()
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 1681f30..6922c76 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -21,7 +21,6 @@ package org.apache.samza.operators;
import java.time.Duration;
import java.util.Collection;
-import java.util.function.Function;
import org.apache.samza.SamzaException;
import org.apache.samza.operators.functions.FilterFunction;
@@ -64,16 +63,16 @@ import org.apache.samza.table.TableSpec;
*/
public class MessageStreamImpl<M> implements MessageStream<M> {
/**
- * The {@link StreamGraphImpl} that contains this {@link MessageStreamImpl}
+ * The {@link StreamGraphSpec} that contains this {@link MessageStreamImpl}
*/
- private final StreamGraphImpl graph;
+ private final StreamGraphSpec graph;
/**
* The {@link OperatorSpec} associated with this {@link MessageStreamImpl}
*/
private final OperatorSpec operatorSpec;
- public MessageStreamImpl(StreamGraphImpl graph, OperatorSpec<?, M> operatorSpec) {
+ public MessageStreamImpl(StreamGraphSpec graph, OperatorSpec<?, M> operatorSpec) {
this.graph = graph;
this.operatorSpec = operatorSpec;
}
@@ -81,7 +80,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
String opId = this.graph.getNextOpId(OpCode.MAP);
- OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId);
+ StreamOperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
return new MessageStreamImpl<>(this.graph, op);
}
@@ -89,7 +88,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
String opId = this.graph.getNextOpId(OpCode.FILTER);
- OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId);
+ StreamOperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
return new MessageStreamImpl<>(this.graph, op);
}
@@ -97,7 +96,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
String opId = this.graph.getNextOpId(OpCode.FLAT_MAP);
- OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId);
+ StreamOperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
return new MessageStreamImpl<>(this.graph, op);
}
@@ -112,15 +111,15 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public void sendTo(OutputStream<M> outputStream) {
String opId = this.graph.getNextOpId(OpCode.SEND_TO);
- OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamImpl<M>) outputStream, opId);
+ OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
+ (OutputStreamImpl<M>) outputStream, opId);
this.operatorSpec.registerNextOperatorSpec(op);
}
@Override
public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) {
String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId);
- OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec(
- (WindowInternal<M, K, WV>) window, opId);
+ OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, opId);
this.operatorSpec.registerNextOperatorSpec(op);
return new MessageStreamImpl<>(this.graph, op);
}
@@ -131,24 +130,24 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
Duration ttl, String userDefinedId) {
if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself.");
- OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId);
- JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
- OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
- keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), opId);
-
- this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
- otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);
+ OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
+ JoinOperatorSpec<K, M, OM, JM> op =
+ OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, keySerde,
+ messageSerde, otherMessageSerde, ttl.toMillis(), opId);
+ this.operatorSpec.registerNextOperatorSpec(op);
+ otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) op);
- return new MessageStreamImpl<>(this.graph, joinOpSpec);
+ return new MessageStreamImpl<>(this.graph, op);
}
@Override
public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) {
+ String opId = this.graph.getNextOpId(OpCode.JOIN);
TableSpec tableSpec = ((TableImpl) table).getTableSpec();
StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec(
- tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, this.graph.getNextOpId(OpCode.JOIN));
+ tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId);
this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
return new MessageStreamImpl<>(this.graph, joinOpSpec);
}
@@ -157,46 +156,38 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
if (otherStreams.isEmpty()) return this;
String opId = this.graph.getNextOpId(OpCode.MERGE);
- StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(opId);
- this.operatorSpec.registerNextOperatorSpec(opSpec);
- otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec));
- return new MessageStreamImpl<>(this.graph, opSpec);
+ StreamOperatorSpec<M, M> op = OperatorSpecs.createMergeOperatorSpec(opId);
+ this.operatorSpec.registerNextOperatorSpec(op);
+ otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(op));
+ return new MessageStreamImpl<>(this.graph, op);
}
@Override
- public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
- Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
+ public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+ MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde);
if (!intermediateStream.isKeyed()) {
// this can only happen when the default serde partitionBy variant is being used
throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde.");
}
- PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec =
- OperatorSpecs.createPartitionByOperatorSpec(
- intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId);
+ PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
+ intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId);
this.operatorSpec.registerNextOperatorSpec(partitionByOperatorSpec);
return intermediateStream;
}
@Override
- public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
- Function<? super M, ? extends V> valueExtractor, String userDefinedId) {
+ public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+ MapFunction<? super M, ? extends V> valueExtractor, String userDefinedId) {
return partitionBy(keyExtractor, valueExtractor, null, userDefinedId);
}
- /**
- * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
- * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
- */
- protected OperatorSpec<?, M> getOperatorSpec() {
- return this.operatorSpec;
- }
-
@Override
public <K, V> void sendTo(Table<KV<K, V>> table) {
- SendToTableOperatorSpec<K, V> op = OperatorSpecs.createSendToTableOperatorSpec(
- this.operatorSpec, ((TableImpl) table).getTableSpec(), this.graph.getNextOpId(OpCode.SEND_TO));
+ String opId = this.graph.getNextOpId(OpCode.SEND_TO);
+ SendToTableOperatorSpec<K, V> op =
+ OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableSpec(), opId);
this.operatorSpec.registerNextOperatorSpec(op);
}
@@ -215,4 +206,12 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
return broadcast(null, userDefinedId);
}
+ /**
+ * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
+ * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
+ */
+ protected OperatorSpec<?, M> getOperatorSpec() {
+ return this.operatorSpec;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
new file mode 100644
index 0000000..ba51c7c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Defines the serialized format of {@link StreamGraphSpec}. This class encapsulates all getter methods to get the {@link OperatorSpec}
+ * initialized in the {@link StreamGraphSpec} and constructs the corresponding serialized instances of {@link OperatorSpec}.
+ * The {@link StreamGraphSpec} and {@link OperatorSpec} instances included in this class are considered as immutable and read-only.
+ * The instance of {@link OperatorSpecGraph} should only be used at runtime to construct {@link org.apache.samza.task.StreamOperatorTask}.
+ */
+public class OperatorSpecGraph implements Serializable {
+ // We use a LinkedHashMap (LHM) for deterministic order in initializing and closing operators.
+ private final Map<StreamSpec, InputOperatorSpec> inputOperators;
+ private final Map<StreamSpec, OutputStreamImpl> outputStreams;
+ private final Map<TableSpec, TableImpl> tables;
+ private final Set<OperatorSpec> allOpSpecs;
+ private final boolean hasWindowOrJoins;
+
+ // The following objects are transient since they are recreatable.
+ private transient final SerializableSerde<OperatorSpecGraph> opSpecGraphSerde = new SerializableSerde<>();
+ private transient final byte[] serializedOpSpecGraph;
+
+ OperatorSpecGraph(StreamGraphSpec graphSpec) {
+ this.inputOperators = graphSpec.getInputOperators();
+ this.outputStreams = graphSpec.getOutputStreams();
+ this.tables = graphSpec.getTables();
+ this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
+ this.hasWindowOrJoins = checkWindowOrJoins();
+ this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
+ }
+
+ public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+ return inputOperators;
+ }
+
+ public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+ return outputStreams;
+ }
+
+ public Map<TableSpec, TableImpl> getTables() {
+ return tables;
+ }
+
+ /**
+ * Get all {@link OperatorSpec}s available in this {@link StreamGraphSpec}
+ *
+ * @return all available {@link OperatorSpec}s
+ */
+ public Collection<OperatorSpec> getAllOperatorSpecs() {
+ return allOpSpecs;
+ }
+
+ /**
+ * Returns <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator
+ *
+ * @return <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator
+ */
+ public boolean hasWindowOrJoins() {
+ return hasWindowOrJoins;
+ }
+
+ /**
+ * Returns a deserialized {@link OperatorSpecGraph} as a copy of this instance of {@link OperatorSpecGraph}.
+ * This is used to create per-task instance of {@link OperatorSpecGraph} when instantiating task instances.
+ *
+ * @return a copy of this {@link OperatorSpecGraph} object via deserialization
+ */
+ public OperatorSpecGraph clone() {
+ if (opSpecGraphSerde == null) {
+ throw new IllegalStateException("Cannot clone from an already deserialized OperatorSpecGraph.");
+ }
+ return opSpecGraphSerde.fromBytes(serializedOpSpecGraph);
+ }
+
+ private HashSet<OperatorSpec> findAllOperatorSpecs() {
+ Collection<InputOperatorSpec> inputOperatorSpecs = this.inputOperators.values();
+ HashSet<OperatorSpec> operatorSpecs = new HashSet<>();
+ for (InputOperatorSpec inputOperatorSpec : inputOperatorSpecs) {
+ operatorSpecs.add(inputOperatorSpec);
+ doGetOperatorSpecs(inputOperatorSpec, operatorSpecs);
+ }
+ return operatorSpecs;
+ }
+
+ private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> specs) {
+ Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs();
+ for (OperatorSpec registeredOperatorSpec : registeredOperatorSpecs) {
+ specs.add(registeredOperatorSpec);
+ doGetOperatorSpecs(registeredOperatorSpec, specs);
+ }
+ }
+
+ private boolean checkWindowOrJoins() {
+ Set<OperatorSpec> windowOrJoinSpecs = allOpSpecs.stream()
+ .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || spec.getOpCode() == OperatorSpec.OpCode.JOIN)
+ .collect(Collectors.toSet());
+
+ return windowOrJoinSpecs.size() != 0;
+ }
+
+}