You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:05 UTC
[06/29] samza git commit: SAMZA-1890: Fix the creation of MapFunction
in TestExecutionPlanner.
SAMZA-1890: Fix the creation of MapFunction in TestExecutionPlanner.
dnishimura Kindly take a look.
Author: Pawas Chhokra <pc...@pchhokra-mn3.linkedin.biz>
Reviewers: Sanil Jain <sn...@linkedin.com>, Daniel Nishimura <dn...@linkedin.com>
Closes #648 from PawasChhokra/TestExecutionPlanner
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c48bcd2e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c48bcd2e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c48bcd2e
Branch: refs/heads/NewKafkaSystemConsumer
Commit: c48bcd2e1d6892b236b232d8dc569d2388751a4c
Parents: 1755268
Author: Pawas Chhokra <pc...@linkedin.com>
Authored: Wed Sep 19 16:34:40 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Sep 19 16:34:40 2018 -0700
----------------------------------------------------------------------
.../org/apache/samza/execution/TestExecutionPlanner.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c48bcd2e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index c089225..ad6b386 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -178,12 +178,12 @@ public class TestExecutionPlanner {
OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor);
messageStream1.map(m -> m)
- .filter(m -> true)
- .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
+ .filter(m -> true)
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), (Serde<KV<Object, Object>>) mock(Serde.class), (Serde<KV<Object, Object>>) mock(Serde.class)), "w1");
messageStream2.map(m -> m)
- .filter(m -> true)
- .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+ .filter(m -> true)
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), (Serde<KV<Object, Object>>) mock(Serde.class), (Serde<KV<Object, Object>>) mock(Serde.class)), "w2");
messageStream1.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1").sendTo(output1);