You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/07 09:55:30 UTC

[incubator-nemo] branch master updated: [NEMO-81] Support the Beam 'Partition' transform (#90)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 65ad522  [NEMO-81] Support the Beam 'Partition' transform (#90)
65ad522 is described below

commit 65ad5225e19ea71333ffc803e78e3a7f6ff6579b
Author: Seonghyun Park <se...@gmail.com>
AuthorDate: Tue Aug 7 18:55:28 2018 +0900

    [NEMO-81] Support the Beam 'Partition' transform (#90)
    
    JIRA: [NEMO-81: Support the Beam 'Partition' transform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-81)
    
    **Major changes:**
    - Supports Beam partition transform
    
    **Minor changes to note:**
    - Match the coder's string representation to determine whether or not to return Void.TYPE to child stages
    
    **Tests for the changes:**
    - Added PerPercentileAverageITCase
    
    **Other comments:**
    - I've left some comments regarding Beam's VoidCoder in ParentTaskDataFetcher
    
    resolves [NEMO-81](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-81)
---
 .../nemo/examples/beam/PerPercentileAverage.java   | 138 +++++++++++++++++++++
 .../examples/beam/PerPercentileAverageITCase.java  |  74 +++++++++++
 examples/resources/expected_output_partition_0     |   1 +
 examples/resources/expected_output_partition_1     |   1 +
 examples/resources/expected_output_partition_2     |   1 +
 examples/resources/expected_output_partition_3     |   1 +
 examples/resources/expected_output_partition_4     |   1 +
 examples/resources/expected_output_partition_5     |   1 +
 examples/resources/expected_output_partition_6     |   1 +
 examples/resources/expected_output_partition_7     |   0
 examples/resources/expected_output_partition_8     |   1 +
 examples/resources/expected_output_partition_9     |   1 +
 examples/resources/test_input_partition            |  28 +++++
 .../executor/task/ParentTaskDataFetcher.java       |  17 ++-
 .../executor/task/ParentTaskDataFetcherTest.java   |  48 ++++++-
 15 files changed, 308 insertions(+), 6 deletions(-)

diff --git a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java
new file mode 100644
index 0000000..61c6b1d
--- /dev/null
+++ b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import com.google.common.collect.Lists;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Per percentile statistics application: counts the students and averages their scores per percentile bucket.
+ */
+public final class PerPercentileAverage {
+  /**
+   * Private Constructor.
+   */
+  private PerPercentileAverage() {
+  }
+
+  /**
+   * Main function for the per-percentile average BEAM program.
+   *
+   * @param args input file path, followed by the common prefix of the per-partition output file paths.
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("PerPercentileAverage");
+
+    final Pipeline p = Pipeline.create(options);
+
+    final PCollection<Student> students = GenericSourceSink.read(p, inputFilePath)
+        .apply(ParDo.of(new DoFn<String, Student>() {
+          @ProcessElement
+          public void processElement(final ProcessContext c) {
+            final String[] line = c.element().split(" ");
+            c.output(new Student(Integer.parseInt(line[0]), Integer.parseInt(line[1]), Integer.parseInt(line[2])));
+          }
+        }))
+        .setCoder(SerializableCoder.of(Student.class));
+
+    // NOTE: every partition should contain at least one element; empty PCollections may make the writes fail.
+    final PCollectionList<Student> studentsByPercentile =
+        students.apply(Partition.of(10, new Partition.PartitionFn<Student>() {
+          @Override
+          public int partitionFor(final Student student, final int numPartitions) {
+            // Scale by numPartitions and clamp, so that percentile 100 still maps to the last partition.
+            return Math.max(0, Math.min(numPartitions - 1, student.getPercentile() * numPartitions / 100));
+          }
+        }));
+
+    for (int i = 0; i < studentsByPercentile.size(); i++) {
+      final PCollection<String> result = studentsByPercentile.get(i)
+          .apply(MapElements.via(new SimpleFunction<Student, KV<String, Integer>>() {
+            @Override
+            public KV<String, Integer> apply(final Student student) {
+              return KV.of("", student.getScore());
+            }
+          }))
+          .apply(GroupByKey.create())
+          .apply(MapElements.via(new SimpleFunction<KV<String, Iterable<Integer>>, String>() {
+            @Override
+            public String apply(final KV<String, Iterable<Integer>> kv) {
+              final List<Integer> scores = Lists.newArrayList(kv.getValue());
+              final int sum = scores.stream().mapToInt(Integer::intValue).sum();
+              return scores.size() + " " + (double) sum / scores.size();
+            }
+          }));
+      GenericSourceSink.write(result, outputFilePath + "_" + i);
+    }
+
+    p.run();
+  }
+
+  /**
+   * Student Class: a serializable holder of an id, a percentile and a score.
+   */
+  static class Student implements Serializable {
+    private int id;
+    private int percentile;
+    private int score;
+
+    Student(final int id, final int percentile, final int score) {
+      this.id = id;
+      this.percentile = percentile;
+      this.score = score;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public void setId(final int id) {
+      this.id = id;
+    }
+
+    public int getPercentile() {
+      return percentile;
+    }
+
+    public void setPercentile(final int percentile) {
+      this.percentile = percentile;
+    }
+
+    public int getScore() {
+      return score;
+    }
+
+    public void setScore(final int score) {
+      this.score = score;
+    }
+  }
+}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java
new file mode 100644
index 0000000..eaeeddc
--- /dev/null
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.test.ArgBuilder;
+import edu.snu.nemo.common.test.ExampleTestUtil;
+import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Integration test that runs the PerPercentileAverage example through JobLauncher.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class PerPercentileAverageITCase {
+  private static final int TIMEOUT = 120000;
+  private static final int NUM_PARTITIONS = 10; // the example writes one output file per partition
+  private static final String RESOURCE_DIR = System.getProperty("user.dir") + "/../resources/";
+  private static final String INPUT_FILE = "test_input_partition";
+  private static final String OUTPUT_FILE = "test_output_partition";
+  private static final String EXPECTED_OUTPUT_FILE = "expected_output_partition";
+  private static final String EXECUTOR_RESOURCE_PATH = RESOURCE_DIR + "beam_test_executor_resources.json";
+
+  private static ArgBuilder builder;
+
+  @Before
+  public void setUp() throws Exception {
+    // Common launch arguments: executor resources, the user main class and its input/output paths.
+    builder = new ArgBuilder()
+      .addResourceJson(EXECUTOR_RESOURCE_PATH)
+      .addUserMain(PerPercentileAverage.class.getCanonicalName())
+      .addUserArgs(RESOURCE_DIR + INPUT_FILE, RESOURCE_DIR + OUTPUT_FILE);
+  }
+
+  @Test (timeout = TIMEOUT)
+  public void test() throws Exception {
+    JobLauncher.main(builder
+      .addJobId(PerPercentileAverage.class.getSimpleName())
+      .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+      .build());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      // Check each partition's output file against the expected file carrying the same suffix.
+      for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
+        final String suffix = "_" + partition;
+        ExampleTestUtil.ensureOutputValidity(RESOURCE_DIR, OUTPUT_FILE + suffix, EXPECTED_OUTPUT_FILE + suffix);
+      }
+    } finally {
+      ExampleTestUtil.deleteOutputFile(RESOURCE_DIR, OUTPUT_FILE);
+    }
+  }
+}
diff --git a/examples/resources/expected_output_partition_0 b/examples/resources/expected_output_partition_0
new file mode 100644
index 0000000..c03e1b0
--- /dev/null
+++ b/examples/resources/expected_output_partition_0
@@ -0,0 +1 @@
+3 13.0
diff --git a/examples/resources/expected_output_partition_1 b/examples/resources/expected_output_partition_1
new file mode 100644
index 0000000..8bfe99f
--- /dev/null
+++ b/examples/resources/expected_output_partition_1
@@ -0,0 +1 @@
+2 22.0
diff --git a/examples/resources/expected_output_partition_2 b/examples/resources/expected_output_partition_2
new file mode 100644
index 0000000..6bd3675
--- /dev/null
+++ b/examples/resources/expected_output_partition_2
@@ -0,0 +1 @@
+2 30.0
diff --git a/examples/resources/expected_output_partition_3 b/examples/resources/expected_output_partition_3
new file mode 100644
index 0000000..c3a9fe4
--- /dev/null
+++ b/examples/resources/expected_output_partition_3
@@ -0,0 +1 @@
+3 36.333333333333336
diff --git a/examples/resources/expected_output_partition_4 b/examples/resources/expected_output_partition_4
new file mode 100644
index 0000000..96f2bca
--- /dev/null
+++ b/examples/resources/expected_output_partition_4
@@ -0,0 +1 @@
+3 46.333333333333336
diff --git a/examples/resources/expected_output_partition_5 b/examples/resources/expected_output_partition_5
new file mode 100644
index 0000000..aed062c
--- /dev/null
+++ b/examples/resources/expected_output_partition_5
@@ -0,0 +1 @@
+2 62.0
diff --git a/examples/resources/expected_output_partition_6 b/examples/resources/expected_output_partition_6
new file mode 100644
index 0000000..86bf440
--- /dev/null
+++ b/examples/resources/expected_output_partition_6
@@ -0,0 +1 @@
+4 67.25
diff --git a/examples/resources/expected_output_partition_7 b/examples/resources/expected_output_partition_7
new file mode 100644
index 0000000..e69de29
diff --git a/examples/resources/expected_output_partition_8 b/examples/resources/expected_output_partition_8
new file mode 100644
index 0000000..d8d34a8
--- /dev/null
+++ b/examples/resources/expected_output_partition_8
@@ -0,0 +1 @@
+4 89.75
diff --git a/examples/resources/expected_output_partition_9 b/examples/resources/expected_output_partition_9
new file mode 100644
index 0000000..15efcb0
--- /dev/null
+++ b/examples/resources/expected_output_partition_9
@@ -0,0 +1 @@
+5 96.0
diff --git a/examples/resources/test_input_partition b/examples/resources/test_input_partition
new file mode 100644
index 0000000..95ee9dc
--- /dev/null
+++ b/examples/resources/test_input_partition
@@ -0,0 +1,28 @@
+1 14 23
+2 49 57
+3 23 29
+5 37 37
+6 41 39
+7 57 63
+8 66 69
+9 91 92
+10 93 93
+11 97 98
+12 96 97
+13 82 90
+14 88 91
+15 7 13
+16 11 21
+17 3 9
+18 81 88
+19 99 100
+20 62 65
+21 83 90
+22 26 31
+23 31 34
+24 39 38
+25 44 43
+26 50 61
+27 61 64
+28 67 71
+29 8 17
\ No newline at end of file
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 68ba2c1..91c80d0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -15,6 +15,8 @@
  */
 package edu.snu.nemo.runtime.executor.task;
 
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
@@ -24,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -111,9 +114,17 @@ class ParentTaskDataFetcher extends DataFetcher {
         if (currentIteratorIndex == expectedNumOfIterators) {
           // Entire fetcher is done
           if (noElementAtAll) {
-            // This shouldn't normally happen, except for cases such as when Beam's VoidCoder is used.
-            noElementAtAll = false;
-            return Void.TYPE;
+            final Optional<DecoderFactory> decoderFactory =
+                readersForParentTask.getRuntimeEdge().getPropertyValue(DecoderProperty.class);
+
+            // TODO #173: Properly handle zero-element task outputs. Currently fetchDataElement relies on
+            // toString() method to distinguish whether to return Void.TYPE or not.
+            if (decoderFactory.get().toString().equals("VoidCoder")) {
+              noElementAtAll = false;
+              return Void.TYPE;
+            } else {
+              return null;
+            }
           } else {
             // This whole fetcher's done
             return null;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 931e751..3f1fd3d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -15,7 +15,11 @@
  */
 package edu.snu.nemo.runtime.executor.task;
 
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
 import org.junit.Test;
@@ -31,6 +35,7 @@ import java.util.concurrent.Executors;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.when;
 
 /**
@@ -41,10 +46,11 @@ import static org.mockito.Mockito.when;
 public final class ParentTaskDataFetcherTest {
 
   @Test(timeout=5000)
-  public void testEmpty() throws Exception {
-    // InputReader
+  public void testVoid() throws Exception {
+    // TODO #173: Properly handle zero-element. This test should be updated too.
     final List<String> dataElements = new ArrayList<>(0); // empty data
-    final InputReader inputReader = generateInputReader(generateCompletableFuture(dataElements.iterator()));
+    final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
+        "VoidCoder");
 
     // Fetcher
     final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
@@ -54,6 +60,20 @@ public final class ParentTaskDataFetcherTest {
   }
 
   @Test(timeout=5000)
+  public void testEmpty() throws Exception {
+    // TODO #173: Properly handle zero-element. This test should be updated too.
+    // The parent emits no data, and the edge's decoder does not identify itself as "VoidCoder".
+    final InputReader emptyReader = generateInputReaderWithCoder(
+        generateCompletableFuture(new ArrayList<String>(0).iterator()), "IntCoder");
+
+    // Fetcher
+    final ParentTaskDataFetcher emptyFetcher = createFetcher(emptyReader);
+
+    // Without VoidCoder, an exhausted and empty fetcher should return null rather than Void.TYPE
+    assertEquals(null, emptyFetcher.fetchDataElement());
+  }
+
+  @Test(timeout=5000)
   public void testNonEmpty() throws Exception {
     // InputReader
     final String singleData = "Single";
@@ -111,6 +131,28 @@ public final class ParentTaskDataFetcherTest {
         false);
   }
 
+
+  private DecoderFactory generateCoder(final String coder) {
+    final DecoderFactory namedFactory = mock(DecoderFactory.class);
+    when(namedFactory.toString()).thenReturn(coder); // the fetcher under test inspects toString()
+    return namedFactory;
+  }
+
+  private RuntimeEdge generateEdge(final String coder) {
+    final String edgeId = "Runtime edge with coder";
+    final ExecutionPropertyMap properties = new ExecutionPropertyMap(edgeId);
+    properties.put(DecoderProperty.of(generateCoder(coder))); // decoder whose toString() is the given name
+    return new RuntimeEdge<>(edgeId, properties, mock(IRVertex.class), mock(IRVertex.class), false);
+  }
+
+  private InputReader generateInputReaderWithCoder(final CompletableFuture completableFuture, final String coder) {
+    final RuntimeEdge edgeWithCoder = generateEdge(coder);
+    final InputReader reader = mock(InputReader.class);
+    when(reader.getRuntimeEdge()).thenReturn(edgeWithCoder);
+    when(reader.read()).thenReturn(Arrays.asList(completableFuture));
+    return reader;
+  }
+
   private InputReader generateInputReader(final CompletableFuture completableFuture) {
     final InputReader inputReader = mock(InputReader.class);
     when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));