You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/08 00:21:35 UTC

[incubator-nemo] branch master updated: [NEMO-272] Fix incorrect uses of Beam FinishBundle (#154)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new dbad69d  [NEMO-272] Fix incorrect uses of Beam FinishBundle (#154)
dbad69d is described below

commit dbad69d4df40cc6dc64087526822e051de09aadf
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Nov 8 09:16:48 2018 +0900

    [NEMO-272] Fix incorrect uses of Beam FinishBundle (#154)
    
    JIRA: [NEMO-272: Fix incorrect uses of Beam FinishBundle](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-272)
    
    **Major changes:**
    - Fixes GenericSourceSink
    - Fixes MultinomialLogisticRegression
    - Adds TODOs for fixing the root causes
---
 .../nemo/examples/beam/GenericSourceSink.java      | 19 ++++++++--------
 .../beam/MultinomialLogisticRegression.java        | 25 ++++++++++------------
 2 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 4e3d49c..2ab09a7 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -132,14 +132,14 @@ final class HDFSWrite extends DoFn<String, Void> {
   }
 
   /**
-   * Start bundle.
-   * The number of output files are determined according to the parallelism.
+   * Writes to exactly one file.
+   * (The total number of output files is determined according to the parallelism.)
    * i.e. if parallelism is 2, then there are total 2 output files.
-   * Each output file is written as a bundle.
-   * @param c      bundle context {@link StartBundleContext}
    */
-  @StartBundle
-  public void startBundle(final StartBundleContext c) {
+  @Setup
+  public void setup() {
+    // Creating a side effect in Setup is discouraged, but we do it anyway for now as we're extending DoFn.
+    // TODO #273: Our custom HDFSWrite should implement WriteOperation
     fileName = new Path(path + UUID.randomUUID().toString());
     try {
       fileSystem = fileName.getFileSystem(new JobConf());
@@ -166,12 +166,11 @@ final class HDFSWrite extends DoFn<String, Void> {
   }
 
   /**
-   * finish bundle.
-   * @param c             context
+   * Teardown.
    * @throws IOException  output stream exception
    */
-  @FinishBundle
-  public void finishBundle(final FinishBundleContext c) throws IOException {
+  @Teardown
+  public void tearDown() throws IOException {
     outputStream.close();
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index f0f30f6..921b862 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -18,8 +18,6 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
 import org.apache.nemo.common.Pair;
@@ -61,6 +59,9 @@ public final class MultinomialLogisticRegression {
     private final PCollectionView<Map<Integer, List<Double>>> modelView;
     private Map<Integer, List<Double>> model;
 
+    // TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
+    private ProcessContext savedContextHack;
+
     /**
      * Constructor for CalculateGradient DoFn class.
      * @param modelView PCollectionView of the model.
@@ -124,6 +125,9 @@ public final class MultinomialLogisticRegression {
      */
     @ProcessElement
     public void processElement(final ProcessContext c) throws Exception {
+      // TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
+      savedContextHack = c;
+
       final KV<Integer, Pair<List<Integer>, List<Double>>> data = parseLine(c.element());
       if (data == null) { // comments and newlines
         return;
@@ -224,15 +228,15 @@ public final class MultinomialLogisticRegression {
     }
 
     /**
-     * FinishBundle method for BEAM.
-     * @param c Context.
+     * Teardown, since this logic at the moment should be executed exactly once after consuming the final data element.
+     * TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
      */
-    @FinishBundle
-    public void finishBundle(final FinishBundleContext c) {
+    @Teardown
+    public void tearDown() {
       for (Integer i = 0; i < gradients.size(); i++) {
         // this enforces a global window (batching),
         // where all data elements of the corresponding PCollection are grouped and emitted downstream together
-        c.output(KV.of(i, gradients.get(i)), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
+        savedContextHack.output(KV.of(i, gradients.get(i)));
       }
       LOG.info("stats: " + gradients.get(numClasses - 1).toString());
     }
@@ -302,13 +306,6 @@ public final class MultinomialLogisticRegression {
         c.output(KV.of(kv.getKey(), ret));
       }
     }
-
-    /**
-     * FinishBundle method for BEAM.
-     */
-    @FinishBundle
-    public void finishBundle() {
-    }
   }
 
   /**