You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/08 00:21:35 UTC
[incubator-nemo] branch master updated: [NEMO-272] Fix incorrect
uses of Beam FinishBundle (#154)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new dbad69d [NEMO-272] Fix incorrect uses of Beam FinishBundle (#154)
dbad69d is described below
commit dbad69d4df40cc6dc64087526822e051de09aadf
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Nov 8 09:16:48 2018 +0900
[NEMO-272] Fix incorrect uses of Beam FinishBundle (#154)
JIRA: [NEMO-272: Fix incorrect uses of Beam FinishBundle](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-272)
**Major changes:**
- Fixes GenericSourceSink
- Fixes MultinomialLogisticRegression
- Adds TODOs for fixing the root causes
---
.../nemo/examples/beam/GenericSourceSink.java | 19 ++++++++--------
.../beam/MultinomialLogisticRegression.java | 25 ++++++++++------------
2 files changed, 20 insertions(+), 24 deletions(-)
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 4e3d49c..2ab09a7 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -132,14 +132,14 @@ final class HDFSWrite extends DoFn<String, Void> {
}
/**
- * Start bundle.
- * The number of output files are determined according to the parallelism.
+ * Writes to exactly one file.
+ * (The number of total output files are determined according to the parallelism.)
* i.e. if parallelism is 2, then there are total 2 output files.
- * Each output file is written as a bundle.
- * @param c bundle context {@link StartBundleContext}
*/
- @StartBundle
- public void startBundle(final StartBundleContext c) {
+ @Setup
+ public void setup() {
+ // Creating a side-effect in Setup is discouraged, but we do it anyways for now as we're extending DoFn.
+ // TODO #273: Our custom HDFSWrite should implement WriteOperation
fileName = new Path(path + UUID.randomUUID().toString());
try {
fileSystem = fileName.getFileSystem(new JobConf());
@@ -166,12 +166,11 @@ final class HDFSWrite extends DoFn<String, Void> {
}
/**
- * finish bundle.
- * @param c context
+ * Teardown.
* @throws IOException output stream exception
*/
- @FinishBundle
- public void finishBundle(final FinishBundleContext c) throws IOException {
+ @Teardown
+ public void tearDown() throws IOException {
outputStream.close();
}
}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index f0f30f6..921b862 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -18,8 +18,6 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
import org.apache.nemo.common.Pair;
@@ -61,6 +59,9 @@ public final class MultinomialLogisticRegression {
private final PCollectionView<Map<Integer, List<Double>>> modelView;
private Map<Integer, List<Double>> model;
+ // TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
+ private ProcessContext savedContextHack;
+
/**
* Constructor for CalculateGradient DoFn class.
* @param modelView PCollectionView of the model.
@@ -124,6 +125,9 @@ public final class MultinomialLogisticRegression {
*/
@ProcessElement
public void processElement(final ProcessContext c) throws Exception {
+ // TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
+ savedContextHack = c;
+
final KV<Integer, Pair<List<Integer>, List<Double>>> data = parseLine(c.element());
if (data == null) { // comments and newlines
return;
@@ -224,15 +228,15 @@ public final class MultinomialLogisticRegression {
}
/**
- * FinishBundle method for BEAM.
- * @param c Context.
+ * Teardown, since this logic at the moment should be executed exactly once after consuming the final data element.
+ * TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
*/
- @FinishBundle
- public void finishBundle(final FinishBundleContext c) {
+ @Teardown
+ public void tearDown() {
for (Integer i = 0; i < gradients.size(); i++) {
// this enforces a global window (batching),
// where all data elements of the corresponding PCollection are grouped and emitted downstream together
- c.output(KV.of(i, gradients.get(i)), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
+ savedContextHack.output(KV.of(i, gradients.get(i)));
}
LOG.info("stats: " + gradients.get(numClasses - 1).toString());
}
@@ -302,13 +306,6 @@ public final class MultinomialLogisticRegression {
c.output(KV.of(kv.getKey(), ret));
}
}
-
- /**
- * FinishBundle method for BEAM.
- */
- @FinishBundle
- public void finishBundle() {
- }
}
/**