You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2020/10/14 07:33:44 UTC
[incubator-nemo] branch master updated: [NEMO-392] Support combine in streaming (#299)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 1e9a8db [NEMO-392] Support combine in streaming (#299)
1e9a8db is described below
commit 1e9a8db7ea21c4b5069272a680f1f3613e44238e
Author: jaehwan0214 <60...@users.noreply.github.com>
AuthorDate: Wed Oct 14 16:33:37 2020 +0900
[NEMO-392] Support combine in streaming (#299)
JIRA: [NEMO-392: Support combine in streaming](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-392)
**Major changes:**
- Added the GBKTransform class to support stream processing of GroupByKey and Combine.PerKey operations.
- Each time an element arrives, GBKTransform invokes the runner to process that single element and stores its state, instead of deferring processing until it is time to emit output downstream.
- Removed GroupByKeyAndWindowDoFnTransform, since GBKTransform supports both Combine.PerKey and GroupByKeyAndWindow operations.
**Minor changes to note:**
- N/A
**Tests for the changes:**
- Added GBKTransformTest.
**Other comments:**
- Most of the work was done by TaeGun Um.
Closes #299
---
.../frontend/beam/PipelineTranslationContext.java | 2 +-
.../compiler/frontend/beam/PipelineTranslator.java | 114 +++-
.../beam/transform/AbstractDoFnTransform.java | 1 -
.../frontend/beam/transform/FinalCombineFn.java | 72 +++
.../frontend/beam/transform/GBKTransform.java | 300 ++++++++++
.../GroupByKeyAndWindowDoFnTransform.java | 405 -------------
.../transform/InMemoryStateInternalsFactory.java | 51 ++
.../transform/InMemoryTimerInternalsFactory.java | 81 +++
.../frontend/beam/transform/PartialCombineFn.java | 70 +++
.../frontend/beam/transform/GBKTransformTest.java | 654 +++++++++++++++++++++
.../GroupByKeyAndWindowDoFnTransformTest.java | 366 ------------
11 files changed, 1313 insertions(+), 803 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index 523851d..2d1b90b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -275,7 +275,7 @@ final class PipelineTranslationContext {
if (srcTransform instanceof FlattenTransform) {
return CommunicationPatternProperty.Value.ONE_TO_ONE;
}
- if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
+ if (dstTransform instanceof GBKTransform
|| dstTransform instanceof GroupByKeyTransform) {
return CommunicationPatternProperty.Value.SHUFFLE;
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index e18f4ae..1e429c3 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -348,49 +349,100 @@ final class PipelineTranslator {
final TransformHierarchy.Node beamNode,
final PTransform<?, ?> transform) {
- // Check if the partial combining optimization can be applied.
- // If not, simply use the default Combine implementation by entering into it.
- if (!(isMainInputBounded(beamNode, ctx.getPipeline()) && isGlobalWindow(beamNode, ctx.getPipeline()))) {
- // TODO #263: Partial Combining for Beam Streaming
- return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
- }
final Combine.PerKey perKey = (Combine.PerKey) transform;
+
+ // If there's any side inputs, translate each primitive transforms in this composite transform one by one.
if (!perKey.getSideInputs().isEmpty()) {
// TODO #264: Partial Combining with Beam SideInputs
return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
}
- // This Combine can be optimized as the following sequence of Nemo IRVertices.
- // Combine Input -> Combine(Partial Combine -> KV<InputT, AccumT> -> Final Combine) -> Combine Output
final CombineFnBase.GlobalCombineFn combineFn = perKey.getFn();
-
- // (Step 1) To Partial Combine
- final IRVertex partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
- ctx.addVertex(partialCombine);
- beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(partialCombine, input));
-
- // (Step 2) To Final Combine
- final PCollection input = (PCollection) Iterables.getOnlyElement(
+ final PCollection<?> mainInput = (PCollection<?>) Iterables.getOnlyElement(
+ TransformInputs.nonAdditionalInputs(beamNode.toAppliedPTransform(ctx.getPipeline())));
+ final PCollection inputs = (PCollection) Iterables.getOnlyElement(
TransformInputs.nonAdditionalInputs(beamNode.toAppliedPTransform(ctx.getPipeline())));
- final KvCoder inputCoder = (KvCoder) input.getCoder();
+ final KvCoder inputCoder = (KvCoder) inputs.getCoder();
final Coder accumulatorCoder;
+
+ // Check if accumulator coder exists
try {
accumulatorCoder =
combineFn.getAccumulatorCoder(ctx.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
throw new RuntimeException(e);
}
- final IRVertex finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
- ctx.addVertex(finalCombine);
- final IREdge edge = new IREdge(CommunicationPatternProperty.Value.SHUFFLE, partialCombine, finalCombine);
- ctx.addEdge(
- edge,
- KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
- input.getWindowingStrategy().getWindowFn().windowCoder());
- // (Step 3) To Combine Output
+ // If there's no side inputs,
+ // this Combine can be optimized as the following sequence of Nemo IRVertices.
+ // Combine Input -> partialCombine -> finalCombine -> Combine Output
+ final IRVertex partialCombine;
+ final IRVertex finalCombine;
+
+ // Choose between batch processing and stream processing based on window type and boundedness of data
+ if (isMainInputBounded(beamNode, ctx.getPipeline()) && isGlobalWindow(beamNode, ctx.getPipeline())) {
+ // Batch processing, using CombinePartialTransform and CombineFinalTransform
+ partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
+ finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
+ // Stream data processing, using GBKTransform
+ } else {
+ final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
+ final CombineFnBase.GlobalCombineFn partialCombineFn = new PartialCombineFn(
+ (Combine.CombineFn) combineFn, accumulatorCoder);
+ final CombineFnBase.GlobalCombineFn finalCombineFn = new FinalCombineFn(
+ (Combine.CombineFn) combineFn, accumulatorCoder);
+ final SystemReduceFn partialSystemReduceFn =
+ SystemReduceFn.combining(
+ inputCoder.getKeyCoder(),
+ AppliedCombineFn.withInputCoder(partialCombineFn,
+ ctx.getPipeline().getCoderRegistry(), inputCoder,
+ null,
+ mainInput.getWindowingStrategy()));
+ final SystemReduceFn finalSystemReduceFn =
+ SystemReduceFn.combining(
+ inputCoder.getKeyCoder(),
+ AppliedCombineFn.withInputCoder(finalCombineFn,
+ ctx.getPipeline().getCoderRegistry(),
+ KvCoder.of(inputCoder.getKeyCoder(),
+ accumulatorCoder),
+ null, mainInput.getWindowingStrategy()));
+ final GBKTransform partialCombineStreamTransform =
+ new GBKTransform(
+ getOutputCoders(pTransform),
+ new TupleTag<>(),
+ mainInput.getWindowingStrategy(),
+ ctx.getPipelineOptions(),
+ partialSystemReduceFn,
+ DoFnSchemaInformation.create(),
+ DisplayData.from(beamNode.getTransform()));
+
+ final GBKTransform finalCombineStreamTransform =
+ new GBKTransform(
+ getOutputCoders(pTransform),
+ new TupleTag<>(),
+ mainInput.getWindowingStrategy(),
+ ctx.getPipelineOptions(),
+ finalSystemReduceFn,
+ DoFnSchemaInformation.create(),
+ DisplayData.from(beamNode.getTransform()));
+
+ partialCombine = new OperatorVertex(partialCombineStreamTransform);
+ finalCombine = new OperatorVertex(finalCombineStreamTransform);
+ }
+
+ // (Step 1) Partial Combine
+ ctx.addVertex(partialCombine);
+ beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(partialCombine, input));
+
+ // (Step 2) Final Combine
+ ctx.addVertex(finalCombine);
beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, finalCombine, output));
+ // (Step 3) Adding an edge from partialCombine vertex to finalCombine vertex
+ final IREdge edge = new IREdge(CommunicationPatternProperty.Value.SHUFFLE, partialCombine, finalCombine);
+ final Coder intermediateCoder = KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder);
+ ctx.addEdge(edge, intermediateCoder, mainInput.getWindowingStrategy().getWindowFn().windowCoder());
+
// This composite transform has been translated in its entirety.
return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
}
@@ -490,30 +542,32 @@ final class PipelineTranslator {
}
/**
- * Create a group by key transform.
- * It returns GroupByKeyAndWindowDoFnTransform if window function is not default.
+ * Returns the correct type of GroupByKey transform by checking whether global windowing strategy is used.
*
* @param ctx translation context
* @param beamNode the beam node to be translated
- * @return group by key transform
+ * @return GroupByKey transform
*/
private static Transform createGBKTransform(
final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode) {
- final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
+ final AppliedPTransform<?, ?, ?> pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
final PCollection<?> mainInput = (PCollection<?>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
final TupleTag mainOutputTag = new TupleTag<>();
if (isGlobalWindow(beamNode, ctx.getPipeline())) {
+ // GroupByKey Transform when using a global windowing strategy.
return new GroupByKeyTransform();
} else {
- return new GroupByKeyAndWindowDoFnTransform(
+ // GroupByKey Transform when using a non-global windowing strategy.
+ return new GBKTransform<>(
getOutputCoders(pTransform),
mainOutputTag,
mainInput.getWindowingStrategy(),
ctx.getPipelineOptions(),
SystemReduceFn.buffering(mainInput.getCoder()),
+ DoFnSchemaInformation.create(),
DisplayData.from(beamNode.getTransform()));
}
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 9a50bb5..79b31c6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -37,7 +37,6 @@ import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FinalCombineFn.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FinalCombineFn.java
new file mode 100644
index 0000000..c5ad209
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FinalCombineFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Wrapper class for {@link Combine.CombineFn}.
+ * When adding input, it merges its accumulator and input accumulator into a single accumulator.
+ * @param <AccumT> accumulator type
+ * @param <Output> output type
+ */
+public final class FinalCombineFn<AccumT, Output> extends Combine.CombineFn<AccumT, AccumT, Output> {
+ private static final Logger LOG = LoggerFactory.getLogger(FinalCombineFn.class.getName());
+ private final Combine.CombineFn<?, AccumT, Output> originFn;
+ private final Coder<AccumT> accumCoder;
+
+ public FinalCombineFn(final Combine.CombineFn<?, AccumT, Output> originFn,
+ final Coder<AccumT> accumCoder) {
+ this.originFn = originFn;
+ this.accumCoder = accumCoder;
+ }
+
+ @Override
+ public AccumT createAccumulator() {
+ return originFn.createAccumulator();
+ }
+
+ @Override
+ public AccumT addInput(final AccumT accumulator, final AccumT input) {
+ final AccumT result = originFn.mergeAccumulators(Arrays.asList(accumulator, input));
+ return result;
+ }
+
+ @Override
+ public Coder<AccumT> getAccumulatorCoder(final CoderRegistry registry, final Coder<AccumT> ac) {
+ return accumCoder;
+ }
+
+ @Override
+ public AccumT mergeAccumulators(final Iterable<AccumT> accumulators) {
+ return originFn.mergeAccumulators(accumulators);
+ }
+
+ @Override
+ public Output extractOutput(final AccumT accumulator) {
+ final Output result = originFn.extractOutput(accumulator);
+ return result;
+ }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
new file mode 100644
index 0000000..37952ea
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation when input data is unbounded or is not in
+ * global window.
+ * @param <K> key type
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
+public final class GBKTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
+ private static final Logger LOG = LoggerFactory.getLogger(GBKTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K> inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K> inMemoryStateInternalsFactory;
+ private final Map<K, Watermark> keyOutputWatermarkMap = new HashMap<>();
+ private Watermark prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+ private Watermark inputWatermark = new Watermark(Long.MIN_VALUE);
+ private boolean dataReceived = false;
+ private transient OutputCollector originOc;
+
+ public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
+ final TupleTag<KV<K, OutputT>> mainOutputTag,
+ final WindowingStrategy<?, ?> windowingStrategy,
+ final PipelineOptions options,
+ final SystemReduceFn reduceFn,
+ final DoFnSchemaInformation doFnSchemaInformation,
+ final DisplayData displayData) {
+ super(null,
+ null,
+ outputCoders,
+ mainOutputTag,
+ Collections.emptyList(), /* no additional outputs */
+ windowingStrategy,
+ Collections.emptyMap(), /* no additional side inputs */
+ options,
+ displayData,
+ doFnSchemaInformation,
+ Collections.emptyMap()); /* does not have side inputs */
+ this.reduceFn = reduceFn;
+ }
+
+ /**
+ * This creates a new DoFn that groups elements by key and window.
+ * @param doFn original doFn.
+ * @return GroupAlsoByWindowViaWindowSetNewDoFn
+ */
+ @Override
+ protected DoFn wrapDoFn(final DoFn doFn) {
+ if (inMemoryStateInternalsFactory == null) {
+ this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryStateInternalFactory is already set");
+ }
+
+ if (inMemoryTimerInternalsFactory == null) {
+ this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryTimerInternalsFactory is already set");
+ }
+
+ // This function performs group by key and window operation.
+ return
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ getWindowingStrategy(),
+ inMemoryStateInternalsFactory,
+ inMemoryTimerInternalsFactory,
+ null, // does not have side input.
+ reduceFn,
+ getOutputManager(),
+ getMainOutputTag());
+ }
+
+ /** Wrapper function of output collector. */
+ @Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ originOc = oc;
+ return new GBKOutputCollector(oc);
+ }
+
+ /**
+ * Every time a single element arrives, this method invokes runner to process a single element.
+ * @param element input data element.
+ */
+ @Override
+ public void onData(final WindowedValue<KV<K, InputT>> element) {
+ dataReceived = true;
+ try {
+ checkAndInvokeBundle();
+ final KV<K, InputT> kv = element.getValue();
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.elementsWorkItem(kv.getKey(),
+ Collections.singletonList(element.withValue(kv.getValue())));
+ getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ checkAndFinishBundle();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("Exception triggered element " + element.toString());
+ }
+ }
+
+ /**
+ * Trigger timers that need to be fired at {@param watermark} and emit output watermark.
+ * @param watermark watermark
+ */
+ @Override
+ public void onWatermark(final Watermark watermark) throws RuntimeException {
+ if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+ throw new RuntimeException(
+ "Received watermark " + watermark.getTimestamp()
+ + " is before the previous inputWatermark " + inputWatermark.getTimestamp() + " in GBKTransform.");
+ }
+ checkAndInvokeBundle();
+ inputWatermark = watermark;
+ try {
+ // Trigger timers
+ triggerTimers(Instant.now(), Instant.now(), inputWatermark);
+ // Emit output watermark
+ emitOutputWatermark();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ checkAndFinishBundle();
+ }
+
+ /**
+ * This advances the input watermark and processing time to the timestamp max value
+ * in order to emit all data.
+ */
+ @Override
+ protected void beforeClose() {
+ // Finish any pending windows by advancing the input watermark to timestamp max value.
+ inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ // Trigger all the remaining timers that have not been fired yet.
+ triggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE, inputWatermark);
+ // Emit output watermark
+ emitOutputWatermark();
+ }
+
+ /**
+ * Trigger eligible timers. When triggering, it emits the output to downstream operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param watermark watermark
+ */
+ private void triggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark watermark) {
+ final Iterator<Map.Entry<K, InMemoryTimerInternals>> iter =
+ inMemoryTimerInternalsFactory.getTimerInternalsMap().entrySet().iterator();
+ while (iter.hasNext()) {
+ final Map.Entry<K, InMemoryTimerInternals> curr = iter.next();
+ try {
+ curr.getValue().advanceInputWatermark(new Instant(watermark.getTimestamp()));
+ curr.getValue().advanceProcessingTime(processingTime);
+ curr.getValue().advanceSynchronizedProcessingTime(synchronizedTime);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException();
+ }
+ for (final TimeDomain domain : TimeDomain.values()) {
+ processTrigger(curr.getKey(), curr.getValue(), domain);
+ }
+ // Remove timerInternals and stateInternals that are no longer needed.
+ if (inMemoryTimerInternalsFactory.isEmpty(curr.getValue())) {
+ iter.remove();
+ inMemoryStateInternalsFactory.getStateInternalMap().remove(curr.getKey());
+ }
+ }
+ }
+
+ /**
+ * Fetch eligible timers in {@param timeDomain} and trigger them.
+ * @param key key
+ * @param timerInternal timerInternal to be accessed
+ * @param timeDomain time domain
+ */
+ private void processTrigger(final K key, final InMemoryTimerInternals timerInternal, final TimeDomain timeDomain) {
+ TimerInternals.TimerData timer;
+ // Get all eligible timers and trigger them.
+ while ((timer = inMemoryTimerInternalsFactory.pollTimer(timerInternal, timeDomain)) != null) {
+ final KeyedWorkItem<K, InputT> timerWorkItem =
+ KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timer));
+ getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+ }
+ }
+
+ /**
+ * Emit watermark to downstream operators.
+ * Output watermark = max(prev output watermark, min(input watermark, watermark holds)).
+ */
+ private void emitOutputWatermark() {
+ // Find min watermark hold
+ Watermark minWatermarkHold = keyOutputWatermarkMap.isEmpty()
+ ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+ : Collections.min(keyOutputWatermarkMap.values());
+
+ Watermark outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
+
+ while (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
+ // Progress
+ prevOutputWatermark = outputWatermarkCandidate;
+ // Emit watermark
+ getOutputCollector().emitWatermark(outputWatermarkCandidate);
+ // Remove minimum watermark holds
+ if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
+ final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+ keyOutputWatermarkMap.entrySet()
+ .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkTimestamp);
+ }
+
+ minWatermarkHold = keyOutputWatermarkMap.isEmpty()
+ ? new Watermark(Long.MAX_VALUE) : Collections.min(keyOutputWatermarkMap.values());
+
+ outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
+ }
+ }
+
+ /** Wrapper class for {@link OutputCollector}. */
+ public class GBKOutputCollector implements OutputCollector<WindowedValue<KV<K, OutputT>>> {
+ private final OutputCollector<WindowedValue<KV<K, OutputT>>> oc;
+
+ public GBKOutputCollector(final OutputCollector oc) {
+ this.oc = oc;
+ }
+
+ /** Emit output. If {@param output} is emitted on-time, save its timestamp in the output watermark map. */
+ @Override
+ public void emit(final WindowedValue<KV<K, OutputT>> output) {
+ // The watermark advances only in ON_TIME
+ if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
+ KV<K, OutputT> value = output.getValue();
+ final K key = value.getKey();
+ final InMemoryTimerInternals timerInternals =
+ (InMemoryTimerInternals) inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+ // Add the output timestamp to the watermark hold of each key.
+ // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999.
+ keyOutputWatermarkMap.put(key,
+ new Watermark(output.getTimestamp().getMillis() + 1));
+ timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
+ }
+ oc.emit(output);
+ }
+
+ /** Emit watermark. */
+ @Override
+ public void emitWatermark(final Watermark watermark) {
+ oc.emitWatermark(watermark);
+ }
+
+ /** Emit output value to {@param dstVertexId}. */
+ @Override
+ public <T> void emit(final String dstVertexId, final T output) {
+ oc.emit(dstVertexId, output);
+ }
+ }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
deleted file mode 100644
index 0818270..0000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.beam.runners.core.*;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.punctuation.Watermark;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Groups elements according to key and window.
- *
- * <p>Elements received in {@link #onData} are buffered per key. They are handed to the
- * {@code GroupAlsoByWindowViaWindowSetNewDoFn} created in {@link #wrapDoFn} only when a watermark
- * arrives (or in {@link #beforeClose}), after which the per-key timers are fired.
- *
- * @param <K> key type.
- * @param <InputT> input type.
- */
-public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
- extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, Iterable<InputT>>> {
- private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class.getName());
-
- private final SystemReduceFn reduceFn;
- // Per-key buffer of elements received since the last watermark (emptied, but the key is not removed, per watermark).
- private final Map<K, List<WindowedValue<InputT>>> keyToValues;
- private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
- private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
- // Last watermark emitted downstream; only replaced by a strictly larger candidate.
- private Watermark prevOutputWatermark;
- // Per-key watermark hold (ON_TIME output timestamp + 1), recorded by GBKWOutputCollector#emit.
- private final Map<K, Watermark> keyAndWatermarkHoldMap;
- // Set once any element arrives; picks the default hold in emitOutputWatermark when no holds are recorded.
- private boolean dataReceived = false;
-
- /**
- * GroupByKey constructor.
- *
- * @param outputCoders output coders
- * @param mainOutputTag main output tag
- * @param windowingStrategy windowing strategy
- * @param options pipeline options
- * @param reduceFn reduce function
- * @param displayData display data.
- */
- public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
- final TupleTag<KV<K, Iterable<InputT>>> mainOutputTag,
- final WindowingStrategy<?, ?> windowingStrategy,
- final PipelineOptions options,
- final SystemReduceFn reduceFn,
- final DisplayData displayData) {
- super(null, /* doFn */
- null, /* inputCoder */
- outputCoders,
- mainOutputTag,
- Collections.emptyList(), /* GBK does not have additional outputs */
- windowingStrategy,
- Collections.emptyMap(), /* GBK does not have additional side inputs */
- options,
- displayData,
- DoFnSchemaInformation.create(),
- Collections.emptyMap());
- this.keyToValues = new HashMap<>();
- this.reduceFn = reduceFn;
- this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
- this.keyAndWatermarkHoldMap = new HashMap<>();
- }
-
- /**
- * This creates a new DoFn that groups elements by key and window.
- *
- * @param doFn original doFn (not used by this implementation).
- * @return GroupAlsoByWindowViaWindowSetNewDoFn
- */
- @Override
- protected DoFn wrapDoFn(final DoFn doFn) {
- // Both factories share this map, so a key's state and timers live in the same StateAndTimerForKey entry.
- final Map<K, StateAndTimerForKey> map = new HashMap<>();
- this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory(map);
- this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory(map);
-
- // This function performs group by key and window operation
- return
- GroupAlsoByWindowViaWindowSetNewDoFn.create(
- getWindowingStrategy(),
- inMemoryStateInternalsFactory,
- inMemoryTimerInternalsFactory,
- null, // GBK has no sideinput.
- reduceFn,
- getOutputManager(),
- getMainOutputTag());
- }
-
- /**
- * Wraps the given collector so that ON_TIME outputs update the per-key watermark holds.
- *
- * @param oc output collector to wrap.
- * @return a GBKWOutputCollector around {@code oc}.
- */
- @Override
- OutputCollector wrapOutputCollector(final OutputCollector oc) {
- return new GBKWOutputCollector(oc);
- }
-
- /**
- * It collects data for each key.
- * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
- *
- * @param element data element
- */
- @Override
- public void onData(final WindowedValue<KV<K, InputT>> element) {
- checkAndInvokeBundle();
- dataReceived = true;
-
- // We can call Beam's DoFnRunner#processElement here,
- // but it may generate some overheads if we call the method for each data.
- // The `processElement` requires a `Iterator` of data, so we emit the buffered data every watermark.
- // TODO #250: But, this approach can delay the event processing in streaming,
- // TODO #250: if the watermark is not triggered for a long time.
- final KV<K, InputT> kv = element.getValue();
- keyToValues.putIfAbsent(kv.getKey(), new ArrayList<>());
- keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
-
- checkAndFinishBundle();
- }
-
- /**
- * Process the collected data and trigger timers.
- * Every key ever buffered is visited, so timers are also checked for keys without new data.
- *
- * @param inputWatermark current input watermark
- * @param processingTime processing time
- * @param synchronizedTime synchronized time
- */
- private void processElementsAndTriggerTimers(final Watermark inputWatermark,
- final Instant processingTime,
- final Instant synchronizedTime) {
- for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
- final K key = entry.getKey();
- final List<WindowedValue<InputT>> values = entry.getValue();
-
- // for each key
- // Process elements
- if (!values.isEmpty()) {
- final KeyedWorkItem<K, InputT> keyedWorkItem =
- KeyedWorkItems.elementsWorkItem(key, values);
- // The DoFnRunner interface requires WindowedValue,
- // but this windowed value is actually not used in the ReduceFnRunner internal.
- getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
- }
-
- // Trigger timers
- triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
- // Remove values (the key itself stays in keyToValues, so its timers are checked on later watermarks)
- values.clear();
- }
- }
-
- /**
- * Output watermark
- * = max(prev output watermark,
- * min(input watermark, watermark holds)).
- *
- * @param inputWatermark input watermark
- */
- private void emitOutputWatermark(final Watermark inputWatermark) {
- // Find min watermark hold
- final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
- ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
- // set this to MAX, in order not to emit input watermark when there are no outputs.
- : Collections.min(keyAndWatermarkHoldMap.values());
- final Watermark outputWatermarkCandidate = new Watermark(
- Math.max(prevOutputWatermark.getTimestamp(),
- Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
-
- // NOTE(review): this logs prevOutputWatermark (the value before the update below), not outputWatermarkCandidate.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Watermark hold: {}, "
- + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
- }
-
- if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
- // progress!
- prevOutputWatermark = outputWatermarkCandidate;
- // emit watermark
- getOutputCollector().emitWatermark(outputWatermarkCandidate);
- // Remove minimum watermark holds
- if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
- keyAndWatermarkHoldMap.entrySet()
- .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp());
- }
- }
- }
-
- /**
- * Processes the buffered elements, fires eligible timers using the current wall-clock time as
- * processing and synchronized time, and then tries to advance the output watermark.
- *
- * @param inputWatermark input watermark.
- */
- @Override
- public void onWatermark(final Watermark inputWatermark) {
- checkAndInvokeBundle();
- processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
- // Emit watermark to downstream operators
- emitOutputWatermark(inputWatermark);
- checkAndFinishBundle();
- }
-
- /**
- * This advances the input watermark and processing time to the timestamp max value
- * in order to emit all data.
- */
- @Override
- protected void beforeClose() {
- // Finish any pending windows by advancing the input watermark to infinity.
- processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
- BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
- }
-
- /**
- * Trigger timers for current key.
- * When triggering, it emits the windowed data to downstream operators.
- *
- * @param key key
- * @param watermark watermark
- * @param processingTime processing time
- * @param synchronizedTime synchronized time
- */
- private void triggerTimers(final K key,
- final Watermark watermark,
- final Instant processingTime,
- final Instant synchronizedTime) {
- final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
- inMemoryTimerInternalsFactory.timerInternalsForKey(key);
- try {
- timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
- timerInternals.advanceProcessingTime(processingTime);
- timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
-
- final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
-
- if (!timerDataList.isEmpty()) {
- // Trigger timers and emit windowed data
- final KeyedWorkItem<K, InputT> timerWorkItem =
- KeyedWorkItems.timersWorkItem(key, timerDataList);
- // The DoFnRunner interface requires WindowedValue,
- // but this windowed value is actually not used in the ReduceFnRunner internal.
- getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
- }
- }
-
- /**
- * Get timer data.
- * Removes every event-time, processing-time and synchronized-processing-time timer returned by
- * {@code timerInternals}, repeating until a full pass over the three domains removes nothing.
- *
- * @param timerInternals in-memory timer internals.
- * @return list of timer data.
- */
- private List<TimerInternals.TimerData> getEligibleTimers(final InMemoryTimerInternals timerInternals) {
- final List<TimerInternals.TimerData> timerData = new LinkedList<>();
-
- while (true) {
- TimerInternals.TimerData timer;
- boolean hasFired = false;
-
- while ((timer = timerInternals.removeNextEventTimer()) != null) {
- hasFired = true;
- timerData.add(timer);
- }
- while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
- hasFired = true;
- timerData.add(timer);
- }
- while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
- hasFired = true;
- timerData.add(timer);
- }
- if (!hasFired) {
- break;
- }
- }
-
- return timerData;
- }
-
- /**
- * State and timer internal.
- * Either field may be null until the corresponding factory is first asked for this key.
- */
- final class StateAndTimerForKey {
- private StateInternals stateInternals;
- private TimerInternals timerInternals;
-
- /**
- * @param stateInternals state internals.
- * @param timerInternals timer internals.
- */
- StateAndTimerForKey(final StateInternals stateInternals,
- final TimerInternals timerInternals) {
- this.stateInternals = stateInternals;
- this.timerInternals = timerInternals;
- }
- }
-
- /**
- * InMemoryStateInternalsFactory.
- * Lazily creates an InMemoryStateInternals for a key in the shared map on first request.
- */
- final class InMemoryStateInternalsFactory implements StateInternalsFactory<K> {
- private final Map<K, StateAndTimerForKey> map;
-
- /**
- * @param map initial map.
- */
- InMemoryStateInternalsFactory(final Map<K, StateAndTimerForKey> map) {
- this.map = map;
- }
-
- @Override
- public StateInternals stateInternalsForKey(final K key) {
- map.putIfAbsent(key, new StateAndTimerForKey(InMemoryStateInternals.forKey(key), null));
- final StateAndTimerForKey stateAndTimerForKey = map.get(key);
- // The entry may have been created by the timer factory with a null state.
- if (stateAndTimerForKey.stateInternals == null) {
- stateAndTimerForKey.stateInternals = InMemoryStateInternals.forKey(key);
- }
- return stateAndTimerForKey.stateInternals;
- }
- }
-
- /**
- * InMemoryTimerInternalsFactory.
- * Lazily creates an InMemoryTimerInternals for a key in the shared map on first request.
- */
- final class InMemoryTimerInternalsFactory implements TimerInternalsFactory<K> {
- private final Map<K, StateAndTimerForKey> map;
-
- /**
- * @param map initial map.
- */
- InMemoryTimerInternalsFactory(final Map<K, StateAndTimerForKey> map) {
- this.map = map;
- }
-
- @Override
- public TimerInternals timerInternalsForKey(final K key) {
- map.putIfAbsent(key, new StateAndTimerForKey(null, new InMemoryTimerInternals()));
- final StateAndTimerForKey stateAndTimerForKey = map.get(key);
- // The entry may have been created by the state factory with null timers.
- if (stateAndTimerForKey.timerInternals == null) {
- stateAndTimerForKey.timerInternals = new InMemoryTimerInternals();
- }
- return stateAndTimerForKey.timerInternals;
- }
- }
-
- /**
- * This class wraps the output collector to track the watermark hold of each key.
- */
- final class GBKWOutputCollector implements OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> {
- private final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector;
-
- /**
- * @param outputCollector output collector.
- */
- GBKWOutputCollector(final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector) {
- this.outputCollector = outputCollector;
- }
-
- @Override
- public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
-
- // The watermark advances only in ON_TIME
- if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
- final K key = output.getValue().getKey();
- final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
- inMemoryTimerInternalsFactory.timerInternalsForKey(key);
- keyAndWatermarkHoldMap.put(key,
- // adds the output timestamp to the watermark hold of each key
- // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
- new Watermark(output.getTimestamp().getMillis() + 1));
- timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
- }
- outputCollector.emit(output);
- }
-
- @Override
- public void emitWatermark(final Watermark watermark) {
- outputCollector.emitWatermark(watermark);
- }
-
- @Override
- public <T> void emit(final String dstVertexId, final T output) {
- outputCollector.emit(dstVertexId, output);
- }
- }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryStateInternalsFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryStateInternalsFactory.java
new file mode 100644
index 0000000..f57a707
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryStateInternalsFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link StateInternalsFactory} that lazily creates and caches one {@link InMemoryStateInternals} per key.
+ *
+ * @param <K> key type
+ */
+public final class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K> {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateInternalsFactory.class.getName());
+  private final Map<K, StateInternals> stateInternalMap = new HashMap<>();
+
+  @Override
+  public String toString() {
+    return "StateInternalMap: " + stateInternalMap;
+  }
+
+  /**
+   * Returns the state internals of {@code key}, creating them on first access.
+   * {@code computeIfAbsent} only allocates a new {@link InMemoryStateInternals} when the key is absent,
+   * whereas {@code putIfAbsent} would build (and discard) one on every call.
+   *
+   * @param key the key whose state is requested.
+   * @return the cached or newly created state internals of {@code key}.
+   */
+  @Override
+  public StateInternals stateInternalsForKey(final K key) {
+    return stateInternalMap.computeIfAbsent(key, k -> InMemoryStateInternals.forKey(k));
+  }
+
+  /**
+   * @return the live, mutable map from key to state internals (not a copy).
+   */
+  public Map<K, StateInternals> getStateInternalMap() {
+    return stateInternalMap;
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryTimerInternalsFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryTimerInternalsFactory.java
new file mode 100644
index 0000000..0072463
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryTimerInternalsFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.*;
+
+/**
+ * A {@link TimerInternalsFactory} that lazily creates and caches one {@link InMemoryTimerInternals} per key,
+ * plus helpers to poll and inspect those timers.
+ *
+ * @param <K> key type
+ */
+public final class InMemoryTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryTimerInternalsFactory.class.getName());
+  private final Map<K, InMemoryTimerInternals> timerInternalsMap = new HashMap<>();
+
+  @Override
+  public String toString() {
+    return "TimerInternalsMap: " + timerInternalsMap;
+  }
+
+  /**
+   * Returns the timer internals of {@code key}, creating them on first access.
+   *
+   * @param key the key whose timers are requested.
+   * @return the cached or newly created timer internals of {@code key}.
+   */
+  @Override
+  public TimerInternals timerInternalsForKey(final K key) {
+    // Single lookup; a new InMemoryTimerInternals is created only when the key has no (non-null) entry.
+    return timerInternalsMap.computeIfAbsent(key, k -> new InMemoryTimerInternals());
+  }
+
+  /**
+   * Remove the next eligible timer of {@code timeDomain} from {@code timerInternal}.
+   *
+   * @param timerInternal timer internals to poll.
+   * @param timeDomain    time domain to poll.
+   * @return the removed timer, or whatever the corresponding {@code removeNext*Timer} call returns
+   *         (null for an unrecognized domain).
+   */
+  public TimerInternals.TimerData pollTimer(final InMemoryTimerInternals timerInternal, final TimeDomain timeDomain) {
+    switch (timeDomain) {
+      case EVENT_TIME:
+        return timerInternal.removeNextEventTimer();
+      case PROCESSING_TIME:
+        return timerInternal.removeNextProcessingTimer();
+      case SYNCHRONIZED_PROCESSING_TIME:
+        return timerInternal.removeNextSynchronizedProcessingTimer();
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Accessor for timerInternalsMap.
+   *
+   * @return the live, mutable map from key to timer internals (not a copy).
+   */
+  public Map<K, InMemoryTimerInternals> getTimerInternalsMap() {
+    return timerInternalsMap;
+  }
+
+  /**
+   * Helper method to check if {@code timerInternal} doesn't have any timers left.
+   *
+   * @param timerInternal timer internals to inspect.
+   * @return true iff {@code getNextTimer} is null for every {@link TimeDomain}.
+   */
+  public boolean isEmpty(final InMemoryTimerInternals timerInternal) {
+    for (final TimeDomain domain : TimeDomain.values()) {
+      if (timerInternal.getNextTimer(domain) != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PartialCombineFn.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PartialCombineFn.java
new file mode 100644
index 0000000..8a94dea
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PartialCombineFn.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adapter that runs only the accumulating half of a {@link Combine.CombineFn}.
+ * Accumulator creation, input addition and accumulator merging are delegated to the wrapped function,
+ * but {@link #extractOutput} hands back the accumulator itself rather than the wrapped function's output.
+ *
+ * @param <InputT> input type
+ * @param <AccumT> accumulator type
+ */
+public final class PartialCombineFn<InputT, AccumT> extends Combine.CombineFn<InputT, AccumT, AccumT> {
+  private static final Logger LOG = LoggerFactory.getLogger(PartialCombineFn.class.getName());
+  private final Combine.CombineFn<InputT, AccumT, ?> delegate;
+  private final Coder<AccumT> accumulatorCoder;
+
+  /**
+   * @param originFn   combine function whose accumulation logic is reused.
+   * @param accumCoder coder returned by {@link #getAccumulatorCoder}.
+   */
+  public PartialCombineFn(final Combine.CombineFn<InputT, AccumT, ?> originFn,
+                          final Coder<AccumT> accumCoder) {
+    this.delegate = originFn;
+    this.accumulatorCoder = accumCoder;
+  }
+
+  /** Delegates to the wrapped function. */
+  @Override
+  public AccumT createAccumulator() {
+    return delegate.createAccumulator();
+  }
+
+  /** Delegates to the wrapped function. */
+  @Override
+  public AccumT addInput(final AccumT mutableAccumulator, final InputT element) {
+    return delegate.addInput(mutableAccumulator, element);
+  }
+
+  /** Delegates to the wrapped function. */
+  @Override
+  public AccumT mergeAccumulators(final Iterable<AccumT> toMerge) {
+    return delegate.mergeAccumulators(toMerge);
+  }
+
+  /** Identity: the partial result of the combine is the accumulator itself. */
+  @Override
+  public AccumT extractOutput(final AccumT partialResult) {
+    return partialResult;
+  }
+
+  /** Returns the coder supplied at construction, ignoring {@code registry} and {@code inputCoder}. */
+  @Override
+  public Coder<AccumT> getAccumulatorCoder(final CoderRegistry registry, final Coder<InputT> inputCoder)
+    throws CannotProvideCoderException {
+    return accumulatorCoder;
+  }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
new file mode 100644
index 0000000..9a98932
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import junit.framework.TestCase;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.*;
+
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*;
+import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
+import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class GBKTransformTest extends TestCase {
+  private static final Logger LOG = LoggerFactory.getLogger(GBKTransformTest.class.getName());
+  // Coders used for the String keys and Integer values of the test inputs.
+  private static final Coder STRING_CODER = StringUtf8Coder.of();
+  private static final Coder INTEGER_CODER = BigEndianIntegerCoder.of();
+  // Passed as the outputCoders argument when constructing GBKTransform in the tests.
+  private static final Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
+
+ private void checkOutput(final KV<String, Integer> expected, final KV<String, Integer> result) {
+ // check key
+ assertEquals(expected.getKey(), result.getKey());
+ // check value
+ assertEquals(expected.getValue(), result.getValue());
+ }
+
+  /**
+   * Asserts that {@code result} has the expected key and holds the same values as {@code expected},
+   * regardless of iteration order.
+   */
+  private void checkOutput2(final KV<String, List<String>> expected, final KV<String, Iterable<String>> result) {
+    assertEquals(expected.getKey(), result.getKey());
+    // Copy both sides into lists and sort them so that value order does not matter.
+    final List<String> actualSorted = new ArrayList<>();
+    for (final String value : result.getValue()) {
+      actualSorted.add(value);
+    }
+    final List<String> expectedSorted = new ArrayList<>(expected.getValue());
+    Collections.sort(actualSorted);
+    Collections.sort(expectedSorted);
+    assertEquals(expectedSorted, actualSorted);
+  }
+
+
+ // Test Combine.PerKey operation.
+
+  // Combine function under test: despite its name, it sums the Integer inputs per key.
+  public static class CountFn extends Combine.CombineFn<Integer, CountFn.Accum, Integer> {
+
+    /** Mutable running-sum holder, encoded with {@link AvroCoder}. */
+    public static class Accum {
+      int sum = 0;
+    }
+
+    @Override
+    public Accum createAccumulator() {
+      return new Accum();
+    }
+
+    @Override
+    public Accum addInput(final Accum accum, final Integer input) {
+      accum.sum = accum.sum + input;
+      return accum;
+    }
+
+    @Override
+    public Accum mergeAccumulators(final Iterable<Accum> accums) {
+      final Accum merged = createAccumulator();
+      accums.forEach(partial -> merged.sum += partial.sum);
+      return merged;
+    }
+
+    @Override
+    public Integer extractOutput(final Accum accum) {
+      return accum.sum;
+    }
+
+    @Override
+    public Coder<CountFn.Accum> getAccumulatorCoder(final CoderRegistry registry, final Coder<Integer> inputcoder) {
+      return AvroCoder.of(CountFn.Accum.class);
+    }
+  }
+
+  /** Shared {@link CountFn} instance wrapped by the combine tests. */
+  public static final Combine.CombineFn combine_fn = new CountFn();
+
+  // Sliding windows: size 10 sec, period 5 sec. The windows checked below are
+  //   window1 = [-5000, 5000), window2 = [0, 10000), window3 = [5000, 15000), window4 = [10000, 20000).
+  //
+  // Input timeline in ms as (key, value):
+  //   ts1=1000 (a,1), ts2=2000 (c,1), ts3=6000 (b,1), watermark1=7000    -> window1 is emitted
+  //   ts4=8000 (a,1), ts5=11000 (c,1), watermark2=12000                   -> window2 is emitted
+  //   ts6=14000 (b,1), ts7=16000 (b,1), ts8=17000 (a,1), watermark3=18000 -> window3 is emitted
+  //   ts9=19000 (c,3), watermark4=21000                                   -> window4 is emitted
+
+  // Test without late data
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test_combine() {
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(10))
+      .every(Duration.standardSeconds(5));
+
+    final Instant ts1 = new Instant(1000);
+    final Instant ts2 = new Instant(2000);
+    final Instant ts3 = new Instant(6000);
+    final Instant ts4 = new Instant(8000);
+    final Instant ts5 = new Instant(11000);
+    final Instant ts6 = new Instant(14000);
+    final Instant ts7 = new Instant(16000);
+    final Instant ts8 = new Instant(17000);
+    final Instant ts9 = new Instant(19000);
+    final Watermark watermark1 = new Watermark(7000);
+    final Watermark watermark2 = new Watermark(12000);
+    final Watermark watermark3 = new Watermark(18000);
+    final Watermark watermark4 = new Watermark(21000);
+
+    AppliedCombineFn<String, Integer, CountFn.Accum, Integer> applied_combine_fn =
+      AppliedCombineFn.withInputCoder(
+        combine_fn,
+        CoderRegistry.createDefault(),
+        KvCoder.of(STRING_CODER, INTEGER_CODER),
+        null,
+        WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES)
+      );
+
+    final GBKTransform<String, Integer, Integer> combine_transform =
+      new GBKTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
+        DoFnSchemaInformation.create(),
+        DisplayData.none());
+
+    // window1 : [-5000, 5000) in millisecond
+    // window2 : [0, 10000)
+    // window3 : [5000, 15000)
+    // window4 : [10000, 20000)
+    List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+    final IntervalWindow window1 = sortedWindows.get(0);
+    final IntervalWindow window2 = sortedWindows.get(1);
+    sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts5));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+    final IntervalWindow window3 = sortedWindows.get(0);
+    final IntervalWindow window4 = sortedWindows.get(1);
+
+    // Prepare to test GBKTransform
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Integer>> oc = new TestOutputCollector<>();
+    combine_transform.prepare(context, oc);
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 1), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("c", 1), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("b", 1), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
+
+    // Emit outputs of window1
+    combine_transform.onWatermark(watermark1);
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // Check outputs. The size is asserted first so that a missing output fails the assertion
+    // instead of throwing IndexOutOfBoundsException from get(0).
+    assertEquals(2, oc.outputs.size());
+    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("a", 1), oc.outputs.get(0).getValue());
+    checkOutput(KV.of("c", 1), oc.outputs.get(1).getValue());
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 1), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("c", 1), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
+
+    // Emit outputs of window2
+    combine_transform.onWatermark(watermark2);
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // Check outputs
+    assertEquals(3, oc.outputs.size());
+    assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("a", 2), oc.outputs.get(0).getValue());
+    checkOutput(KV.of("b", 1), oc.outputs.get(1).getValue());
+    checkOutput(KV.of("c", 1), oc.outputs.get(2).getValue());
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("b", 1), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("b", 1), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 1), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
+
+    // Emit outputs of window3
+    combine_transform.onWatermark(watermark3);
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // Check outputs: window3 holds ts3 (b), ts4 (a), ts5 (c), ts6 (b).
+    assertEquals(3, oc.outputs.size());
+    assertEquals(Arrays.asList(window3), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("a", 1), oc.outputs.get(0).getValue());
+    checkOutput(KV.of("b", 2), oc.outputs.get(1).getValue());
+    checkOutput(KV.of("c", 1), oc.outputs.get(2).getValue());
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("c", 3), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
+
+    // Emit outputs of window4
+    combine_transform.onWatermark(watermark4);
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // Check outputs: window4 holds ts5 (c), ts6 (b), ts7 (b), ts8 (a), ts9 (c, 3).
+    assertEquals(3, oc.outputs.size());
+    assertEquals(Arrays.asList(window4), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("a", 1), oc.outputs.get(0).getValue());
+    checkOutput(KV.of("b", 2), oc.outputs.get(1).getValue());
+    checkOutput(KV.of("c", 4), oc.outputs.get(2).getValue());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+  }
+
+ /**
+ * Tests Combine.PerKey with late data, using accumulating fired panes and 2 seconds of allowed lateness.
+ * Late data arriving while end-of-window + allowed lateness is still ahead of the watermark is accumulated
+ * into the window's previous pane; late data arriving after that point is emitted on its own.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void test_combine_lateData() {
+ final TupleTag<String> outputTag = new TupleTag<>("main-output");
+ final Duration lateness = Duration.standardSeconds(2);
+ final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(10))
+ .every(Duration.standardSeconds(5));
+
+ final Instant ts1 = new Instant(1000);
+ final Instant ts2 = new Instant(2000);
+ final Instant ts3 = new Instant(4500);
+ final Watermark watermark1 = new Watermark(6500);
+ final Watermark watermark2 = new Watermark(8000);
+
+ // The same strategy is used by the combine fn and the transform, so build it once.
+ final WindowingStrategy<?, ?> windowingStrategy =
+ WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES).withAllowedLateness(lateness);
+
+ final AppliedCombineFn<String, Integer, CountFn.Accum, Integer> applied_combine_fn =
+ AppliedCombineFn.withInputCoder(
+ combine_fn,
+ CoderRegistry.createDefault(),
+ KvCoder.of(STRING_CODER, INTEGER_CODER),
+ null,
+ windowingStrategy);
+
+ final GBKTransform<String, Integer, Integer> combine_transform =
+ new GBKTransform(
+ NULL_OUTPUT_CODERS,
+ outputTag,
+ windowingStrategy,
+ PipelineOptionsFactory.as(NemoPipelineOptions.class),
+ SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
+ DoFnSchemaInformation.create(),
+ DisplayData.none());
+
+ // ts1 (1000) falls into [-5000, 5000) and [0, 10000) in milliseconds.
+ // window1 is the earlier of the two: [-5000, 5000), so its end of window (EOW) is 4999.
+ final List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+ Collections.sort(sortedWindows, IntervalWindow::compareTo);
+ final IntervalWindow window1 = sortedWindows.get(0);
+
+ // Prepare to test
+ final Transform.Context context = mock(Transform.Context.class);
+ final TestOutputCollector<KV<String, Integer>> oc = new TestOutputCollector();
+ combine_transform.prepare(context, oc);
+
+ combine_transform.onData(WindowedValue.of(
+ KV.of("a", 1), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
+ combine_transform.onData(WindowedValue.of(
+ KV.of("b", 1), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
+
+ // On-time firing of window1 (EOW 4999 < watermark 6500). Its outputs are not checked here,
+ // because on-time results from non-late data are covered by the non-late-data combine test.
+ combine_transform.onWatermark(watermark1);
+ oc.outputs.clear();
+ oc.watermarks.clear();
+
+ // Late data in window1. It should be accumulated into the previous pane of window1,
+ // since EOW + allowed lateness (4999 + 2000) > current watermark (6500).
+ combine_transform.onData(WindowedValue.of(
+ KV.of("a", 5), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
+
+ // Check outputs (size first, so a missing output fails with an assertion message):
+ // the late pane holds the accumulated value 1 + 5.
+ assertEquals(1, oc.outputs.size());
+ assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+ assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+ checkOutput(KV.of("a", 6), oc.outputs.get(0).getValue());
+
+ oc.outputs.clear();
+ oc.watermarks.clear();
+
+ // Late data in window1 arriving after EOW + allowed lateness (4999 + 2000) < current watermark (8000).
+ // It should NOT be accumulated with the earlier outputs of window1, so the emitted pane holds only the new value.
+ combine_transform.onWatermark(watermark2);
+ combine_transform.onData(WindowedValue.of(
+ KV.of("a", 10), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
+ Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+ // Check outputs
+ assertEquals(1, oc.outputs.size());
+ assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+ assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+ checkOutput(KV.of("a", 10), oc.outputs.get(0).getValue());
+ oc.outputs.clear();
+ oc.watermarks.clear();
+ }
+
+ // Now testing GroupByKey operation.
+ //
+ // Sliding windows of size 2 sec, every 1 sec:
+ // window0 = [-1000, 1000), window1 = [0, 2000), window2 = [1000, 3000)
+ //
+ // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 -- ts6 -- ts7 -- w3 -- ts8 -- ts9 -- w4
+ //
+ // (1, "hello"), (1, "world"), (2, "hello")
+ // w ==> window0: {(1, ["hello", "world"]), (2, ["hello"])}
+ // (1, "a")
+ // w2 ==> nothing is emitted
+ // (3, "a"), (3, "a"), (3, "b")
+ // w3 ==> window1: {(1, ["hello", "world", "a"]), (2, ["hello"]), (3, ["a", "a", "b"])}
+ // (1, "a"), (3, "b")
+ // w4 ==> window2: {(1, ["a", "a"]), (3, ["a", "a", "b", "b"])}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void test_gbk() {
+
+ final TupleTag<String> outputTag = new TupleTag<>("main-output");
+ final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2))
+ .every(Duration.standardSeconds(1));
+
+ final GBKTransform<String, String, Iterable<String>> doFnTransform =
+ new GBKTransform(
+ NULL_OUTPUT_CODERS,
+ outputTag,
+ WindowingStrategy.of(slidingWindows),
+ PipelineOptionsFactory.as(NemoPipelineOptions.class),
+ SystemReduceFn.buffering(STRING_CODER),
+ DoFnSchemaInformation.create(),
+ DisplayData.none());
+
+ final Instant ts1 = new Instant(1);
+ final Instant ts2 = new Instant(100);
+ final Instant ts3 = new Instant(300);
+ final Watermark watermark = new Watermark(1003);
+ final Instant ts4 = new Instant(1200);
+ final Watermark watermark2 = new Watermark(1400);
+ final Instant ts5 = new Instant(1600);
+ final Instant ts6 = new Instant(1800);
+ final Instant ts7 = new Instant(1900);
+ final Watermark watermark3 = new Watermark(2100);
+ final Instant ts8 = new Instant(2200);
+ final Instant ts9 = new Instant(2300);
+ final Watermark watermark4 = new Watermark(3000);
+
+ // ts1 falls into two windows; sorting orders them by start.
+ List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+ Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+ // [-1000---1000)
+ final IntervalWindow window0 = sortedWindows.get(0);
+ // [0---2000)
+ final IntervalWindow window1 = sortedWindows.get(1);
+
+ // ts4 falls into [0---2000) and [1000---3000); the later one is window2.
+ sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
+ Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+ // [1000--3000)
+ final IntervalWindow window2 = sortedWindows.get(1);
+
+ final Transform.Context context = mock(Transform.Context.class);
+ final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+ doFnTransform.prepare(context, oc);
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
+
+ // emit window0
+ doFnTransform.onWatermark(watermark);
+
+ // output
+ // 1: ["hello", "world"]
+ // 2: ["hello"]
+ Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+ // Check sizes before indexing, so a missing output fails with an assertion message.
+ assertEquals(2, oc.outputs.size());
+ assertEquals(2, oc.watermarks.size());
+
+ // windowed result for key 1
+ assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
+ checkOutput2(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+
+ // windowed result for key 2
+ assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
+ checkOutput2(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+ // check output watermark
+ assertEquals(1000,
+ oc.watermarks.get(0).getTimestamp());
+
+ oc.outputs.clear();
+ oc.watermarks.clear();
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
+
+ doFnTransform.onWatermark(watermark2);
+
+ assertEquals(0, oc.outputs.size()); // do not emit anything
+ assertEquals(0, oc.watermarks.size());
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
+
+ // emit window1
+ doFnTransform.onWatermark(watermark3);
+
+ // output
+ // 1: ["hello", "world", "a"]
+ // 2: ["hello"]
+ // 3: ["a", "a", "b"]
+ Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+ assertEquals(3, oc.outputs.size());
+
+ // windowed result for key 1
+ assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+ checkOutput2(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue());
+
+ // windowed result for key 2
+ assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows());
+ checkOutput2(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+ // windowed result for key 3
+ assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows());
+ checkOutput2(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue());
+
+ // check output watermark
+ assertEquals(2000,
+ oc.watermarks.get(0).getTimestamp());
+
+ oc.outputs.clear();
+ oc.watermarks.clear();
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
+
+ // emit window2
+ doFnTransform.onWatermark(watermark4);
+
+ // output
+ // 1: ["a", "a"]
+ // 3: ["a", "a", "b", "b"]
+ Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+ assertEquals(2, oc.outputs.size());
+
+ // windowed result for key 1
+ assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
+ checkOutput2(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue());
+
+ // windowed result for key 3
+ assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows());
+ checkOutput2(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue());
+
+ // check output watermark
+ assertEquals(3000,
+ oc.watermarks.get(0).getTimestamp());
+
+ doFnTransform.close();
+ }
+
+ /**
+ * Test complex triggers that emit early and late firing.
+ *
+ * Uses 5-second fixed windows with 1 second of allowed lateness and accumulating fired panes.
+ * An early firing happens 1 second (processing time) after the first element in a pane,
+ * and a late firing happens on every late element.
+ *
+ * @throws InterruptedException if the test thread is interrupted while waiting for a
+ * processing-time early firing; the test then fails instead of silently continuing
+ */
+ @Test
+ public void test_gbk_eventTimeTrigger() throws InterruptedException {
+ final Duration lateness = Duration.standardSeconds(1);
+ final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
+ // early firing
+ .withEarlyFirings(
+ AfterProcessingTime
+ .pastFirstElementInPane()
+ // early firing 1 sec after receiving an element
+ .plusDelayOf(Duration.millis(1000)))
+ // late firing: Fire on any late data.
+ .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+ final FixedWindows window = (FixedWindows) Window.into(
+ FixedWindows.of(Duration.standardSeconds(5)))
+ // lateness
+ .withAllowedLateness(lateness)
+ .triggering(trigger)
+ // TODO #308: Test discarding of refinements
+ .accumulatingFiredPanes().getWindowFn();
+
+ final TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ final GBKTransform<String, String, Iterable<String>> doFnTransform =
+ new GBKTransform(
+ NULL_OUTPUT_CODERS,
+ outputTag,
+ WindowingStrategy.of(window).withTrigger(trigger)
+ .withMode(ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(lateness),
+ PipelineOptionsFactory.as(NemoPipelineOptions.class),
+ SystemReduceFn.buffering(STRING_CODER),
+ DoFnSchemaInformation.create(),
+ DisplayData.none());
+
+ final Transform.Context context = mock(Transform.Context.class);
+ final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+ doFnTransform.prepare(context, oc);
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
+
+ // Wait for the processing-time early firing (1 sec after the first element in the pane).
+ // An InterruptedException propagates and fails the test rather than being swallowed.
+ Thread.sleep(1000);
+
+ // early firing is not related to the watermark progress
+ doFnTransform.onWatermark(new Watermark(2));
+ assertEquals(1, oc.outputs.size());
+ assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+ oc.outputs.clear();
+
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+ // EARLY firing... waiting >= 1 sec
+ Thread.sleep(2000);
+
+ // GBKTransform emits data when receiving watermark
+ // TODO #250: element-wise processing
+ doFnTransform.onWatermark(new Watermark(5));
+ assertEquals(1, oc.outputs.size());
+ assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+ // ACCUMULATION MODE
+ checkOutput2(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+ oc.outputs.clear();
+
+ // ON TIME
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+ doFnTransform.onWatermark(new Watermark(5001));
+ assertEquals(1, oc.outputs.size());
+ assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
+ // ACCUMULATION MODE
+ checkOutput2(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue());
+ oc.outputs.clear();
+
+ // LATE DATA
+ // actual window: [0-5000)
+ // allowed lateness: 1000 (ms)
+ // current watermark: 5001
+ // data: 1000
+ // End of current window + allowed lateness >= current watermark (4999 + 1000 >= 5001)
+ // so it should be accumulated to the prev window
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "bye!"), new Instant(1000),
+ window.assignWindow(new Instant(1000)), PaneInfo.NO_FIRING));
+ doFnTransform.onWatermark(new Watermark(6000));
+ assertEquals(1, oc.outputs.size());
+ assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+ // The data should be accumulated to the previous window because it allows 1 second lateness
+ checkOutput2(KV.of("1", Arrays.asList("hello", "world", "!!", "bye!")), oc.outputs.get(0).getValue());
+ oc.outputs.clear();
+
+ // LATE DATA
+ // actual window: [0-5000)
+ // allowed lateness: 1000 (ms)
+ // data timestamp: 4800
+ // current watermark: 6000
+ // End of current window + allowed lateness < current watermark (4999 + 1000 < 6000)
+ // It should not be accumulated to the prev window
+ doFnTransform.onData(WindowedValue.of(
+ KV.of("1", "hello again!"), new Instant(4800),
+ window.assignWindow(new Instant(4800)), PaneInfo.NO_FIRING));
+ doFnTransform.onWatermark(new Watermark(6300));
+ assertEquals(1, oc.outputs.size());
+ assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+ checkOutput2(KV.of("1", Arrays.asList("hello again!")), oc.outputs.get(0).getValue());
+ oc.outputs.clear();
+ doFnTransform.close();
+ }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
deleted file mode 100644
index 1af392c..0000000
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.*;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.common.punctuation.Watermark;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*;
-import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
- /**
- * Tests for {@link GroupByKeyAndWindowDoFnTransform}, which this commit removes in favor of GBKTransform.
- * Covers grouping by key over sliding windows, and early/on-time/late firings of an event-time
- * trigger with allowed lateness.
- */
- public final class GroupByKeyAndWindowDoFnTransformTest {
- private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransformTest.class.getName());
- // Null coders are passed to the transform and to SystemReduceFn.buffering in these tests.
- private final static Coder NULL_INPUT_CODER = null;
- private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
-
- /**
- * Asserts that {@code result} has the expected key and the expected values, ignoring value order.
- *
- * @param expected expected key and values
- * @param result actual grouped output
- */
- private void checkOutput(final KV<String, List<String>> expected, final KV<String, Iterable<String>> result) {
-
- // check key
- assertEquals(expected.getKey(), result.getKey());
-
- // check value
- final List<String> resultValue = new ArrayList<>();
- final List<String> expectedValue = new ArrayList<>(expected.getValue());
- result.getValue().iterator().forEachRemaining(resultValue::add);
- // Sort both sides so the comparison does not depend on the order in which values were grouped.
- Collections.sort(resultValue);
- Collections.sort(expectedValue);
-
- assertEquals(expectedValue, resultValue);
- }
-
-
- // window size: 2 sec
- // interval size: 1 sec
- //
- // window0 = [-1000, 1000), window1 = [0, 2000), window2 = [1000, 3000)
- //
- // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4
- // (1, "hello"), (1, "world"), (2, "hello")
- // w ==> window0: {(1, ["hello", "world"]), (2, ["hello"])}
- // (1, "a")
- // w2 ==> nothing is emitted
- // (3, "a"), (3, "a"), (3, "b")
- // w3 ==> window1: {(1, ["hello", "world", "a"]), (2, ["hello"]), (3, ["a", "a", "b"])}
- // (1, "a"), (3, "b")
- // w4 ==> window2: {(1, ["a", "a"]), (3, ["a", "a", "b", "b"])}
- @Test
- @SuppressWarnings("unchecked")
- public void test() {
-
- final TupleTag<String> outputTag = new TupleTag<>("main-output");
- final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2))
- .every(Duration.standardSeconds(1));
-
- final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
- new GroupByKeyAndWindowDoFnTransform(
- NULL_OUTPUT_CODERS,
- outputTag,
- WindowingStrategy.of(slidingWindows),
- PipelineOptionsFactory.as(NemoPipelineOptions.class),
- SystemReduceFn.buffering(NULL_INPUT_CODER),
- DisplayData.none());
-
- final Instant ts1 = new Instant(1);
- final Instant ts2 = new Instant(100);
- final Instant ts3 = new Instant(300);
- final Watermark watermark = new Watermark(1003);
- final Instant ts4 = new Instant(1200);
- final Watermark watermark2 = new Watermark(1400);
- final Instant ts5 = new Instant(1600);
- final Instant ts6 = new Instant(1800);
- final Instant ts7 = new Instant(1900);
- final Watermark watermark3 = new Watermark(2100);
- final Instant ts8 = new Instant(2200);
- final Instant ts9 = new Instant(2300);
- final Watermark watermark4 = new Watermark(3000);
-
-
- List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
- Collections.sort(sortedWindows, IntervalWindow::compareTo);
-
- // [-1000---1000)
- final IntervalWindow window0 = sortedWindows.get(0);
- // [0---2000)
- final IntervalWindow window1 = sortedWindows.get(1);
-
- sortedWindows.clear();
- sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
- Collections.sort(sortedWindows, IntervalWindow::compareTo);
-
- // [1000--3000)
- final IntervalWindow window2 = sortedWindows.get(1);
-
-
- final Transform.Context context = mock(Transform.Context.class);
- final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
- doFnTransform.prepare(context, oc);
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
-
- doFnTransform.onWatermark(watermark);
-
- // output
- // 1: ["hello", "world"]
- // 2: ["hello"]
- Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
-
- // windowed result for key 1
- assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
- checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
-
- // windowed result for key 2
- assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
- checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
-
- assertEquals(2, oc.outputs.size());
- assertEquals(1, oc.watermarks.size());
-
- // check output watermark
- assertEquals(1000,
- oc.watermarks.get(0).getTimestamp());
-
- oc.outputs.clear();
- oc.watermarks.clear();
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
-
-
- doFnTransform.onWatermark(watermark2);
-
- assertEquals(0, oc.outputs.size()); // do not emit anything
- assertEquals(0, oc.watermarks.size());
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
-
- // emit window1
- doFnTransform.onWatermark(watermark3);
-
- // output
- // 1: ["hello", "world", "a"]
- // 2: ["hello"]
- // 3: ["a", "a", "b"]
- Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
-
-
- // windowed result for key 1
- assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
- checkOutput(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue());
-
- // windowed result for key 2
- assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows());
- checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
-
- // windowed result for key 3
- assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows());
- checkOutput(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue());
-
- // check output watermark
- assertEquals(2000,
- oc.watermarks.get(0).getTimestamp());
-
- oc.outputs.clear();
- oc.watermarks.clear();
-
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
-
- // emit window2
- doFnTransform.onWatermark(watermark4);
-
- // output
- // 1: ["a", "a"]
- // 3: ["a", "a", "b", "b"]
- Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
-
- assertEquals(2, oc.outputs.size());
-
- // windowed result for key 1
- assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
- checkOutput(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue());
-
- // windowed result for key 3
- assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows());
- checkOutput(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue());
-
- // check output watermark
- assertEquals(3000,
- oc.watermarks.get(0).getTimestamp());
-
- doFnTransform.close();
- }
-
- /**
- * Test complex triggers that emit early and late firing.
- * Uses 5-second fixed windows with 1 second of allowed lateness and accumulating fired panes,
- * an early firing 1 second (processing time) after the first element in a pane,
- * and a late firing on every late element.
- */
- @Test
- public void eventTimeTriggerTest() {
- final Duration lateness = Duration.standardSeconds(1);
- final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
- // early firing
- .withEarlyFirings(
- AfterProcessingTime
- .pastFirstElementInPane()
- // early firing 1 sec after receiving an element
- .plusDelayOf(Duration.millis(1000)))
- // late firing: Fire on any late data.
- .withLateFirings(AfterPane.elementCountAtLeast(1));
-
- final FixedWindows window = (FixedWindows) Window.into(
- FixedWindows.of(Duration.standardSeconds(5)))
- // lateness
- .withAllowedLateness(lateness)
- .triggering(trigger)
- // TODO #308: Test discarding of refinements
- .accumulatingFiredPanes().getWindowFn();
-
- final TupleTag<String> outputTag = new TupleTag<>("main-output");
- final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
- new GroupByKeyAndWindowDoFnTransform(
- NULL_OUTPUT_CODERS,
- outputTag,
- WindowingStrategy.of(window).withTrigger(trigger)
- .withMode(ACCUMULATING_FIRED_PANES)
- .withAllowedLateness(lateness),
- PipelineOptionsFactory.as(NemoPipelineOptions.class),
- SystemReduceFn.buffering(NULL_INPUT_CODER),
- DisplayData.none());
-
-
- final Transform.Context context = mock(Transform.Context.class);
- final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
- doFnTransform.prepare(context, oc);
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
-
- // early firing is not related to the watermark progress
- doFnTransform.onWatermark(new Watermark(2));
- assertEquals(1, oc.outputs.size());
- assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
- LOG.info("Output: {}", oc.outputs.get(0));
- oc.outputs.clear();
-
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
- // EARLY firing... waiting >= 1 sec
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- // GBKTransform emits data when receiving watermark
- // TODO #250: element-wise processing
- doFnTransform.onWatermark(new Watermark(5));
- assertEquals(1, oc.outputs.size());
- assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
- // ACCUMULATION MODE
- checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
- LOG.info("Output: {}", oc.outputs.get(0));
- oc.outputs.clear();
-
- // ON TIME
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
- doFnTransform.onWatermark(new Watermark(5001));
- assertEquals(1, oc.outputs.size());
- assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
- LOG.info("Output: {}", oc.outputs.get(0));
- // ACCUMULATION MODE
- checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue());
- oc.outputs.clear();
-
- // LATE DATA
- // actual window: [0-5000)
- // allowed lateness: 1000 (ms)
- // current watermark: 5001
- // data: 4500
- // End of window + allowed lateness >= current watermark (4999 + 1000 >= 5001),
- // so it should be accumulated to the prev window
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "bye!"), new Instant(4500),
- window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING));
- doFnTransform.onWatermark(new Watermark(6000));
- assertEquals(1, oc.outputs.size());
- assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
- LOG.info("Output: {}", oc.outputs.get(0));
- // The data should be accumulated to the previous window because it allows 1 second lateness
- checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!", "bye!")), oc.outputs.get(0).getValue());
- oc.outputs.clear();
-
- // LATE DATA
- // data timestamp: 4800
- // current watermark: 6000
- // End of window + allowed lateness < current watermark (4999 + 1000 < 6000)
- // It should not be accumulated to the prev window
- doFnTransform.onData(WindowedValue.of(
- KV.of("1", "hello again!"), new Instant(4800),
- window.assignWindow(new Instant(4800)), PaneInfo.NO_FIRING));
- doFnTransform.onWatermark(new Watermark(6300));
- assertEquals(1, oc.outputs.size());
- assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
- LOG.info("Output: {}", oc.outputs.get(0));
- checkOutput(KV.of("1", Arrays.asList("hello again!")), oc.outputs.get(0).getValue());
- oc.outputs.clear();
-
-
- doFnTransform.close();
-
- }
- }