You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/02/27 20:30:59 UTC
[1/2] samza git commit: Enhancements to execution planner
Repository: samza
Updated Branches:
refs/heads/samza-fluent-api-v1 d39bce9cb -> dde754246
Enhancements to execution planner
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dde75424
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dde75424
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dde75424
Branch: refs/heads/samza-fluent-api-v1
Commit: dde754246391729e00a19a15058525cdeb2fca1a
Parents: 93c82f3
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Mon Feb 27 12:25:23 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Feb 27 12:29:45 2017 -0800
----------------------------------------------------------------------
.../samza/processorgraph/ExecutionPlanner.java | 34 ++++++++++++--------
1 file changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/dde75424/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
index a990463..055f87c 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -59,7 +59,7 @@ public class ExecutionPlanner {
ProcessorGraph processorGraph = splitStages(streamGraph);
// figure out the partition for internal streams
- Multimap<String, StreamSpec> streams = calculatePartitions(processorGraph, sysAdmins);
+ Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins);
// create the streams
createStreams(streams, sysAdmins);
@@ -96,9 +96,12 @@ public class ExecutionPlanner {
return processorGraph;
}
- private Multimap<String, StreamSpec> calculatePartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+ private Multimap<String, StreamSpec> calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
// fetch the external streams partition info
- getExternalStreamPartitions(processorGraph, sysAdmins);
+ getExistingStreamPartitions(processorGraph, sysAdmins);
+
+ // use BFS to figure out the join partition count
+
// TODO this algorithm assumes only one processor, and it does not consider join
Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
@@ -112,9 +115,11 @@ public class ExecutionPlanner {
int partition = Math.max(maxInPartition, maxOutPartition);
outStreams.forEach(streamEdge -> {
- streamEdge.setPartitions(partition);
- StreamSpec streamSpec = createStreamSpec(streamEdge);
- streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec);
+ if (streamEdge.getPartitions() == -1) {
+ streamEdge.setPartitions(partition);
+ StreamSpec streamSpec = createStreamSpec(streamEdge);
+ streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec);
+ }
});
}
});
@@ -122,16 +127,17 @@ public class ExecutionPlanner {
return streamsGroupedBySystem;
}
- private void getExternalStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
- Set<StreamEdge> externalStreams = new HashSet<>();
- externalStreams.addAll(processorGraph.getSources());
- externalStreams.addAll(processorGraph.getSinks());
+ private void getExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+ Set<StreamEdge> allStreams = new HashSet<>();
+ allStreams.addAll(processorGraph.getSources());
+ allStreams.addAll(processorGraph.getSinks());
+ allStreams.addAll(processorGraph.getInternalStreams());
Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create();
- externalStreams.forEach(streamEdge -> {
- SystemStream systemStream = streamEdge.getSystemStream();
- externalStreamsMap.put(systemStream.getSystem(), streamEdge);
- });
+ allStreams.forEach(streamEdge -> {
+ SystemStream systemStream = streamEdge.getSystemStream();
+ externalStreamsMap.put(systemStream.getSystem(), streamEdge);
+ });
for (Map.Entry<String, Collection<StreamEdge>> entry : externalStreamsMap.asMap().entrySet()) {
String systemName = entry.getKey();
Collection<StreamEdge> streamEdges = entry.getValue();
[2/2] samza git commit: SAMZA-1092: replace stream spec in fluent API
Posted by xi...@apache.org.
SAMZA-1092: replace stream spec in fluent API
Replaced the StreamSpec class w/ the new one from SAMZA-1075.
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #58 from nickpan47/replace-stream-spec and squashes the following commits:
761ebb5 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in system package
df953c2 [Yi Pan (Data Infrastructure)] SAMZA-1092: fix unit test
71331d8 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class
2fb19e9 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in fluent API
ed3ad8e [Yi Pan (Data Infrastructure)] WIP: replace stream spec in fluent API
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93c82f3d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93c82f3d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93c82f3d
Branch: refs/heads/samza-fluent-api-v1
Commit: 93c82f3de4f86d18b363ffb84bdaa407d4f2cac5
Parents: d39bce9
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Thu Feb 23 12:48:56 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Feb 27 12:29:45 2017 -0800
----------------------------------------------------------------------
.../org/apache/samza/operators/StreamGraph.java | 2 +-
.../org/apache/samza/operators/StreamSpec.java | 46 -----------
.../apache/samza/operators/StreamGraphImpl.java | 83 +++++++++-----------
.../samza/example/KeyValueStoreExample.java | 24 +-----
.../samza/example/NoContextStreamExample.java | 33 +-------
.../samza/example/OrderShipmentJoinExample.java | 35 +--------
.../samza/example/PageViewCounterExample.java | 23 +-----
.../samza/example/RepartitionExample.java | 23 +-----
.../samza/example/TestBroadcastExample.java | 15 +---
.../apache/samza/example/TestJoinExample.java | 26 ++----
.../apache/samza/example/TestWindowExample.java | 15 +---
.../operators/impl/TestStreamOperatorImpl.java | 1 +
12 files changed, 69 insertions(+), 257 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index abc9861..30c4576 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -21,10 +21,10 @@ package org.apache.samza.operators;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamSpec;
import java.util.Map;
-
/**
* Job-level programming interface to create an operator DAG and run in various different runtime environments.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
deleted file mode 100644
index c8a5e8d..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStream;
-
-import java.util.Properties;
-
-
-/**
- * This interface defines the specification of a {@link SystemStream}. It will be used by the {@link org.apache.samza.system.SystemAdmin}
- * to create a {@link SystemStream}
- */
-@InterfaceStability.Unstable
-public interface StreamSpec {
- /**
- * Get the {@link SystemStream}
- *
- * @return {@link SystemStream} object
- */
- SystemStream getSystemStream();
-
- /**
- * Get the physical properties of the {@link SystemStream}
- *
- * @return the properties of this stream
- */
- Properties getProperties();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index dca3469..353f455 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,12 +18,12 @@
*/
package org.apache.samza.operators;
-import java.util.Properties;
import java.util.function.Function;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
@@ -44,6 +44,9 @@ public class StreamGraphImpl implements StreamGraph {
*/
private int opId = 0;
+ // TODO: SAMZA-1101: the instantiation of physical streams and the physical sink functions should be delayed
+ // after physical deployment. The input/output/intermediate stream creation should also be delegated to {@link ExecutionEnvironment}
+ // s.t. we can allow different physical instantiation of stream under different execution environment w/o code change.
private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
final StreamSpec spec;
final Serde<K> keySerde;
@@ -83,7 +86,7 @@ public class StreamGraphImpl implements StreamGraph {
// TODO: need to find a way to directly pass in the serde class names
// mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
// message.getKey(), message.getKey(), message.getMessage()));
- mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+ mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
};
}
}
@@ -112,10 +115,10 @@ public class StreamGraphImpl implements StreamGraph {
// mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
// message.getKey(), message.getKey(), message.getMessage()));
if (this.parKeyFn == null) {
- mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+ mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
} else {
// apply partition key function
- mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
+ mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
}
};
}
@@ -124,17 +127,17 @@ public class StreamGraphImpl implements StreamGraph {
/**
* Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
*/
- private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
- private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
+ private final Map<String, MessageStream> inStreams = new HashMap<>();
+ private final Map<String, OutputStream> outStreams = new HashMap<>();
private ContextManager contextManager = new ContextManager() { };
@Override
public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
- this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+ if (!this.inStreams.containsKey(streamSpec.getId())) {
+ this.inStreams.putIfAbsent(streamSpec.getId(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
}
- return this.inStreams.get(streamSpec.getSystemStream());
+ return this.inStreams.get(streamSpec.getId());
}
/**
@@ -146,10 +149,10 @@ public class StreamGraphImpl implements StreamGraph {
*/
@Override
public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
- this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+ if (!this.outStreams.containsKey(streamSpec.getId())) {
+ this.outStreams.putIfAbsent(streamSpec.getId(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
}
- return this.outStreams.get(streamSpec.getSystemStream());
+ return this.outStreams.get(streamSpec.getId());
}
/**
@@ -161,12 +164,12 @@ public class StreamGraphImpl implements StreamGraph {
*/
@Override
public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
- this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
+ if (!this.inStreams.containsKey(streamSpec.getId())) {
+ this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
}
- IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
- if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
- this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+ IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getId());
+ if (!this.outStreams.containsKey(streamSpec.getId())) {
+ this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
}
return intStream;
}
@@ -200,12 +203,15 @@ public class StreamGraphImpl implements StreamGraph {
/**
* Helper method to be get the input stream via {@link SystemStream}
*
- * @param systemStream the {@link SystemStream}
+ * @param sstream the {@link SystemStream}
* @return a {@link MessageStreamImpl} object corresponding to the {@code systemStream}
*/
- public MessageStreamImpl getInputStream(SystemStream systemStream) {
- if (this.inStreams.containsKey(systemStream)) {
- return (MessageStreamImpl) this.inStreams.get(systemStream);
+ public MessageStreamImpl getInputStream(SystemStream sstream) {
+ for(MessageStream entry: this.inStreams.values()) {
+ if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() &&
+ ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) {
+ return (MessageStreamImpl) entry;
+ }
}
return null;
}
@@ -217,13 +223,6 @@ public class StreamGraphImpl implements StreamGraph {
return null;
}
- <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
- if (this.inStreams.containsValue(outStream)) {
- return (MessageStream<M>) outStream;
- }
- return null;
- }
-
/**
* Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method.
*
@@ -234,27 +233,21 @@ public class StreamGraphImpl implements StreamGraph {
*/
<PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
// TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
- StreamSpec streamSpec = new StreamSpec() {
- @Override
- public SystemStream getSystemStream() {
- // TODO: should auto-generate intermedaite stream name here
- return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId));
- }
+ StreamSpec streamSpec = this.createIntStreamSpec();
- @Override
- public Properties getProperties() {
- return null;
- }
- };
-
- if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
- this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
+ if (!this.inStreams.containsKey(streamSpec.getId())) {
+ this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
}
- IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream());
- if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
- this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+ IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
+ if (!this.outStreams.containsKey(streamSpec.getId())) {
+ this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
}
return intStream;
}
+ private StreamSpec createIntStreamSpec() {
+ // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 85ebc6c..4a0681e 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -32,12 +32,10 @@ import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.CommandLine;
-import java.util.Properties;
-
/**
* Example code using {@link KeyValueStore} to implement event-time window
@@ -113,25 +111,9 @@ public class KeyValueStoreExample implements StreamGraphBuilder {
}
}
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewEvent");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewPerMember5min");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
String pageId;
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
index c6d2e6e..320680c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -28,13 +28,12 @@ import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.CommandLine;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
/**
@@ -42,35 +41,11 @@ import java.util.Properties;
*/
public class NoContextStreamExample implements StreamGraphBuilder {
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "input1");
- }
+ StreamSpec input1 = new StreamSpec("inputStreamA", "PageViewEvent", "kafka");
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec input2 = new StreamSpec("inputStreamB", "RumLixEvent", "kafka");
- StreamSpec input2 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "input2");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "output");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec output = new StreamSpec("joinedPageViewStream", "PageViewJoinRumLix", "kafka");
class MessageType {
String joinKey;
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 0477066..30ce7d2 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -23,17 +23,14 @@ import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
-import java.util.Properties;
-
/**
* Simple 2-way stream-to-stream join example
@@ -71,35 +68,11 @@ public class OrderShipmentJoinExample implements StreamGraphBuilder {
standaloneEnv.run(new OrderShipmentJoinExample(), config);
}
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "Orders");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec input2 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "Shipment");
- }
+ StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec input2 = new StreamSpec("shipmentStream", "ShipmentEvent", "kafka");
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "FulfilledOrders");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec output = new StreamSpec("joinedOrderShipmentStream", "OrderShipmentJoinEvent", "kafka");
class OrderRecord implements MessageEnvelope<String, OrderRecord> {
String orderId;
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index f7d8bda..fcf67a7 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -29,11 +29,10 @@ import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
-import java.util.Properties;
/**
@@ -62,25 +61,9 @@ public class PageViewCounterExample implements StreamGraphBuilder {
standaloneEnv.run(new PageViewCounterExample(), config);
}
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewEvent");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewPerMember5min");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
String pageId;
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 6994ac4..228668c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -27,11 +27,10 @@ import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
-import java.util.*;
/**
@@ -73,25 +72,9 @@ public class RepartitionExample implements StreamGraphBuilder {
standaloneEnv.run(new RepartitionExample(), config);
}
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewEvent");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewPerMember5min");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
+ StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
String pageId;
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
index d22324b..059afce 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -22,18 +22,16 @@ package org.apache.samza.example;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
import org.apache.samza.operators.data.InputMessageEnvelope;
import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
import org.apache.samza.operators.data.Offset;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStreamPartition;
import java.time.Duration;
import java.util.function.BiFunction;
-import java.util.Properties;
import java.util.Set;
@@ -70,15 +68,8 @@ public class TestBroadcastExample extends TestExampleBase {
public void init(StreamGraph graph, Config config) {
BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
inputs.keySet().forEach(entry -> {
- MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return entry;
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- }, null, null).map(this::getInputMessage);
+ MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
+ new StreamSpec(entry.toString(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
.setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
index fe6e7e7..cc53814 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -22,19 +22,18 @@ package org.apache.samza.example;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
import org.apache.samza.operators.data.InputMessageEnvelope;
import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
import org.apache.samza.operators.data.Offset;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
import java.util.Set;
@@ -65,16 +64,9 @@ public class TestJoinExample extends TestExampleBase {
public void init(StreamGraph graph, Config config) {
for (SystemStream input : inputs.keySet()) {
+ StreamSpec inputStreamSpec = new StreamSpec(input.toString(), input.getStream(), input.getSystem());
MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
- new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return input;
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- }, null, null).map(this::getInputMessage);
+ inputStreamSpec, null, null).map(this::getInputMessage);
if (joinOutput == null) {
joinOutput = newSource;
} else {
@@ -82,15 +74,9 @@ public class TestJoinExample extends TestExampleBase {
}
}
- joinOutput.sendTo(graph.createOutStream(new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return null;
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- }, new StringSerde("UTF-8"), new JsonSerde<>()));
+ joinOutput.sendTo(graph.createOutStream(
+ new StreamSpec("joinOutput", "JoinOutputEvent", "kafka"),
+ new StringSerde("UTF-8"), new JsonSerde<>()));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
index e08ca20..73f4674 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -21,18 +21,16 @@ package org.apache.samza.example;
import org.apache.samza.config.Config;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
import org.apache.samza.operators.data.InputMessageEnvelope;
import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.data.Offset;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStreamPartition;
import java.time.Duration;
import java.util.function.BiFunction;
-import java.util.Properties;
import java.util.Set;
@@ -60,15 +58,8 @@ public class TestWindowExample extends TestExampleBase {
@Override
public void init(StreamGraph graph, Config config) {
BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
- inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return source;
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- }, null, null).
+ inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
+ new StreamSpec(source.toString(), source.getStream(), source.getSystem()), null, null).
map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 010a210..0a873fd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.*;
public class TestStreamOperatorImpl {
@Test
+ @SuppressWarnings("unchecked")
public void testSimpleOperator() {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);