You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/07 03:48:40 UTC
[incubator-nemo] branch master updated: [NEMO-268] Consider
start/finish bundles in Transform (#152)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 837dc3e [NEMO-268] Consider start/finish bundles in Transform (#152)
837dc3e is described below
commit 837dc3e3693e3349b9f9098f54bbcb45245ed98b
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Wed Nov 7 12:48:36 2018 +0900
[NEMO-268] Consider start/finish bundles in Transform (#152)
JIRA: [NEMO-268: Consider start/finish bundles in Transform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-268)
**Major changes:**
- Add `checkAndInvokeBundle` and `checkAndFinishBundle` methods to `AbstractDoFnTransform`
- Use the methods in `DoFnTransform` and `GroupByKeyAndWindowDoFnTransform`
**Minor changes to note:**
- Add getters/setters for the bundle limits (maximum element count and maximum time in milliseconds) to `NemoPipelineOptions`
**Tests for the changes:**
- Add `testCountBundle()` and `testTimeBundle()` to `DoFnTransformTest`
---
.../frontend/beam/NemoPipelineOptions.java | 13 +++
.../beam/transform/AbstractDoFnTransform.java | 49 +++++++-
.../frontend/beam/transform/DoFnTransform.java | 4 +
.../GroupByKeyAndWindowDoFnTransform.java | 6 +
.../frontend/beam/transform/DoFnTransformTest.java | 125 +++++++++++++++++++++
.../examples/beam/WindowedWordCountITCase.java | 2 +-
6 files changed, 195 insertions(+), 4 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
index c07ae48..1dd3f06 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
@@ -18,10 +18,23 @@
*/
package org.apache.nemo.compiler.frontend.beam;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
/**
* NemoPipelineOptions.
+ * Nemo-specific extension of Beam's {@link PipelineOptions}. The two bundle options declared
+ * here are read by AbstractDoFnTransform#prepare and bound a bundle by element count and by
+ * elapsed time.
*/
public interface NemoPipelineOptions extends PipelineOptions {
+ @Description("The maximum number of elements in a bundle.")
+ @Default.Long(1000)
+ Long getMaxBundleSize(); // a bundle is finished once its element count reaches this value
+
+ void setMaxBundleSize(Long size);
+
+ @Description("The maximum time to wait before finalising a bundle (in milliseconds).")
+ @Default.Long(1000)
+ Long getMaxBundleTimeMills(); // a bundle is finished once this many milliseconds have passed since it started
+
+ void setMaxBundleTimeMills(Long time);
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 92a1f16..3585ea2 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -32,6 +32,8 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
@@ -46,6 +48,7 @@ import java.util.Map;
*/
public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDoFnTransform.class.getName());
private final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> additionalOutputTags;
@@ -62,6 +65,16 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
private transient DoFnRunners.OutputManager outputManager;
+ // Bundle bookkeeping.
+ // A bundle is bounded by an element count and by an elapsed time in milliseconds:
+ // if # of processed elements >= bundleSize
+ // or elapsed time >= bundleMillis, we finish the current bundle; the next call to checkAndInvokeBundle() starts a new one
+ private transient long bundleSize;
+ private transient long bundleMillis;
+ private long prevBundleStartTime;
+ private long currBundleCount = 0;
+ private boolean bundleFinished = true;
+
/**
* AbstractDoFnTransform constructor.
* @param doFn doFn
@@ -115,12 +128,42 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
return doFn;
}
+ /**
+  * Opens a new bundle when none is in progress, then counts this invocation towards the
+  * active bundle.
+  * Opening a bundle invokes the runner's {@code startBundle()}, records the bundle start time
+  * and restarts the per-bundle counter.
+  */
+ protected final void checkAndInvokeBundle() {
+   if (bundleFinished) {
+     bundleFinished = false;
+     doFnRunner.startBundle();
+     prevBundleStartTime = System.currentTimeMillis();
+     // The invocation that opens the bundle is its first counted one.
+     currBundleCount = 1;
+   } else {
+     currBundleCount++;
+   }
+ }
+
+
+ /**
+  * Finishes the active bundle once it has reached its element-count limit or its time limit.
+  * Does nothing when no bundle is in progress or when neither limit has been reached.
+  */
+ protected final void checkAndFinishBundle() {
+   if (bundleFinished) {
+     return;
+   }
+
+   final boolean countLimitReached = currBundleCount >= bundleSize;
+   // The clock is only consulted when the count limit has not been reached yet.
+   if (countLimitReached || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
+     bundleFinished = true;
+     doFnRunner.finishBundle();
+   }
+ }
+
@Override
public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
// deserialize pipeline option
final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
this.outputCollector = oc;
+ this.bundleMillis = options.getMaxBundleTimeMills();
+ this.bundleSize = options.getMaxBundleSize();
+
// create output manager
outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
@@ -162,8 +205,6 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
inputCoder,
outputCoders,
windowingStrategy);
-
- doFnRunner.startBundle();
}
public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
@@ -173,7 +214,9 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
+ /**
+  * Closes the transform: runs {@code beforeClose()}, finishes the bundle that is still open
+  * (if any), and finally invokes the DoFn teardown through {@code doFnInvoker}.
+  */
@Override
public final void close() {
  beforeClose();
- doFnRunner.finishBundle();
+ if (!bundleFinished) {
+   // Mark the bundle as finished before calling the runner, exactly as
+   // checkAndFinishBundle() does, so the flag never claims an open bundle after
+   // finishBundle() has run (e.g. if close() is invoked a second time).
+   bundleFinished = true;
+   doFnRunner.finishBundle();
+ }
  doFnInvoker.invokeTeardown();
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 4a57ada..433f9df 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -64,14 +64,18 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
@Override
public void onData(final WindowedValue<InputT> data) {
+ checkAndInvokeBundle(); // starts a new bundle first if the previous one has been finished, and counts this element
getDoFnRunner().processElement(data);
+ checkAndFinishBundle(); // finishes the bundle once its count or time limit has been reached
}
@Override
public void onWatermark(final Watermark watermark) {
+ checkAndInvokeBundle(); // a watermark also opens a bundle if none is in progress and is counted towards its size
// TODO #216: We should consider push-back data that waits for side input
// TODO #216: If there are push-back data, input watermark >= output watermark
getOutputCollector().emitWatermark(watermark);
+ checkAndFinishBundle(); // finishes the bundle once its count or time limit has been reached
}
@Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 7d20f26..3727846 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -103,6 +103,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
*/
@Override
public void onData(final WindowedValue<KV<K, InputT>> element) {
+ checkAndInvokeBundle();
+
// We can call Beam's DoFnRunner#processElement here,
// but it may generate some overheads if we call the method for each data.
// The `processElement` requires a `Iterator` of data, so we emit the buffered data every watermark.
@@ -111,6 +113,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
final KV<K, InputT> kv = element.getValue();
keyToValues.putIfAbsent(kv.getKey(), new ArrayList<>());
keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
+
+ checkAndFinishBundle();
}
/**
@@ -158,7 +162,9 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
@Override
public void onWatermark(final Watermark inputWatermark) {
+ checkAndInvokeBundle(); // make sure a bundle is open before buffered elements and timers are processed
processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now()); // NOTE(review): Instant.now() is evaluated twice, so the two time arguments may differ slightly
+ checkAndFinishBundle(); // finishes the bundle once its count or time limit has been reached
}
/**
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index b2fbaeb..bad7584 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -20,12 +20,14 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -90,6 +92,98 @@ public final class DoFnTransformTest {
doFnTransform.close();
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCountBundle() {
+   // At most 3 elements per bundle; the time limit is set so high that only the
+   // element count can finish a bundle in this test.
+   final NemoPipelineOptions options = PipelineOptionsFactory.as(NemoPipelineOptions.class);
+   options.setMaxBundleSize(3L);
+   options.setMaxBundleTimeMills(10000000L);
+
+   // BundleTestDoFn appends one entry (the element count) per finished bundle.
+   final List<Integer> countsPerBundle = new ArrayList<>();
+   final TupleTag<String> mainTag = new TupleTag<>("main-output");
+
+   final DoFnTransform<String, String> transform =
+     new DoFnTransform<>(
+       new BundleTestDoFn(countsPerBundle),
+       NULL_INPUT_CODER,
+       NULL_OUTPUT_CODERS,
+       mainTag,
+       Collections.emptyList(),
+       WindowingStrategy.globalDefault(),
+       emptyList(), /* side inputs */
+       options);
+
+   final Transform.Context context = mock(Transform.Context.class);
+   final OutputCollector<WindowedValue<String>> collector = new TestOutputCollector<>();
+   transform.prepare(context, collector);
+
+   // Two consecutive rounds of three elements must each close exactly at the third element.
+   for (int round = 0; round < 2; round++) {
+     countsPerBundle.clear();
+
+     for (int i = 0; i < 3; i++) {
+       transform.onData(WindowedValue.valueInGlobalWindow("a"));
+     }
+
+     assertEquals(3, (int) countsPerBundle.get(0));
+   }
+
+   transform.close();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTimeBundle() {
+   // The count limit is set so high that only the time limit can finish a bundle here.
+   final long maxBundleTimeMills = 1000L;
+   final TupleTag<String> outputTag = new TupleTag<>("main-output");
+   final NemoPipelineOptions pipelineOptions = PipelineOptionsFactory.as(NemoPipelineOptions.class);
+   pipelineOptions.setMaxBundleSize(10000000L);
+   pipelineOptions.setMaxBundleTimeMills(maxBundleTimeMills);
+
+   // BundleTestDoFn appends one entry (the element count) per finished bundle.
+   final List<Integer> bundleOutput = new ArrayList<>();
+
+   final DoFnTransform<String, String> doFnTransform =
+     new DoFnTransform<>(
+       new BundleTestDoFn(bundleOutput),
+       NULL_INPUT_CODER,
+       NULL_OUTPUT_CODERS,
+       outputTag,
+       Collections.emptyList(),
+       WindowingStrategy.globalDefault(),
+       emptyList(), /* side inputs */
+       pipelineOptions);
+
+   final Transform.Context context = mock(Transform.Context.class);
+   final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
+
+   // Taken before the first element, i.e. no later than the start of the first bundle.
+   final long startTime = System.currentTimeMillis();
+   doFnTransform.prepare(context, oc);
+
+   // Keep feeding elements until the first bundle is reported as finished.
+   int count = 0;
+   while (bundleOutput.isEmpty()) {
+     doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+     count += 1;
+     try {
+       Thread.sleep(10);
+     } catch (InterruptedException e) {
+       // Restore the interrupt flag for the test runner and fail with the cause attached;
+       // the stack trace travels with the exception, so it is not printed separately.
+       Thread.currentThread().interrupt();
+       throw new RuntimeException(e);
+     }
+   }
+
+   final long endTime = System.currentTimeMillis();
+   assertEquals(count, (int) bundleOutput.get(0));
+   assertTrue(endTime - startTime >= maxBundleTimeMills);
+
+   doFnTransform.close();
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testMultiOutputOutput() {
@@ -194,11 +288,42 @@ public final class DoFnTransformTest {
doFnTransform.close();
}
+
+ /**
+  * A {@link DoFn} for bundle tests: it forwards every element unchanged and, when a bundle
+  * finishes, appends the number of elements processed in that bundle to the given list.
+  */
+ private static class BundleTestDoFn extends DoFn<String, String> {
+   private final List<Integer> bundleOutput;
+
+   // Number of elements processed since the current bundle was started.
+   int count;
+
+   BundleTestDoFn(final List<Integer> bundleOutput) {
+     this.bundleOutput = bundleOutput;
+   }
+
+   @StartBundle
+   public void startBundle(final StartBundleContext c) {
+     this.count = 0;
+   }
+
+   @ProcessElement
+   public void processElement(final ProcessContext c) throws Exception {
+     this.count++;
+     c.output(c.element());
+   }
+
+   @FinishBundle
+   public void finishBundle(final FinishBundleContext c) {
+     this.bundleOutput.add(this.count);
+   }
+ }
+
/**
* Identity do fn.
* @param <T> type
*/
private static class IdentityDoFn<T> extends DoFn<T, T> {
+
@ProcessElement
public void processElement(final ProcessContext c) throws Exception {
c.output(c.element());
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index 57303fc..55ed19d 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -34,7 +34,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public final class WindowedWordCountITCase {
-
+
private static final int TIMEOUT = 120000;
private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";