You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/05/25 16:36:45 UTC
[02/10] samza git commit: SAMZA-1659: Serializable OperatorSpec
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
new file mode 100644
index 0000000..9ca4f35
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example {@link StreamApplication} that filters one keyed input stream by message key into three separate output streams.
+ */
+public class BroadcastExample implements StreamApplication {
+
+ // local execution mode: loads the config from the command-line args and runs this app in a LocalApplicationRunner
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+
+ StreamApplication app = new BroadcastExample();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ KVSerde<String, PageViewEvent> pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)); // shared by the input and all outputs
+ MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("pageViewEventStream", pgeMsgSerde);
+
+ inputStream.filter(m -> m.key.equals("key1")).sendTo(graph.getOutputStream("outStream1", pgeMsgSerde)); // the same input stream is filtered three times, once per key
+ inputStream.filter(m -> m.key.equals("key2")).sendTo(graph.getOutputStream("outStream2", pgeMsgSerde));
+ inputStream.filter(m -> m.key.equals("key3")).sendTo(graph.getOutputStream("outStream3", pgeMsgSerde)); // messages with any other key match no filter and are not forwarded
+ }
+
+ class PageViewEvent { // NOTE(review): non-static inner class with no default constructor; JSON deserialization may require a static class - confirm
+ String key;
+ long timestamp;
+
+ public PageViewEvent(String key, long timestamp) {
+ this.key = key;
+ this.timestamp = timestamp;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
new file mode 100644
index 0000000..9edaabe
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.CommandLine;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Example code using a {@link KeyValueStore} to keep per-member page view counts in 5-minute event-time windows.
+ */
+public class KeyValueStoreExample implements StreamApplication {
+
+ // local execution mode: loads the config from the command-line args and runs this app in a LocalApplicationRunner
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ KeyValueStoreExample app = new KeyValueStoreExample();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ MessageStream<PageViewEvent> pageViewEvents =
+ graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+ OutputStream<KV<String, StatsOutput>> pageViewEventPerMember =
+ graph.getOutputStream("pageViewEventPerMember",
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
+
+ pageViewEvents
+ .partitionBy(pve -> pve.memberId, pve -> pve, // re-key by memberId, keeping the whole event as the value
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
+ .map(KV::getValue) // drop the key again; MyStatsCounter works on the bare event
+ .flatMap(new MyStatsCounter())
+ .map(stats -> KV.of(stats.memberId, stats))
+ .sendTo(pageViewEventPerMember);
+ }
+
+ static class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
+ private final int timeoutMs = 10 * 60 * 1000; // 10 minutes: minimum wall-clock gap between two outputs for the same window key
+
+ KeyValueStore<String, StatsWindowState> statsStore; // assigned in init(Config, TaskContext)
+
+ class StatsWindowState {
+ int lastCount = 0; // total count reported in the most recent output
+ long timeAtLastOutput = 0; // wall-clock ms of the most recent output; 0 until the first output
+ int newCount = 0; // messages counted since the most recent output
+ }
+
+ @Override
+ public Collection<StatsOutput> apply(PageViewEvent message) {
+ List<StatsOutput> outputStats = new ArrayList<>();
+ long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.timestamp) / 5) * 5; // event time in minutes, truncated to a multiple of 5 by the integer division (Math.floor is a no-op here)
+ String wndKey = String.format("%s-%d", message.memberId, wndTimestamp); // one store entry per (member, 5-minute window)
+ StatsWindowState curState = this.statsStore.get(wndKey);
+ if (curState == null) {
+ curState = new StatsWindowState();
+ }
+ curState.newCount++;
+ long curTimeMs = System.currentTimeMillis();
+ if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) { // newCount > 0 always holds after the increment above
+ curState.timeAtLastOutput = curTimeMs;
+ curState.lastCount += curState.newCount;
+ curState.newCount = 0;
+ outputStats.add(new StatsOutput(message.memberId, wndTimestamp, curState.lastCount));
+ }
+ // persist the state on every call, whether or not output was generated above
+ this.statsStore.put(wndKey, curState);
+ return outputStats;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store"); // unchecked cast to the typed store
+ }
+ }
+
+ class PageViewEvent { // NOTE(review): non-static inner class with no default constructor; JSON deserialization may require a static class - confirm
+ String pageId;
+ String memberId;
+ long timestamp;
+
+ PageViewEvent(String pageId, String memberId, long timestamp) {
+ this.pageId = pageId;
+ this.memberId = memberId;
+ this.timestamp = timestamp;
+ }
+ }
+
+ static class StatsOutput {
+ private String memberId;
+ private long timestamp; // window start in minutes (wndTimestamp in MyStatsCounter#apply), not milliseconds
+ private Integer count;
+
+ StatsOutput(String key, long timestamp, Integer count) {
+ this.memberId = key;
+ this.timestamp = timestamp;
+ this.count = count;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
new file mode 100644
index 0000000..ff983a4
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+public class MergeExample implements StreamApplication { // example application that merges three input streams into one output stream
+
+ // local execution mode: loads the config from the command-line args and runs this app in a LocalApplicationRunner
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ MergeExample app = new MergeExample();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ KVSerde<String, PageViewEvent>
+ pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)); // shared by all three inputs and the output
+
+ MessageStream.mergeAll(ImmutableList.of(graph.getInputStream("viewStream1", pgeMsgSerde),
+ graph.getInputStream("viewStream2", pgeMsgSerde), graph.getInputStream("viewStream3", pgeMsgSerde)))
+ .sendTo(graph.getOutputStream("mergedStream", pgeMsgSerde)); // the merged stream is forwarded unchanged
+
+ }
+
+ class PageViewEvent { // NOTE(review): non-static inner class; JSON deserialization may require a static class - confirm
+ String pageId;
+ long viewTimestamp;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
new file mode 100644
index 0000000..1c0bc25
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Simple 2-way stream-to-stream join example: joins orders with shipments on their orderId.
+ */
+public class OrderShipmentJoinExample implements StreamApplication {
+
+ // local execution mode: loads the config from the command-line args and runs this app in a LocalApplicationRunner
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ OrderShipmentJoinExample app = new OrderShipmentJoinExample();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ MessageStream<OrderRecord> orders =
+ graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class));
+ MessageStream<ShipmentRecord> shipments =
+ graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
+ OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders =
+ graph.getOutputStream("fulfilledOrders",
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
+
+ orders
+ .join(shipments, new MyJoinFunction(),
+ new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), // serdes for the join key, the order and the shipment
+ Duration.ofMinutes(1), "join") // presumably the 1-minute Duration bounds how long unmatched messages are retained - confirm against MessageStream#join
+ .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder)) // key the output by orderId
+ .sendTo(fulfilledOrders);
+
+ }
+
+ static class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
+ @Override
+ public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) { // combines the order's id and time with the shipment's time
+ return new FulfilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
+ }
+
+ @Override
+ public String getFirstKey(OrderRecord message) { // join key of the order side
+ return message.orderId;
+ }
+
+ @Override
+ public String getSecondKey(ShipmentRecord message) { // join key of the shipment side
+ return message.orderId;
+ }
+ }
+
+ class OrderRecord { // NOTE(review): non-static inner class with no default constructor; JSON deserialization may require a static class - confirm
+ String orderId;
+ long orderTimeMs;
+
+ OrderRecord(String orderId, long timeMs) {
+ this.orderId = orderId;
+ this.orderTimeMs = timeMs;
+ }
+ }
+
+ class ShipmentRecord { // NOTE(review): non-static inner class, same concern as OrderRecord - confirm
+ String orderId;
+ long shipTimeMs;
+
+ ShipmentRecord(String orderId, long timeMs) {
+ this.orderId = orderId;
+ this.shipTimeMs = timeMs;
+ }
+ }
+
+ static class FulfilledOrderRecord {
+ String orderId;
+ long orderTimeMs;
+ long shipTimeMs;
+
+ FulfilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+ this.orderId = orderId;
+ this.orderTimeMs = orderTimeMs;
+ this.shipTimeMs = shipTimeMs;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
new file mode 100644
index 0000000..2581506
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.time.Duration;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example code to implement a window-based counter of page views per member.
+ */
+public class PageViewCounterExample implements StreamApplication {
+
+ // local execution mode: loads the config from the command-line args and runs this app in a LocalApplicationRunner
+ public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ PageViewCounterExample app = new PageViewCounterExample();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ MessageStream<PageViewEvent> pageViewEvents = null; // NOTE(review): the null initialization is redundant; the variable is assigned on the next line
+ pageViewEvents = graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+ OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
+ graph.getOutputStream("pageViewEventPerMemberStream",
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
+
+ SupplierFunction<Integer> initialValue = () -> 0; // the fold starts from 0
+ FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1; // adds 1 per message, ignoring the message content
+ pageViewEvents
+ .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null) // 10-second windows keyed by memberId; the trailing nulls are presumably serdes - confirm
+ .setEarlyTrigger(Triggers.repeat(Triggers.count(5))) // repeating early trigger with a count of 5
+ .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
+ .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
+ .sendTo(pageViewEventPerMemberStream);
+
+ }
+
+ class PageViewEvent { // NOTE(review): non-static inner class with no default constructor; JSON deserialization may require a static class - confirm
+ String pageId;
+ String memberId;
+ long timestamp;
+
+ PageViewEvent(String pageId, String memberId, long timestamp) {
+ this.pageId = pageId;
+ this.memberId = memberId;
+ this.timestamp = timestamp;
+ }
+ }
+
+ static class PageViewCount {
+ String memberId;
+ long timestamp;
+ int count;
+
+ PageViewCount(WindowPane<String, Integer> m) {
+ this.memberId = m.getKey().getKey();
+ this.timestamp = Long.valueOf(m.getKey().getPaneId()); // assumes the pane id is numeric - TODO confirm against WindowKey#getPaneId
+ this.count = m.getMessage(); // the folded count carried by the pane
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
new file mode 100644
index 0000000..7f28346
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Example {@link StreamApplication} code to test the API methods with the re-partition operator.
+ */
+public class RepartitionExample implements StreamApplication {
+
+ // local execution mode: loads the config from the command-line args and runs this app in a LocalApplicationRunner
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ RepartitionExample app = new RepartitionExample();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ MessageStream<PageViewEvent> pageViewEvents =
+ graph.getInputStream("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+ OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember =
+ graph.getOutputStream("pageViewEventPerMember",
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
+
+ pageViewEvents
+ .partitionBy(pve -> pve.memberId, pve -> pve, // re-key by memberId, keeping the whole event as the value
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
+ .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), // 5-minute windows per key, counting messages from 0; the trailing nulls are presumably serdes - confirm
+ "window")
+ .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
+ .sendTo(pageViewEventPerMember);
+
+ }
+
+ class PageViewEvent { // NOTE(review): non-static inner class with no default constructor; JSON deserialization may require a static class - confirm
+ String pageId;
+ String memberId;
+ long timestamp;
+
+ PageViewEvent(String pageId, String memberId, long timestamp) {
+ this.pageId = pageId;
+ this.memberId = memberId;
+ this.timestamp = timestamp;
+ }
+ }
+
+ static class MyStreamOutput {
+ String memberId;
+ long timestamp;
+ int count;
+
+ MyStreamOutput(WindowPane<String, Integer> m) {
+ this.memberId = m.getKey().getKey();
+ this.timestamp = Long.valueOf(m.getKey().getPaneId()); // assumes the pane id is numeric - TODO confirm against WindowKey#getPaneId
+ this.count = m.getMessage(); // the folded count carried by the pane
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
new file mode 100644
index 0000000..4950695
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Example implementation of a simple user-defined task w/ a window operator.
+ * Counts the messages in 10-minute tumbling windows and sends each count to an output stream.
+ */
+public class WindowExample implements StreamApplication {
+
+ // local execution mode: loads the config from the command-line args and runs this app in a LocalApplicationRunner
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ WindowExample app = new WindowExample();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ SupplierFunction<Integer> initialValue = () -> 0;
+ FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1; // tolerates a null accumulator even though initialValue supplies 0
+ MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<PageViewEvent>()); // NOTE(review): no target class is passed here, unlike the other examples - confirm the deserialized type
+ OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde());
+
+ // create a tumbling window that outputs the number of messages collected every 10 minutes.
+ // the late trigger also fires when the message count reaches 30000, or when no new message arrives for 1 minute.
+ // NOTE(review): this is configured via setLateTrigger; if early results are intended, setEarlyTrigger may be meant.
+ inputStream
+ .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
+ .setLateTrigger(Triggers.any(Triggers.count(30000),
+ Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
+ .map(WindowPane::getMessage) // keep only the count carried by each pane
+ .sendTo(outputStream);
+
+ }
+
+ class PageViewEvent { // NOTE(review): non-static inner class with no default constructor; JSON deserialization may require a static class - confirm
+ String key;
+ long timestamp;
+
+ public PageViewEvent(String key, long timestamp) {
+ this.key = key;
+ this.timestamp = timestamp;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
index de0d962..a1ac299 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
@@ -20,6 +20,8 @@
package org.apache.samza.test.framework;
import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.SinkFunction;
@@ -155,6 +157,18 @@ public class StreamAssert<M> {
}
}
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { // Java serialization hook, invoked while this object is being deserialized
+ in.defaultReadObject(); // restore the default-serialized fields first
+ timer = new Timer(); // timer, actual and timerTask are re-created here; presumably they are not carried over by serialization - confirm field declarations
+ actual = Collections.synchronizedList(new ArrayList<>()); // starts empty after deserialization
+ timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ check(); // delegates to the check() method of the enclosing instance
+ }
+ };
+ }
+
private void check() {
final CountDownLatch latch = LATCHES.get(id);
try {
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 29c509d..3301af8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -40,11 +40,10 @@ import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.apache.samza.test.util.ArraySystemFactory;
import org.apache.samza.test.util.Base64Serializer;
-import org.junit.Test;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
-
/**
* This test uses an array as a bounded input source, and does a partitionBy() and sink() after reading the input.
* It verifies the pipeline will stop and the number of output messages should equal to the input.
@@ -53,6 +52,8 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};
+ private static List<PageView> received = new ArrayList<>();
+
@Test
public void testPipeline() throws Exception {
Random random = new Random();
@@ -66,6 +67,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
int partitionCount = 4;
Map<String, String> configs = new HashMap<>();
+ configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
configs.put("streams.PageView.samza.system", "test");
configs.put("streams.PageView.source", Base64Serializer.serialize(pageviews));
@@ -89,7 +91,6 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
- List<PageView> received = new ArrayList<>();
final StreamApplication app = (streamGraph, cfg) -> {
streamGraph.<KV<String, PageView>>getInputStream("PageView")
.map(Values.create())
@@ -98,6 +99,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
received.add(m.getValue());
});
};
+
runner.run(app);
runner.waitForFinish();
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index dda3d24..d4dc4ed 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -118,6 +118,7 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
@Test
public void testWatermark() throws Exception {
Map<String, String> configs = new HashMap<>();
+ configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
configs.put("systems.test.samza.factory", TestSystemFactory.class.getName());
configs.put("streams.PageView.samza.system", "test");
configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT));
@@ -140,7 +141,6 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
configs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
- final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
List<PageView> received = new ArrayList<>();
final StreamApplication app = (streamGraph, cfg) -> {
streamGraph.<KV<String, PageView>>getInputStream("PageView")
@@ -150,11 +150,14 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
received.add(m.getValue());
});
};
+
+ LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
runner.run(app);
+ // processors are only available when the app is running
Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner);
runner.waitForFinish();
-
+ // wait for the completion to ensure that all tasks are actually initialized and the OperatorImplGraph is initialized
StreamOperatorTask task0 = tasks.get("Partition 0");
OperatorImplGraph graph = TestStreamOperatorTask.getOperatorImplGraph(task0);
OperatorImpl pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 120f902..2171d07 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -19,17 +19,22 @@
package org.apache.samza.test.operator;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.test.operator.data.AdClick;
@@ -37,6 +42,8 @@ import org.apache.samza.test.operator.data.PageView;
import org.apache.samza.test.operator.data.UserPageAdClick;
import java.time.Duration;
+import org.apache.samza.util.CommandLine;
+
/**
* A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count.
@@ -47,6 +54,19 @@ public class RepartitionJoinWindowApp implements StreamApplication {
public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2";
public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName";
+ private final List<StreamSpec> intermediateStreams = new ArrayList<>();
+
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+
+ RepartitionJoinWindowApp application = new RepartitionJoinWindowApp();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(application);
+ runner.waitForFinish();
+ }
+
@Override
public void init(StreamGraph graph, Config config) {
String inputTopicName1 = config.get(INPUT_TOPIC_NAME_1_PROP);
@@ -56,25 +76,27 @@ public class RepartitionJoinWindowApp implements StreamApplication {
MessageStream<PageView> pageViews = graph.getInputStream(inputTopicName1, new JsonSerdeV2<>(PageView.class));
MessageStream<AdClick> adClicks = graph.getInputStream(inputTopicName2, new JsonSerdeV2<>(AdClick.class));
- MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews
+ MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews
.partitionBy(PageView::getViewId, pv -> pv,
- new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)), "pageViewsByViewId")
- .map(KV::getValue);
+ new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)), "pageViewsByViewId");
+
+ MessageStream<PageView> pageViewsRepartitionedByViewIdValueONly = pageViewsRepartitionedByViewId.map(KV::getValue);
- MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks
+ MessageStream<KV<String, AdClick>> adClicksRepartitionedByViewId = adClicks
.partitionBy(AdClick::getViewId, ac -> ac,
- new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class)), "adClicksByViewId")
- .map(KV::getValue);
+ new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class)), "adClicksByViewId");
+ MessageStream<AdClick> adClicksRepartitionedByViewIdValueOnly = adClicksRepartitionedByViewId.map(KV::getValue);
- MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewId
- .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(),
+ MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewIdValueONly
+ .join(adClicksRepartitionedByViewIdValueOnly, new UserPageViewAdClicksJoiner(),
new StringSerde(), new JsonSerdeV2<>(PageView.class), new JsonSerdeV2<>(AdClick.class),
Duration.ofMinutes(1), "pageViewAdClickJoin");
- userPageAdClicks
+ MessageStream<KV<String, UserPageAdClick>> userPageAdClicksByUserId = userPageAdClicks
.partitionBy(UserPageAdClick::getUserId, upac -> upac,
- KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId")
- .map(KV::getValue)
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId");
+
+ userPageAdClicksByUserId.map(KV::getValue)
.window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3),
new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow")
.map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
@@ -82,6 +104,16 @@ public class RepartitionJoinWindowApp implements StreamApplication {
taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
});
+
+
+ intermediateStreams.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamSpec());
+ intermediateStreams.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamSpec());
+ intermediateStreams.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamSpec());
+
+ }
+
+ public List<StreamSpec> getIntermediateStreams() {
+ return intermediateStreams;
}
private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
new file mode 100644
index 0000000..e233793
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.operator;
+
+import java.time.Duration;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link StreamApplication} that demonstrates a repartition followed by a windowed count:
+ * page views read from {@link #INPUT_TOPIC} are re-keyed by user id, counted per key in
+ * 3-second keyed session windows, and the counts are sent to {@link #OUTPUT_TOPIC}.
+ */
+public class RepartitionWindowApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class);
+
+  /** Name of the stream the page views are read from. */
+  static final String INPUT_TOPIC = "page-views";
+  /** Name of the stream the per-key counts are written to. */
+  static final String OUTPUT_TOPIC = "Result";
+
+  /**
+   * Loads the config from the command-line arguments, runs this application with a
+   * {@link LocalApplicationRunner} and then calls {@code waitForFinish()} on the runner.
+   *
+   * @param args command-line arguments, parsed by {@link CommandLine}
+   */
+  public static void main(String[] args) {
+    CommandLine commandLine = new CommandLine();
+    Config jobConfig = commandLine.loadConfig(commandLine.parser().parse(args));
+    RepartitionWindowApp application = new RepartitionWindowApp();
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(jobConfig);
+
+    localRunner.run(application);
+    localRunner.waitForFinish();
+  }
+
+  /**
+   * Builds the operator chain: input -> values -> partitionBy(userId) -> session window count -> output.
+   *
+   * @param graph  the stream graph the operators are added to
+   * @param config the job config (not read by this method)
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    // The same key/value serde is used for the input stream and for the repartitioned stream.
+    KVSerde<String, PageView> pageViewSerde =
+        KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
+
+    graph.getInputStream(INPUT_TOPIC, pageViewSerde)
+        .map(KV::getValue)
+        // re-key every page view by its user id
+        .partitionBy(PageView::getUserId, pageView -> pageView, pageViewSerde, "p1")
+        // fold each session into a running count: start at 0, add 1 per message
+        .window(
+            Windows.keyedSessionWindow(
+                keyedPageView -> keyedPageView.getKey(),
+                Duration.ofSeconds(3),
+                () -> 0,
+                (keyedPageView, count) -> count + 1,
+                new StringSerde("UTF-8"),
+                new IntegerSerde()),
+            "w1")
+        // emit (window key, count) with the count rendered as a string
+        .map(pane -> KV.of(pane.getKey().getKey().toString(), String.valueOf(pane.getMessage())))
+        .sendTo(graph.getOutputStream(OUTPUT_TOPIC));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 997127e..3224d24 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -19,6 +19,7 @@
package org.apache.samza.test.operator;
+import java.time.Duration;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
@@ -26,13 +27,15 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.test.operator.data.PageView;
-
-import java.time.Duration;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link StreamApplication} that demonstrates a filter followed by a session window.
@@ -40,10 +43,23 @@ import java.time.Duration;
public class SessionWindowApp implements StreamApplication {
private static final String INPUT_TOPIC = "page-views";
private static final String OUTPUT_TOPIC = "page-view-counts";
+
+ private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class);
private static final String FILTER_KEY = "badKey";
+ public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ SessionWindowApp app = new SessionWindowApp();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
@Override
public void init(StreamGraph graph, Config config) {
+
MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
OutputStream<KV<String, Integer>> outputStream =
graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde()));
@@ -54,5 +70,6 @@ public class SessionWindowApp implements StreamApplication {
new StringSerde(), new JsonSerdeV2<>(PageView.class)), "sessionWindow")
.map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
.sendTo(outputStream);
+
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index 77cd19a..5424888 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -122,7 +122,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
// Verify that messages in the intermediate stream will be deleted in 10 seconds
long startTimeMs = System.currentTimeMillis();
- for (StreamSpec spec: runner.getExecutionPlan(app).getIntermediateStreams()) {
+ for (StreamSpec spec: app.getIntermediateStreams()) {
long remainingMessageNum = -1;
while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) {
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
new file mode 100644
index 0000000..fbc315f
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.test.operator.data.PageView;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.samza.test.operator.RepartitionWindowApp.*;
+
+/**
+ * Test driver for {@link RepartitionWindowApp}.
+ */
+public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness {
+
+  static final String APP_NAME = "PageViewCounterApp";
+
+  /**
+   * Produces five page views (four keyed "userId1", one keyed "userId2") across the three
+   * partitions of the input topic, runs {@link RepartitionWindowApp}, and verifies that the
+   * output topic carries one count per user: "4" for userId1 and "1" for userId2.
+   *
+   * @throws Exception if producing, running the application or consuming fails
+   */
+  @Test
+  public void testRepartitionedSessionWindowCounter() throws Exception {
+    // create topics
+    createTopic(INPUT_TOPIC, 3);
+    createTopic(OUTPUT_TOPIC, 1);
+
+    // produce messages to different partitions.
+    ObjectMapper mapper = new ObjectMapper();
+    PageView pv = new PageView("india", "5.com", "userId1");
+    produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv));
+    pv = new PageView("china", "4.com", "userId2");
+    produceMessage(INPUT_TOPIC, 1, "userId2", mapper.writeValueAsString(pv));
+    pv = new PageView("india", "1.com", "userId1");
+    produceMessage(INPUT_TOPIC, 2, "userId1", mapper.writeValueAsString(pv));
+    pv = new PageView("india", "2.com", "userId1");
+    produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv));
+    pv = new PageView("india", "3.com", "userId1");
+    produceMessage(INPUT_TOPIC, 1, "userId1", mapper.writeValueAsString(pv));
+
+    Map<String, String> configs = new HashMap<>();
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+    configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+    configs.put(String.format("streams.%s.samza.msg.serde", INPUT_TOPIC), "string");
+    configs.put(String.format("streams.%s.samza.key.serde", INPUT_TOPIC), "string");
+
+    // run the application
+    runApplication(new RepartitionWindowApp(), APP_NAME, new MapConfig(configs));
+
+    // consume and validate result
+    List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
+    // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure messages read correctly.
+    Assert.assertEquals(2, messages.size());
+
+    for (ConsumerRecord<String, String> message : messages) {
+      String key = message.key();
+      String value = message.value();
+      // Assert that there are 4 messages for userId1 and 1 message for userId2.
+      Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
+      if ("userId1".equals(key)) {
+        Assert.assertEquals("4", value);
+      } else {
+        Assert.assertEquals("1", value);
+      }
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 5d2a17c..40a3f91 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -19,6 +19,7 @@
package org.apache.samza.test.operator;
+import java.time.Duration;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
@@ -26,13 +27,16 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.time.Duration;
/**
* A {@link StreamApplication} that demonstrates a filter followed by a tumbling window.
@@ -40,10 +44,23 @@ import java.time.Duration;
public class TumblingWindowApp implements StreamApplication {
private static final String INPUT_TOPIC = "page-views";
private static final String OUTPUT_TOPIC = "page-view-counts";
+
+ private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowApp.class);
private static final String FILTER_KEY = "badKey";
+ public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ TumblingWindowApp app = new TumblingWindowApp();
+ LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+ runner.run(app);
+ runner.waitForFinish();
+ }
+
@Override
public void init(StreamGraph graph, Config config) {
+
MessageStream<PageView> pageViews =
graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
OutputStream<KV<String, Integer>> outputStream =
@@ -55,5 +72,6 @@ public class TumblingWindowApp implements StreamApplication {
new StringSerde(), new JsonSerdeV2<>(PageView.class)), "tumblingWindow")
.map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
.sendTo(outputStream);
+
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
index b114b43..e950366 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
@@ -19,9 +19,10 @@
package org.apache.samza.test.operator.data;
+import java.io.Serializable;
import org.codehaus.jackson.annotate.JsonProperty;
-public class PageView {
+public class PageView implements Serializable {
private String viewId;
private String pageId;
private String userId;
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java b/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java
new file mode 100644
index 0000000..9072bd2
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Shared context factories used in unit tests. This is a temporary solution that lets tests share
+ * objects such as latches across different context scopes (i.e. in the container or the task).
+ * This is not intended for production usage.
+ *
+ * <p>All registries are static, plain {@link HashMap}s with no synchronization.
+ */
+public class SharedContextFactories {
+
+  /**
+   * A named bag of shared objects. Instances are looked up, and created on first use, through
+   * {@link #getInstance(String)}.
+   */
+  public static class SharedContextFactory {
+    static Map<String, SharedContextFactory> sharedContextFactories = new HashMap<>();
+
+    Map<String, Object> sharedObjects = new HashMap<>();
+
+    /**
+     * Returns the instance registered under {@code taskName}, creating and registering a new
+     * one when none exists yet.
+     */
+    public static SharedContextFactory getInstance(String taskName) {
+      return sharedContextFactories.computeIfAbsent(taskName, name -> new SharedContextFactory());
+    }
+
+    /** Removes every registered {@link SharedContextFactory}. */
+    public static void clearAll() {
+      sharedContextFactories.clear();
+    }
+
+    /** Returns the object stored under {@code resourceName}, or {@code null} if there is none. */
+    public Object getSharedObject(String resourceName) {
+      return this.sharedObjects.get(resourceName);
+    }
+
+    /** Stores {@code sharedObj} under {@code resourceName}, replacing any previous entry. */
+    public void addSharedObject(String resourceName, Object sharedObj) {
+      this.sharedObjects.put(resourceName, sharedObj);
+    }
+  }
+
+  /**
+   * Shared context scoped to one processor; task-level contexts are derived from it by
+   * prefixing the task name with the processor id.
+   */
+  public static class ProcessorSharedContextFactory extends SharedContextFactory {
+    static Map<String, ProcessorSharedContextFactory> processorSharedFactories = new HashMap<>();
+
+    private final String processorId;
+
+    ProcessorSharedContextFactory(String processorId) {
+      this.processorId = processorId;
+    }
+
+    /**
+     * Returns the instance registered under {@code processorId}, creating and registering a new
+     * one when none exists yet.
+     */
+    public static ProcessorSharedContextFactory getInstance(String processorId) {
+      return processorSharedFactories.computeIfAbsent(processorId, ProcessorSharedContextFactory::new);
+    }
+
+    /** Removes every registered {@link ProcessorSharedContextFactory}. */
+    public static void clearAll() {
+      processorSharedFactories.clear();
+    }
+
+    /** Returns the task-level context registered under {@code "<processorId>-<taskName>"}. */
+    SharedContextFactory getTaskSharedContextFactory(String taskName) {
+      return SharedContextFactory.getInstance(String.format("%s-%s", this.processorId, taskName));
+    }
+  }
+
+  /**
+   * Shared context scoped to one application; processor-level contexts are derived from it by
+   * prefixing the processor name with the application name.
+   */
+  public static class GlobalSharedContextFactory extends SharedContextFactory {
+    static Map<String, GlobalSharedContextFactory> globalSharedContextFactories = new HashMap<>();
+
+    private final String appName;
+
+    GlobalSharedContextFactory(String appName) {
+      this.appName = appName;
+    }
+
+    /**
+     * Returns the instance registered under {@code appName}, creating and registering a new
+     * one when none exists yet.
+     */
+    public static GlobalSharedContextFactory getInstance(String appName) {
+      return globalSharedContextFactories.computeIfAbsent(appName, GlobalSharedContextFactory::new);
+    }
+
+    /** Removes every registered {@link GlobalSharedContextFactory}. */
+    public static void clearAll() {
+      globalSharedContextFactories.clear();
+    }
+
+    /** Returns the processor-level context registered under {@code "<appName>-<processorName>"}. */
+    ProcessorSharedContextFactory getProcessorSharedContextFactory(String processorName) {
+      return ProcessorSharedContextFactory.getInstance(String.format("%s-%s", this.appName, processorName));
+    }
+  }
+
+  /** Convenience accessor for {@link GlobalSharedContextFactory#getInstance(String)}. */
+  public static GlobalSharedContextFactory getGlobalSharedContextFactory(String appName) {
+    return GlobalSharedContextFactory.getInstance(appName);
+  }
+
+  /** Empties the global, processor and task level registries, in that order. */
+  public static void clearAll() {
+    GlobalSharedContextFactory.clearAll();
+    ProcessorSharedContextFactory.clearAll();
+    SharedContextFactory.clearAll();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
new file mode 100644
index 0000000..db12351
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.processor;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
+
+
+/**
+ * Test {@link StreamApplication} that parses every message of an input topic as a
+ * {@link TestKafkaEvent}, notifies an optional callback and optional latches, and forwards the
+ * event's string form to an output topic.
+ *
+ * <p>The latches and the callback are not held by the application itself: they are registered in
+ * {@link SharedContextFactories} under the application and processor names and looked up again
+ * when the map function is deserialized.
+ */
+public class TestStreamApplication implements StreamApplication, Serializable {
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final String appName;
+  private final String processorName;
+
+  private TestStreamApplication(String inputTopic, String outputTopic, String appName, String processorName) {
+    this.inputTopic = inputTopic;
+    this.outputTopic = outputTopic;
+    this.appName = appName;
+    this.processorName = processorName;
+  }
+
+  /**
+   * Wires {@code inputTopic -> map -> outputTopic}.
+   *
+   * @param graph  the stream graph the operators are added to
+   * @param config the job config (not read by this method)
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>());
+    OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde());
+    inputStream.map(new MapFunction<String, String>() {
+      // Transient: these are never serialized with the function; readObject() re-resolves them.
+      transient CountDownLatch latch1;
+      transient CountDownLatch latch2;
+      transient StreamApplicationCallback callback;
+
+      @Override
+      public String apply(String message) {
+        TestKafkaEvent incomingMessage = TestKafkaEvent.fromString(message);
+        // Each hook is optional; a null entry simply means the test did not register it.
+        if (callback != null) {
+          callback.onMessage(incomingMessage);
+        }
+        if (latch1 != null) {
+          latch1.countDown();
+        }
+        if (latch2 != null) {
+          latch2.countDown();
+        }
+        return incomingMessage.toString();
+      }
+
+      // Java serialization hook: after the default field restore, fetch the latches and the
+      // callback from the shared context registered for this app/processor.
+      private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        SharedContextFactories.SharedContextFactory contextFactory =
+            SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName);
+        this.latch1 = (CountDownLatch) contextFactory.getSharedObject("processedMsgLatch");
+        this.latch2 = (CountDownLatch) contextFactory.getSharedObject("kafkaMsgsConsumedLatch");
+        this.callback = (StreamApplicationCallback) contextFactory.getSharedObject("callback");
+      }
+    }).sendTo(outputStream);
+  }
+
+  /** Callback invoked with every event the application processes. */
+  public interface StreamApplicationCallback {
+    void onMessage(TestKafkaEvent m);
+  }
+
+  /** Event exchanged by the tests; its wire format is {@code "<eventId>|<eventData>"}. */
+  public static class TestKafkaEvent implements Serializable {
+
+    // Actual content of the event.
+    private String eventData;
+
+    // Contains Integer value, which is greater than previous message id.
+    private String eventId;
+
+    TestKafkaEvent(String eventId, String eventData) {
+      this.eventData = eventData;
+      this.eventId = eventId;
+    }
+
+    String getEventId() {
+      return eventId;
+    }
+
+    String getEventData() {
+      return eventData;
+    }
+
+    @Override
+    public String toString() {
+      return eventId + "|" + eventData;
+    }
+
+    /**
+     * Parses the format produced by {@link #toString()}.
+     *
+     * @param message text of the form {@code "<eventId>|<eventData>"}
+     * @return the parsed event
+     * @throws ArrayIndexOutOfBoundsException if {@code message} contains no {@code '|'}
+     */
+    static TestKafkaEvent fromString(String message) {
+      // String.split takes a regex and a bare "|" is the alternation operator, which matches the
+      // empty string and splits between every character. Escape it, and cap the split at two
+      // parts so that any further '|' stays inside the event data.
+      String[] messageComponents = message.split("\\|", 2);
+      return new TestKafkaEvent(messageComponents[0], messageComponents[1]);
+    }
+  }
+
+  /**
+   * Registers the given latches and callback in the shared context for the app/processor named
+   * in {@code config} and returns an application reading {@code inputTopic} and writing
+   * {@code outputTopic}.
+   *
+   * @param inputTopic               topic to consume from
+   * @param outputTopic              topic to produce to
+   * @param processedMessageLatch    registered as "processedMsgLatch"; counted down per message
+   * @param callback                 registered as "callback"; invoked per message
+   * @param kafkaEventsConsumedLatch registered as "kafkaMsgsConsumedLatch"; counted down per message
+   * @param config                   supplies the app name, app id and processor id
+   * @return the new application instance
+   */
+  public static StreamApplication getInstance(
+      String inputTopic,
+      String outputTopic,
+      CountDownLatch processedMessageLatch,
+      StreamApplicationCallback callback,
+      CountDownLatch kafkaEventsConsumedLatch,
+      Config config) {
+    String appName = String.format("%s-%s", config.get(ApplicationConfig.APP_NAME), config.get(ApplicationConfig.APP_ID));
+    String processorName = config.get(JobConfig.PROCESSOR_ID());
+    registerLatches(processedMessageLatch, kafkaEventsConsumedLatch, callback, appName, processorName);
+
+    return new TestStreamApplication(inputTopic, outputTopic, appName, processorName);
+  }
+
+  // Publishes the latches and the callback under the keys that the map function's readObject() reads.
+  private static void registerLatches(CountDownLatch processedMessageLatch, CountDownLatch kafkaEventsConsumedLatch,
+      StreamApplicationCallback callback, String appName, String processorName) {
+    SharedContextFactories.SharedContextFactory contextFactory =
+        SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName);
+    contextFactory.addSharedObject("processedMsgLatch", processedMessageLatch);
+    contextFactory.addSharedObject("kafkaMsgsConsumedLatch", kafkaEventsConsumedLatch);
+    contextFactory.addSharedObject("callback", callback);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 9d2cd92..0b0a271 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,6 +23,14 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminUtils;
import kafka.utils.TestUtils;
import org.I0Itec.zkclient.ZkClient;
@@ -42,33 +50,23 @@ import org.apache.samza.container.TaskName;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.StringSerde;
import org.apache.samza.test.StandaloneIntegrationTestHarness;
import org.apache.samza.test.StandaloneTestUtils;
+import org.apache.samza.test.processor.TestStreamApplication.StreamApplicationCallback;
+import org.apache.samza.test.processor.TestStreamApplication.TestKafkaEvent;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkKeyBuilder;
import org.apache.samza.zk.ZkUtils;
-import org.junit.*;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -102,9 +100,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
private ApplicationConfig applicationConfig1;
private ApplicationConfig applicationConfig2;
private ApplicationConfig applicationConfig3;
- private LocalApplicationRunner applicationRunner1;
- private LocalApplicationRunner applicationRunner2;
- private LocalApplicationRunner applicationRunner3;
private String testStreamAppName;
private String testStreamAppId;
@@ -141,11 +136,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
zkUtils.connect();
- // Create local application runners.
- applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
- applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
- applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
-
for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
TestUtils.createTopic(zkUtils(), kafkaTopic, 5, 1, servers(), new Properties());
@@ -251,9 +241,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// Set up stream app 2.
CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
- LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
- StreamApplication streamApp2 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
- processedMessagesLatch, null, null);
+ Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig);
+ LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2);
+ StreamApplication streamApp2 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
+ processedMessagesLatch, null, null, testAppConfig2);
// Callback handler for streamApp1.
StreamApplicationCallback streamApplicationCallback = message -> {
@@ -272,9 +263,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
// Set up stream app 1.
- LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
- StreamApplication streamApp1 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
- null, streamApplicationCallback, kafkaEventsConsumedLatch);
+ Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig);
+ LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1);
+ StreamApplication streamApp1 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
+ null, streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1);
localApplicationRunner1.run(streamApp1);
kafkaEventsConsumedLatch.await();
@@ -288,6 +280,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
// ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value.
assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
+
+ // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+ // localApplicationRunner1.kill(streamApp1);
+ // localApplicationRunner2.kill(streamApp2);
+
+ // localApplicationRunner1.waitForFinish();
+ // localApplicationRunner2.waitForFinish();
}
/**
@@ -326,8 +325,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// Set up streamApp2.
CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
- LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
- StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null);
+ Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig);
+ LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2);
+ StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null, testAppConfig2);
// Callback handler for streamApp1.
StreamApplicationCallback streamApplicationCallback = message -> {
@@ -348,9 +348,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2 + 1);
// Set up stream app 1.
- LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
- StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null,
- streamApplicationCallback, kafkaEventsConsumedLatch);
+ Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig);
+ LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1);
+ StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null,
+ streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1);
localApplicationRunner1.run(streamApp1);
kafkaEventsConsumedLatch.await();
@@ -381,8 +382,16 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
assertEquals(taskModel1.getSystemStreamPartitions(), taskModel2.getSystemStreamPartitions());
assertTrue(!taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName()));
- // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
processedMessagesLatch.await();
+
+ assertEquals(ApplicationStatus.Running, localApplicationRunner2.status(streamApp2));
+
+ // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+ // localApplicationRunner1.kill(streamApp1);
+ // localApplicationRunner2.kill(streamApp2);
+
+ // localApplicationRunner1.waitForFinish();
+ // localApplicationRunner2.waitForFinish();
}
@Test
@@ -396,8 +405,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
- StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
- StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
+ StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+ StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+ StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3);
+
+ // Create LocalApplicationRunners
+ LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
+ LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
// Run stream applications.
applicationRunner1.run(streamApp1);
@@ -428,7 +442,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
kafkaEventsConsumedLatch.await();
publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
- StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch);
+ LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
applicationRunner3.run(streamApp3);
processedMessagesLatch3.await();
@@ -441,6 +455,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
jobModel = zkUtils.getJobModel(jobModelVersion);
assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
assertEquals(2, jobModel.getContainers().size());
+
+ // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+ // applicationRunner2.kill(streamApp2);
+ // applicationRunner3.kill(streamApp3);
+
+ // applicationRunner2.waitForFinish();
+ // applicationRunner3.waitForFinish();
}
@Test
@@ -453,8 +474,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
- StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
- StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
+ StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+ StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+
+ // Create LocalApplicationRunners
+ LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
+ LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
// Run stream applications.
applicationRunner1.run(streamApp1);
@@ -464,15 +489,24 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
processedMessagesLatch1.await();
processedMessagesLatch2.await();
- LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(new MapConfig(applicationConfig2));
+ LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig2);
// Create a stream app with same processor id as SP2 and run it. It should fail.
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
- StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch);
+ StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, applicationConfig2);
// Fail when the duplicate processor joins.
expectedException.expect(SamzaException.class);
- applicationRunner3.run(streamApp3);
+ try {
+ applicationRunner3.run(streamApp3);
+ } finally {
+ // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+ // applicationRunner1.kill(streamApp1);
+ // applicationRunner2.kill(streamApp2);
+
+ // applicationRunner1.waitForFinish();
+ // applicationRunner2.waitForFinish();
+ }
}
@Test
@@ -496,13 +530,16 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
+ List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
+ StreamApplicationCallback streamApplicationCallback = m -> messagesProcessed.add(m);
+
// Create StreamApplication from configuration.
CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
- StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
- StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
+ StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1);
+ StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
// Run stream application.
applicationRunner1.run(streamApp1);
@@ -521,7 +558,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
processedMessagesLatch1 = new CountDownLatch(1);
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
- streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
+ streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1);
applicationRunner4.run(streamApp1);
processedMessagesLatch1.await();
@@ -532,85 +569,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion));
assertEquals(jobModel.getContainers(), newJobModel.getContainers());
- }
-
- public interface StreamApplicationCallback {
- void onMessageReceived(TestKafkaEvent message);
- }
-
- private static class TestKafkaEvent implements Serializable {
-
- // Actual content of the event.
- private String eventData;
-
- // Contains Integer value, which is greater than previous message id.
- private String eventId;
- TestKafkaEvent(String eventId, String eventData) {
- this.eventData = eventData;
- this.eventId = eventId;
- }
-
- String getEventId() {
- return eventId;
- }
-
- String getEventData() {
- return eventData;
- }
-
- @Override
- public String toString() {
- return eventId + "|" + eventData;
- }
+ // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+ // applicationRunner2.kill(streamApp2);
+ // applicationRunner4.kill(streamApp1);
- static TestKafkaEvent fromString(String message) {
- String[] messageComponents = message.split("\\|");
- return new TestKafkaEvent(messageComponents[0], messageComponents[1]);
- }
+ // applicationRunner2.waitForFinish();
+ // applicationRunner4.waitForFinish();
}
- /**
- * Publishes all input events to output topic(has no processing logic)
- * and triggers {@link StreamApplicationCallback} with each received event.
- **/
- private static class TestStreamApplication implements StreamApplication {
-
- private final String inputTopic;
- private final String outputTopic;
- private final CountDownLatch processedMessagesLatch;
- private final StreamApplicationCallback streamApplicationCallback;
- private final CountDownLatch kafkaEventsConsumedLatch;
-
- TestStreamApplication(String inputTopic, String outputTopic,
- CountDownLatch processedMessagesLatch,
- StreamApplicationCallback streamApplicationCallback, CountDownLatch kafkaEventsConsumedLatch) {
- this.inputTopic = inputTopic;
- this.outputTopic = outputTopic;
- this.processedMessagesLatch = processedMessagesLatch;
- this.streamApplicationCallback = streamApplicationCallback;
- this.kafkaEventsConsumedLatch = kafkaEventsConsumedLatch;
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>());
- OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde());
- inputStream
- .map(msg -> {
- TestKafkaEvent incomingMessage = TestKafkaEvent.fromString((String) msg);
- if (streamApplicationCallback != null) {
- streamApplicationCallback.onMessageReceived(incomingMessage);
- }
- if (processedMessagesLatch != null) {
- processedMessagesLatch.countDown();
- }
- if (kafkaEventsConsumedLatch != null) {
- kafkaEventsConsumedLatch.countDown();
- }
- return incomingMessage.toString();
- })
- .sendTo(outputStream);
- }
- }
}