You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Tyson Hamilton (Jira)" <ji...@apache.org> on 2021/03/15 23:01:00 UTC

[jira] [Updated] (BEAM-10589) Samza ValidatesRunner failure: testParDoWithSideInputsIsCumulative

     [ https://issues.apache.org/jira/browse/BEAM-10589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tyson Hamilton updated BEAM-10589:
----------------------------------
    Status: Open  (was: Triage Needed)

> Samza ValidatesRunner failure: testParDoWithSideInputsIsCumulative
> ------------------------------------------------------------------
>
>                 Key: BEAM-10589
>                 URL: https://issues.apache.org/jira/browse/BEAM-10589
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-samza
>            Reporter: Tyson Hamilton
>            Priority: P1
>              Labels: flake, flaky-test
>
> [Link to build failure|https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/6914/testReport/junit/org.apache.beam.sdk.transforms/ParDoTest$MultipleInputsAndOutputTests/testParDoWithSideInputsIsCumulative/]. Error inline below for posterity:
>  
> {code:java}
> Error Messagejava.lang.AssertionError: ParDo(Test)/ParMultiDo(Test).output: 
> Expected: iterable with items ["processing: 3: [11, 222]", "processing: -42: [11, 222]", "processing: 666: [11, 222]"] in any order
>      but: no item matches: "processing: 3: [11, 222]", "processing: -42: [11, 222]", "processing: 666: [11, 222]" in []Stacktracejava.lang.AssertionError: ParDo(Test)/ParMultiDo(Test).output: 
> Expected: iterable with items ["processing: 3: [11, 222]", "processing: -42: [11, 222]", "processing: 666: [11, 222]"] in any order
>      but: no item matches: "processing: 3: [11, 222]", "processing: -42: [11, 222]", "processing: 666: [11, 222]" in []
> 	at org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:171)
> 	at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:413)
> 	at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:405)
> 	at org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests.testParDoWithSideInputsIsCumulative(ParDoTest.java:1123)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
> 	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:266)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
> 	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
> 	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> 	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
> 	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> 	at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> 	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> 	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> 	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> 	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> 	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
> 	at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> 	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> 	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
> 	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
> 	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
> 	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
> 	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.AssertionError: 
> Expected: iterable with items ["processing: 3: [11, 222]", "processing: -42: [11, 222]", "processing: 666: [11, 222]"] in any order
>      but: no item matches: "processing: 3: [11, 222]", "processing: -42: [11, 222]", "processing: 666: [11, 222]" in []
> 	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
> 	at org.junit.Assert.assertThat(Assert.java:966)
> 	at org.junit.Assert.assertThat(Assert.java:931)
> 	at org.apache.beam.sdk.transforms.ParDoTest$HasExpectedOutput.apply(ParDoTest.java:4756)
> 	at org.apache.beam.sdk.transforms.ParDoTest$HasExpectedOutput.apply(ParDoTest.java:4695)
> 	at org.apache.beam.sdk.testing.PAssert.doChecks(PAssert.java:1469)
> 	at org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn.processElement(PAssert.java:1436)
> 	at org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> 	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
> 	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
> 	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$processElement$0(DoFnRunnerWithMetrics.java:55)
> 	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$withMetrics$2(DoFnRunnerWithMetrics.java:93)
> 	at org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper.wrap(FnWithMetricsWrapper.java:42)
> 	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:91)
> 	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
> 	at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
> 	at org.apache.beam.runners.samza.runtime.DoFnOp.processElement(DoFnOp.java:329)
> 	at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:82)
> 	at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
> 	at org.apache.samza.operators.impl.FlatmapOperatorImpl.handleMessageAsync(FlatmapOperatorImpl.java:57)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:173)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$22(OperatorImpl.java:412)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.Collections$2.tryAdvance(Collections.java:4719)
> 	at java.util.Collections$2.forEachRemaining(Collections.java:4727)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:413)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.aggregateWatermark(OperatorImpl.java:360)
> 	at org.apache.samza.task.StreamOperatorTask.lambda$processAsync$1(StreamOperatorTask.java:133)
> 	at org.apache.samza.task.StreamOperatorTask.processAsync(StreamOperatorTask.java:161)
> 	at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:187)
> 	at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> 	at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:185)
> 	at org.apache.samza.container.RunLoop$AsyncTaskWorker.process(RunLoop.java:481)
> 	at org.apache.samza.container.RunLoop$AsyncTaskWorker.run(RunLoop.java:423)
> 	at org.apache.samza.container.RunLoop$AsyncTaskWorker.access$300(RunLoop.java:357)
> 	at org.apache.samza.container.RunLoop.runTasks(RunLoop.java:244)
> 	at org.apache.samza.container.RunLoop.run(RunLoop.java:176)
> 	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:768)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	... 1 more
> Standard OutputContainer PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=38243 server port=37229 url=service:jmx:rmi://apache-ci-beam-jenkins-5:37229/jndi/rmi://apache-ci-beam-jenkins-5:38243/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=36659 server port=42703 url=service:jmx:rmi://apache-ci-beam-jenkins-5:42703/jndi/rmi://apache-ci-beam-jenkins-5:36659/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=42837 server port=38501 url=service:jmx:rmi://apache-ci-beam-jenkins-5:38501/jndi/rmi://apache-ci-beam-jenkins-5:42837/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=37971 server port=42731 url=service:jmx:rmi://apache-ci-beam-jenkins-5:42731/jndi/rmi://apache-ci-beam-jenkins-5:37971/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=46099 server port=42595 url=service:jmx:rmi://apache-ci-beam-jenkins-5:42595/jndi/rmi://apache-ci-beam-jenkins-5:46099/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=45297 server port=39671 url=service:jmx:rmi://apache-ci-beam-jenkins-5:39671/jndi/rmi://apache-ci-beam-jenkins-5:45297/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=45515 server port=36839 url=service:jmx:rmi://apache-ci-beam-jenkins-5:36839/jndi/rmi://apache-ci-beam-jenkins-5:45515/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=43041 server port=35085 url=service:jmx:rmi://apache-ci-beam-jenkins-5:35085/jndi/rmi://apache-ci-beam-jenkins-5:43041/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=35435 server port=44019 url=service:jmx:rmi://apache-ci-beam-jenkins-5:44019/jndi/rmi://apache-ci-beam-jenkins-5:35435/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=37883 server port=43625 url=service:jmx:rmi://apache-ci-beam-jenkins-5:43625/jndi/rmi://apache-ci-beam-jenkins-5:37883/jmxrmi
> Container PID: 14600@apache-ci-beam-jenkins-5
> JMX Server: JmxServer registry port=42267 server port=35945 url=service:jmx:rmi://apache-ci-beam-jenkins-5:35945/jndi/rmi://apache-ci-beam-jenkins-5:42267/jmxrmi
> Standard ErrorJul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.SamzaRunner run
> INFO: Beam pipeline DOT graph:
> digraph {
>     rankdir=LR
>     subgraph cluster_0 {
>         label = ""
>         subgraph cluster_1 {
>             label = "Create.Values"
>             2 [label="Read(CreateSource)"]
>         }
>         3 [label="ParMultiDo(Anonymous)"]
>         2 -> 3 [style=solid label=""]
>         subgraph cluster_4 {
>             label = "PAssert$203"
>             subgraph cluster_5 {
>                 label = "PAssert$203/GroupGlobally"
>                 subgraph cluster_6 {
>                     label = "PAssert$203/GroupGlobally/Reify.Window"
>                     subgraph cluster_7 {
>                         label = "PAssert$203/GroupGlobally/Reify.Window/ParDo(Anonymous)"
>                         8 [label="ParMultiDo(Anonymous)"]
>                         3 -> 8 [style=solid label=""]
>                     }
>                 }
>                 subgraph cluster_9 {
>                     label = "PAssert$203/GroupGlobally/ParDo(ToSingletonIterables)"
>                     10 [label="ParMultiDo(ToSingletonIterables)"]
>                     8 -> 10 [style=solid label=""]
>                 }
>                 subgraph cluster_11 {
>                     label = "PAssert$203/GroupGlobally/Create.Values"
>                     12 [label="Read(CreateSource)"]
>                 }
>                 13 [label="Flatten.PCollections"]
>                 12 -> 13 [style=solid label=""]
>                 10 -> 13 [style=solid label=""]
>                 subgraph cluster_14 {
>                     label = "PAssert$203/GroupGlobally/Window.Into()"
>                     15 [label="Flatten.PCollections"]
>                     13 -> 15 [style=solid label=""]
>                 }
>                 subgraph cluster_16 {
>                     label = "PAssert$203/GroupGlobally/WithKeys"
>                     subgraph cluster_17 {
>                         label = "PAssert$203/GroupGlobally/WithKeys/AddKeys"
>                         subgraph cluster_18 {
>                             label = "PAssert$203/GroupGlobally/WithKeys/AddKeys/Map"
>                             19 [label="ParMultiDo(Anonymous)"]
>                             15 -> 19 [style=solid label=""]
>                         }
>                     }
>                 }
>                 20 [label="GroupByKey"]
>                 19 -> 20 [style=solid label=""]
>                 subgraph cluster_21 {
>                     label = "PAssert$203/GroupGlobally/Values"
>                     subgraph cluster_22 {
>                         label = "PAssert$203/GroupGlobally/Values/Values"
>                         subgraph cluster_23 {
>                             label = "PAssert$203/GroupGlobally/Values/Values/Map"
>                             24 [label="ParMultiDo(Anonymous)"]
>                             20 -> 24 [style=solid label=""]
>                         }
>                     }
>                 }
>                 subgraph cluster_25 {
>                     label = "PAssert$203/GroupGlobally/ParDo(Concat)"
>                     26 [label="ParMultiDo(Concat)"]
>                     24 -> 26 [style=solid label=""]
>                 }
>             }
>             subgraph cluster_27 {
>                 label = "PAssert$203/GetPane"
>                 subgraph cluster_28 {
>                     label = "PAssert$203/GetPane/Map"
>                     29 [label="ParMultiDo(Anonymous)"]
>                     26 -> 29 [style=solid label=""]
>                 }
>             }
>             subgraph cluster_30 {
>                 label = "PAssert$203/RunChecks"
>                 31 [label="ParMultiDo(GroupedValuesChecker)"]
>                 29 -> 31 [style=solid label=""]
>             }
>             subgraph cluster_32 {
>                 label = "PAssert$203/VerifyAssertions"
>                 subgraph cluster_33 {
>                     label = "PAssert$203/VerifyAssertions/ParDo(DefaultConclude)"
>                     34 [label="ParMultiDo(DefaultConclude)"]
>                     31 -> 34 [style=solid label=""]
>                 }
>             }
>         }
>         subgraph cluster_35 {
>             label = "PAssert$204"
>             subgraph cluster_36 {
>                 label = "PAssert$204/GroupGlobally"
>                 subgraph cluster_37 {
>                     label = "PAssert$204/GroupGlobally/Reify.Window"
>                     subgraph cluster_38 {
>                         label = "PAssert$204/GroupGlobally/Reify.Window/ParDo(Anonymous)"
>                         39 [label="ParMultiDo(Anonymous)"]
>                         3 -> 39 [style=solid label=""]
>                     }
>                 }
>                 subgraph cluster_40 {
>                     label = "PAssert$204/GroupGlobally/ParDo(ToSingletonIterables)"
>                     41 [label="ParMultiDo(ToSingletonIterables)"]
>                     39 -> 41 [style=solid label=""]
>                 }
>                 subgraph cluster_42 {
>                     label = "PAssert$204/GroupGlobally/Create.Values"
>                     43 [label="Read(CreateSource)"]
>                 }
>                 44 [label="Flatten.PCollections"]
>                 43 -> 44 [style=solid label=""]
>                 41 -> 44 [style=solid label=""]
>                 subgraph cluster_45 {
>                     label = "PAssert$204/GroupGlobally/Window.Into()"
>                     46 [label="Flatten.PCollections"]
>                     44 -> 46 [style=solid label=""]
>                 }
>                 subgraph cluster_47 {
>                     label = "PAssert$204/GroupGlobally/WithKeys"
>                     subgraph cluster_48 {
>                         label = "PAssert$204/GroupGlobally/WithKeys/AddKeys"
>                         subgraph cluster_49 {
>                             label = "PAssert$204/GroupGlobally/WithKeys/AddKeys/Map"
>                             50 [label="ParMultiDo(Anonymous)"]
>                             46 -> 50 [style=solid label=""]
>                         }
>                     }
>                 }
>                 51 [label="GroupByKey"]
>                 50 -> 51 [style=solid label=""]
>                 subgraph cluster_52 {
>                     label = "PAssert$204/GroupGlobally/Values"
>                     subgraph cluster_53 {
>                         label = "PAssert$204/GroupGlobally/Values/Values"
>                         subgraph cluster_54 {
>                             label = "PAssert$204/GroupGlobally/Values/Values/Map"
>                             55 [label="ParMultiDo(Anonymous)"]
>                             51 -> 55 [style=solid label=""]
>                         }
>                     }
>                 }
>                 subgraph cluster_56 {
>                     label = "PAssert$204/GroupGlobally/ParDo(Concat)"
>                     57 [label="ParMultiDo(Concat)"]
>                     55 -> 57 [style=solid label=""]
>                 }
>             }
>             subgraph cluster_58 {
>                 label = "PAssert$204/GetPane"
>                 subgraph cluster_59 {
>                     label = "PAssert$204/GetPane/Map"
>                     60 [label="ParMultiDo(Anonymous)"]
>                     57 -> 60 [style=solid label=""]
>                 }
>             }
>             subgraph cluster_61 {
>                 label = "PAssert$204/RunChecks"
>                 62 [label="ParMultiDo(GroupedValuesChecker)"]
>                 60 -> 62 [style=solid label=""]
>             }
>             subgraph cluster_63 {
>                 label = "PAssert$204/VerifyAssertions"
>                 subgraph cluster_64 {
>                     label = "PAssert$204/VerifyAssertions/ParDo(DefaultConclude)"
>                     65 [label="ParMultiDo(DefaultConclude)"]
>                     62 -> 65 [style=solid label=""]
>                 }
>             }
>         }
>     }
> }
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.translation.ConfigBuilder createSystemConfig
> INFO: Execution environment is LOCAL
> Jul 28, 2020 12:03:31 PM org.apache.samza.runtime.LocalApplicationRunner getDefaultCoordinatorStreamStoreFactory
> WARNING: job.coordinator.system or job.default.system not configured, or job.coordinator.factory is not org.apache.samza.zk.ZkJobCoordinatorFactory. No default coordinator stream metadata store will be created.
> Jul 28, 2020 12:03:31 PM org.apache.samza.application.descriptors.ApplicationDescriptorImpl getOrCreateStreamSerdes
> INFO: Using NoOpSerde as the key serde for stream 0-Create_Values_Read_CreateSource__out__PCollection_. Keys will not be (de)serialized
> Jul 28, 2020 12:03:31 PM org.apache.samza.application.descriptors.ApplicationDescriptorImpl getOrCreateStreamSerdes
> INFO: Using NoOpSerde as the value serde for stream 0-Create_Values_Read_CreateSource__out__PCollection_. Values will not be (de)serialized
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name ParMultiDo(Anonymous) is mapped to id bb7e1
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/GroupGlobally/Reify.Window/ParDo(Anonymous)/ParMultiDo(Anonymous) is mapped to id 0675c
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables) is mapped to id 3b4ce
> Jul 28, 2020 12:03:31 PM org.apache.samza.application.descriptors.ApplicationDescriptorImpl getOrCreateStreamSerdes
> INFO: Using NoOpSerde as the key serde for stream 5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_. Keys will not be (de)serialized
> Jul 28, 2020 12:03:31 PM org.apache.samza.application.descriptors.ApplicationDescriptorImpl getOrCreateStreamSerdes
> INFO: Using NoOpSerde as the value serde for stream 5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_. Values will not be (de)serialized
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/GroupGlobally/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) is mapped to id aa667
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/GroupGlobally/GroupByKey is mapped to id 242d1
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/GroupGlobally/Values/Values/Map/ParMultiDo(Anonymous) is mapped to id 90586
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/GroupGlobally/ParDo(Concat)/ParMultiDo(Concat) is mapped to id 7a32e
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/GetPane/Map/ParMultiDo(Anonymous) is mapped to id a2ff5
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/RunChecks/ParMultiDo(GroupedValuesChecker) is mapped to id 98d30
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$203/VerifyAssertions/ParDo(DefaultConclude)/ParMultiDo(DefaultConclude) is mapped to id 43709
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/GroupGlobally/Reify.Window/ParDo(Anonymous)/ParMultiDo(Anonymous) is mapped to id 1e31d
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables) is mapped to id ae22d
> Jul 28, 2020 12:03:31 PM org.apache.samza.application.descriptors.ApplicationDescriptorImpl getOrCreateStreamSerdes
> INFO: Using NoOpSerde as the key serde for stream 17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_. Keys will not be (de)serialized
> Jul 28, 2020 12:03:31 PM org.apache.samza.application.descriptors.ApplicationDescriptorImpl getOrCreateStreamSerdes
> INFO: Using NoOpSerde as the value serde for stream 17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_. Values will not be (de)serialized
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/GroupGlobally/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) is mapped to id 7bd86
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/GroupGlobally/GroupByKey is mapped to id b95f0
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/GroupGlobally/Values/Values/Map/ParMultiDo(Anonymous) is mapped to id d5ac7
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/GroupGlobally/ParDo(Concat)/ParMultiDo(Concat) is mapped to id 2450d
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/GetPane/Map/ParMultiDo(Anonymous) is mapped to id 2efb6
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/RunChecks/ParMultiDo(GroupedValuesChecker) is mapped to id 5f68f
> Jul 28, 2020 12:03:31 PM org.apache.beam.runners.samza.util.HashIdGenerator getId
> INFO: Name PAssert$204/VerifyAssertions/ParDo(DefaultConclude)/ParMultiDo(DefaultConclude) is mapped to id 5c68a
> Jul 28, 2020 12:03:31 PM org.apache.samza.runtime.LocalApplicationRunner initializeRunId
> INFO: Not BATCH mode and hence not generating run id
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.JobPlanner generateSingleJobConfig
> WARNING: job.id is a deprecated configuration, use app.id instead.
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.JobPlanner generateSingleJobConfig
> WARNING: job.name is a deprecated configuration, use use app.name instead.
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.JobPlanner generateSingleJobConfig
> INFO: app.name is defined, generating job.name equal to app.name value: pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.JobPlanner generateSingleJobConfig
> INFO: app.id is defined, generating job.id equal to app.name value: 1
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.ExecutionPlanner setInputAndOutputStreamPartitionCount
> INFO: Fetched partition count value 1 for stream 17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.ExecutionPlanner setInputAndOutputStreamPartitionCount
> INFO: Fetched partition count value 1 for stream 5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.ExecutionPlanner setInputAndOutputStreamPartitionCount
> INFO: Fetched partition count value 1 for stream 0-Create_Values_Read_CreateSource__out__PCollection_
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.LocalJobPlanner prepareJobs
> INFO: Execution Plan: 
> {"jobs":[{"jobName":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-44"]},{"streamId":"5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-14"]},{"streamId":"0-Create_Values_Read_CreateSource__out__PCollection_","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-1"]}],"outputStreams":[],"operators":{"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-19":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-19","opCode":"FILTER","sourceLocation":"GroupByKeyTranslator.java:169","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-20"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-44":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-44","opCode":"MAP","sourceLocation":"TranslationContext.java:204","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-45"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-51":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-51","opCode":"FLAT_MAP","sourceLocation":"GroupByKeyTranslator.java:196","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-52"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-23":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-23","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-24"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-50":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-50","opCode":"FLAT_MAP","sourceLocation":"GroupByKeyTranslator.java:195","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-51"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-65":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-65","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-66"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-15":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-15","opCode":"MERGE","sourceLocation":"FlattenPCollectionsTranslator.java:120","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-16"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-26":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-26","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-27"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-58":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-58","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-59"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-57":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-57","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-58"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-12":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-12","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-15"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-55":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-55","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-56"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-54":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-54","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-55"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-10":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-10","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-11"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-52":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-52","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-53"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-62":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-62","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-63"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-2":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-2","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-3","pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-5"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-18":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-18","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-19"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-16":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-16","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-17"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-6":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-6","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-7"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-29":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-29","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-30"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-4":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-4","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-37"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-9":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-9","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-10"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-7":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-7","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-8"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-61":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-61","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-62"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-35":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-35","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-36"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-60":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-60","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-61"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-32":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-32","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-33"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-38":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-38","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-39"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-45":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-45","opCode":"MERGE","sourceLocation":"FlattenPCollectionsTranslator.java:120","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-46"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-25":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-25","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-26"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-24":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-24","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-25"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-66":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-66","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":[]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-22":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-22","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-23"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-21":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-21","opCode":"FLAT_MAP","sourceLocation":"GroupByKeyTranslator.java:196","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-22"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-64":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-64","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-65"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-20":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-20","opCode":"FLAT_MAP","sourceLocation":"GroupByKeyTranslator.java:195","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-21"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-63":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-63","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-64"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-28":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-28","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-29"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-27":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-27","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-28"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-49":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-49","opCode":"FILTER","sourceLocation":"GroupByKeyTranslator.java:169","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-50"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-47":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-47","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-48"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-37":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-37","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-38"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-36":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-36","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":[]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-34":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-34","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-35"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-33":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-33","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-34"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-41":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-41","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-42"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-31":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-31","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-32"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-30":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-30","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-31"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-39":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-39","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-40"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-3":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-3","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-4"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-8":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-8","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-9"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-5":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-5","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-6"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-56":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-56","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-57"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-40":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-40","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-41"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-14":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-14","opCode":"MAP","sourceLocation":"TranslationContext.java:204","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-15"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-11":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-11","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-12"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-17":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-17","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-18"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-1":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-1","opCode":"MAP","sourceLocation":"TranslationContext.java:204","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-2"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-59":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-59","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-60"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-48":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-48","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-49"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-46":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-46","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:176","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-47"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-53":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-53","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-54"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-42":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-42","opCode":"FLAT_MAP","sourceLocation":"ParDoBoundMultiTranslator.java:186","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-45"]}}}}],"sourceStreams":{"17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_":{"streamSpec":{"id":"17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","systemName":"17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","physicalName":"17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","partitionCount":1},"sourceJobs":[],"targetJobs":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c"]},"5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_":{"streamSpec":{"id":"5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","systemName":"5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","physicalName":"5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","partitionCount":1},"sourceJobs":[],"targetJobs":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c"]},"0-Create_Values_Read_CreateSource__out__PCollection_":{"streamSpec":{"id":"0-Create_Values_Read_CreateSource__out__PCollection_","systemName":"0-Create_Values_Read_CreateSource__out__PCollection_","physicalName":"0-Create_Values_Read_CreateSource__out__PCollection_","partitionCount":1},"sourceJobs":[],"targetJobs":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c"]}},"sinkStreams":{},"intermediateStreams":{},"tables":{},"applicationName":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c","applicationId":"1"}
> Jul 28, 2020 12:03:31 PM org.apache.samza.table.TableConfigGenerator generate
> INFO: TableConfigGenerator has generated configs {}
> Jul 28, 2020 12:03:31 PM org.apache.samza.execution.JobNodeConfigurationGenerator generateJobConfig
> INFO: Job pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1 has generated configs {job.id=1, job.name=pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c, samza.internal.execution.plan={"jobs":[{"jobName":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c","jobId":"1","operatorGraph":{"inputStreams":[{"streamId":"17-PAssert_204_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-44"]},{"streamId":"5-PAssert_203_GroupGlobally_Create_Values_Read_CreateSource__out__PCollection_","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-14"]},{"streamId":"0-Create_Values_Read_CreateSource__out__PCollection_","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-1"]}],"outputStreams":[],"operators":{"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-19":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-19","opCode":"FILTER","sourceLocation":"GroupByKeyTranslator.java:169","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-20"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-44":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-map-44","opCode":"MAP","sourceLocation":"TranslationContext.java:204","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-45"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-51":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-51","opCode":"FLAT_MAP","sourceLocation":"GroupByKeyTranslator.java:196","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-52"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-23":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-23","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-24"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-50":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-50","opCode":"FLAT_MAP","sourceLocation":"GroupByKeyTranslator.java:195","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-51"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-65":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-65","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-66"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-15":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-merge-15","opCode":"MERGE","sourceLocation":"FlattenPCollectionsTranslator.java:120","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-16"]},"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-26":{"opId":"pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-filter-26","opCode":"FILTER","sourceLocation":"ParDoBoundMultiTranslator.java:182","nextOperatorIds":["pardotest0multipleinputsandoutputtests0testpardowithonlytaggedoutput-jenkins-0728120331-e20d7b9c-1-flat_map-27"]},"pardotest0multipleinputsa
> ...[truncated 6399601 chars]...
> tractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:190)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessageAsync$4(OperatorImpl.java:191)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$22(OperatorImpl.java:412)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> 	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> 	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> 	at java.util.Collections$2.tryAdvance(Collections.java:4719)
> 	at java.util.Collections$2.forEachRemaining(Collections.java:4727)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:413)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateWatermark$26(OperatorImpl.java:433)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateWatermark(OperatorImpl.java:434)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> 	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
> 	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
> 	at org.apache.samza.operators.impl.OperatorImpl.onWatermark(OperatorImpl.java:416)
> 	at org.apache.samza.operators.impl.OperatorImpl.aggregateWatermark(OperatorImpl.java:360)
> 	at org.apache.samza.task.StreamOperatorTask.lambda$processAsync$1(StreamOperatorTask.java:133)
> 	at org.apache.samza.task.StreamOperatorTask.processAsync(StreamOperatorTask.java:161)
> 	at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:187)
> 	at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> 	at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:185)
> 	at org.apache.samza.container.RunLoop$AsyncTaskWorker.process(RunLoop.java:481)
> 	at org.apache.samza.container.RunLoop$AsyncTaskWorker.run(RunLoop.java:423)
> 	at org.apache.samza.container.RunLoop$AsyncTaskWorker.access$300(RunLoop.java:357)
> 	at org.apache.samza.container.RunLoop.runTasks(RunLoop.java:244)
> 	at org.apache.samza.container.RunLoop.run(RunLoop.java:176)
> 	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:768)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	... 1 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)