You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/07 03:48:40 UTC

[incubator-nemo] branch master updated: [NEMO-268] Consider start/finish bundles in Transform (#152)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 837dc3e  [NEMO-268] Consider start/finish bundles in Transform (#152)
837dc3e is described below

commit 837dc3e3693e3349b9f9098f54bbcb45245ed98b
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Wed Nov 7 12:48:36 2018 +0900

    [NEMO-268] Consider start/finish bundles in Transform (#152)
    
    JIRA: [NEMO-268: Consider start/finish bundles in Transform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-268)
    
    **Major changes:**
    - Add `checkAndInvokeBundle` and `checkAndFinishBundle` methods to `AbstractDoFnTransform`
    - Use the methods in `DoFnTransform` and `GroupByKeyAndWindowDoFnTransform`
    
    **Minor changes to note:**
    - Add getters/setters for the maximum bundle size (element count) and maximum bundle time (milliseconds) to `NemoPipelineOptions`
    
    **Tests for the changes:**
    - Add `testCountBundle()` and `testTimeBundle()` to `DoFnTransformTest`
---
 .../frontend/beam/NemoPipelineOptions.java         |  13 +++
 .../beam/transform/AbstractDoFnTransform.java      |  49 +++++++-
 .../frontend/beam/transform/DoFnTransform.java     |   4 +
 .../GroupByKeyAndWindowDoFnTransform.java          |   6 +
 .../frontend/beam/transform/DoFnTransformTest.java | 125 +++++++++++++++++++++
 .../examples/beam/WindowedWordCountITCase.java     |   2 +-
 6 files changed, 195 insertions(+), 4 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
index c07ae48..1dd3f06 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
@@ -18,10 +18,23 @@
  */
 package org.apache.nemo.compiler.frontend.beam;
 
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * NemoPipelineOptions.
  */
 public interface NemoPipelineOptions extends PipelineOptions {
+  @Description("The maximum number of elements in a bundle.")
+  @Default.Long(1000) // default size limit: 1000 elements per bundle
+  Long getMaxBundleSize();
+
+  void setMaxBundleSize(Long size);
+
+  @Description("The maximum time to wait before finalising a bundle (in milliseconds).")
+  @Default.Long(1000) // default time limit: 1000 ms per bundle
+  Long getMaxBundleTimeMills(); // NOTE(review): "Mills" presumably means "Millis"; renaming would break callers
+
+  void setMaxBundleTimeMills(Long time);
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 92a1f16..3585ea2 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -32,6 +32,8 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.List;
@@ -46,6 +48,7 @@ import java.util.Map;
  */
 public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractDoFnTransform.class.getName());
 
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> additionalOutputTags;
@@ -62,6 +65,16 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
   private transient DoFnRunners.OutputManager outputManager;
 
+  // For bundle
+  // we consider count and time millis for start/finish bundle
+  // if # of processed elements > bundleSize
+  // or elapsed time > bundleMillis, we finish the current bundle and start a new one
+  private transient long bundleSize;
+  private transient long bundleMillis;
+  private long prevBundleStartTime;
+  private long currBundleCount = 0;
+  private boolean bundleFinished = true;
+
   /**
    * AbstractDoFnTransform constructor.
    * @param doFn doFn
@@ -115,12 +128,42 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
     return doFn;
   }
 
+  /**
+   * Starts a new bundle when none is open, then counts this call toward the open bundle.
+   */
+  protected final void checkAndInvokeBundle() {
+    if (!bundleFinished) {
+      currBundleCount += 1;
+      return;
+    }
+    bundleFinished = false;
+    doFnRunner.startBundle();
+    prevBundleStartTime = System.currentTimeMillis();
+    currBundleCount = 1; // the call that opened the bundle is its first element
+  }
+
+
+  /** Finishes the open bundle once its element count or elapsed time reaches the configured limit. */
+  protected final void checkAndFinishBundle() {
+    if (bundleFinished) {
+      return;
+    }
+    final boolean sizeLimitReached = currBundleCount >= bundleSize;
+    if (sizeLimitReached || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
+      bundleFinished = true;
+      doFnRunner.finishBundle();
+    }
+  }
+
   @Override
   public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
     // deserialize pipeline option
     final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
     this.outputCollector = oc;
 
+    this.bundleMillis = options.getMaxBundleTimeMills();
+    this.bundleSize = options.getMaxBundleSize();
+
     // create output manager
     outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
 
@@ -162,8 +205,6 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
       inputCoder,
       outputCoders,
       windowingStrategy);
-
-    doFnRunner.startBundle();
   }
 
   public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
@@ -173,7 +214,9 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   @Override
   public final void close() {
     beforeClose();
-    doFnRunner.finishBundle();
+    if (!bundleFinished) { // a bundle started by checkAndInvokeBundle has not been finished yet
+      doFnRunner.finishBundle();
+    }
     doFnInvoker.invokeTeardown();
   }
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 4a57ada..433f9df 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -64,14 +64,18 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
 
   @Override
   public void onData(final WindowedValue<InputT> data) {
+    checkAndInvokeBundle(); // open a bundle if none is open and count this element
     getDoFnRunner().processElement(data);
+    checkAndFinishBundle(); // finish the bundle once its size or time limit is reached
   }
 
   @Override
   public void onWatermark(final Watermark watermark) {
+    checkAndInvokeBundle(); // NOTE(review): this also counts the watermark toward the bundle size limit
     // TODO #216: We should consider push-back data that waits for side input
     // TODO #216: If there are push-back data, input watermark >= output watermark
     getOutputCollector().emitWatermark(watermark);
+    checkAndFinishBundle(); // finish the bundle once its size or time limit is reached
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 7d20f26..3727846 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -103,6 +103,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
    */
   @Override
   public void onData(final WindowedValue<KV<K, InputT>> element) {
+    checkAndInvokeBundle();
+
     // We can call Beam's DoFnRunner#processElement here,
     // but it may generate some overheads if we call the method for each data.
     // The `processElement` requires a `Iterator` of data, so we emit the buffered data every watermark.
@@ -111,6 +113,8 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
     final KV<K, InputT> kv = element.getValue();
     keyToValues.putIfAbsent(kv.getKey(), new ArrayList<>());
     keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
+
+    checkAndFinishBundle();
   }
 
   /**
@@ -158,7 +162,9 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
 
   @Override
   public void onWatermark(final Watermark inputWatermark) {
+    checkAndInvokeBundle(); // ensure a bundle is open before buffered data is processed below
     processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
+    checkAndFinishBundle(); // NOTE(review): the watermark itself was counted toward the size limit
   }
 
   /**
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index b2fbaeb..bad7584 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -20,12 +20,14 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -90,6 +92,98 @@ public final class DoFnTransformTest {
     doFnTransform.close();
   }
 
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCountBundle() {
+    // Bundle size 3 with a very large time limit: only the element count can finish a bundle.
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final NemoPipelineOptions pipelineOptions = PipelineOptionsFactory.as(NemoPipelineOptions.class);
+    pipelineOptions.setMaxBundleSize(3L);
+    pipelineOptions.setMaxBundleTimeMills(10000000L);
+
+    final List<Integer> bundleOutput = new ArrayList<>();
+
+    final DoFnTransform<String, String> doFnTransform =
+      new DoFnTransform<>(
+        new BundleTestDoFn(bundleOutput),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(),
+        WindowingStrategy.globalDefault(),
+        emptyList(), /* side inputs */
+        pipelineOptions);
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+
+    assertEquals(Collections.singletonList(3), bundleOutput); // exactly one bundle, holding 3 elements
+
+    bundleOutput.clear();
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+
+    assertEquals(Collections.singletonList(3), bundleOutput);
+
+    doFnTransform.close();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testTimeBundle() {
+    // Very large size limit: only the elapsed-time limit can finish a bundle.
+    final long maxBundleTimeMills = 1000L;
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final NemoPipelineOptions pipelineOptions = PipelineOptionsFactory.as(NemoPipelineOptions.class);
+    pipelineOptions.setMaxBundleSize(10000000L);
+    pipelineOptions.setMaxBundleTimeMills(maxBundleTimeMills);
+
+    final List<Integer> bundleOutput = new ArrayList<>();
+
+    final DoFnTransform<String, String> doFnTransform =
+      new DoFnTransform<>(
+        new BundleTestDoFn(bundleOutput),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(),
+        WindowingStrategy.globalDefault(),
+        emptyList(), /* side inputs */
+        pipelineOptions);
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
+
+    final long startTime = System.currentTimeMillis();
+    doFnTransform.prepare(context, oc);
+
+    int count = 0;
+    while (bundleOutput.isEmpty()) {
+      doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+      count += 1;
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag before failing the test
+        throw new RuntimeException(e);
+      }
+    }
+
+    final long endTime = System.currentTimeMillis();
+    assertEquals(Collections.singletonList(count), bundleOutput); // one bundle holding every element sent
+    assertTrue(endTime - startTime >= maxBundleTimeMills);
+
+    doFnTransform.close();
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testMultiOutputOutput() {
@@ -194,11 +288,42 @@ public final class DoFnTransformTest {
     doFnTransform.close();
   }
 
+
+  /**
+   * DoFn that records, on each finishBundle, how many elements the finished bundle processed.
+   */
+  private static class BundleTestDoFn extends DoFn<String, String> {
+    private int count; // elements processed since the current bundle started
+
+    private final List<Integer> bundleOutput;
+
+    BundleTestDoFn(final List<Integer> bundleOutput) {
+      this.bundleOutput = bundleOutput;
+    }
+
+    @ProcessElement
+    public void processElement(final ProcessContext c) throws Exception {
+      count += 1;
+      c.output(c.element());
+    }
+
+    @StartBundle
+    public void startBundle(final StartBundleContext c) {
+      count = 0;
+    }
+
+    @FinishBundle
+    public void finishBundle(final FinishBundleContext c) {
+      bundleOutput.add(count);
+    }
+  }
+
   /**
    * Identitiy do fn.
    * @param <T> type
    */
   private static class IdentityDoFn<T> extends DoFn<T, T> {
+
     @ProcessElement
     public void processElement(final ProcessContext c) throws Exception {
       c.output(c.element());
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index 57303fc..55ed19d 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -34,7 +34,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public final class WindowedWordCountITCase {
-  
+
   private static final int TIMEOUT = 120000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";