You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/07 09:55:30 UTC
[incubator-nemo] branch master updated: [NEMO-81] Support the Beam
'Partition' transform (#90)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 65ad522 [NEMO-81] Support the Beam 'Partition' transform (#90)
65ad522 is described below
commit 65ad5225e19ea71333ffc803e78e3a7f6ff6579b
Author: Seonghyun Park <se...@gmail.com>
AuthorDate: Tue Aug 7 18:55:28 2018 +0900
[NEMO-81] Support the Beam 'Partition' transform (#90)
JIRA: [NEMO-81: Support the Beam 'Partition' transform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-81)
**Major changes:**
- Supports Beam partition transform
**Minor changes to note:**
- Match the decoder's string representation to determine whether to return Void.TYPE to child stages
**Tests for the changes:**
- Added PerPercentileAverageITCase
**Other comments:**
- I've left some comments regarding Beam's VoidCoder in ParentTaskDataFetcher
resolves [NEMO-81](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-81)
---
.../nemo/examples/beam/PerPercentileAverage.java | 138 +++++++++++++++++++++
.../examples/beam/PerPercentileAverageITCase.java | 74 +++++++++++
examples/resources/expected_output_partition_0 | 1 +
examples/resources/expected_output_partition_1 | 1 +
examples/resources/expected_output_partition_2 | 1 +
examples/resources/expected_output_partition_3 | 1 +
examples/resources/expected_output_partition_4 | 1 +
examples/resources/expected_output_partition_5 | 1 +
examples/resources/expected_output_partition_6 | 1 +
examples/resources/expected_output_partition_7 | 0
examples/resources/expected_output_partition_8 | 1 +
examples/resources/expected_output_partition_9 | 1 +
examples/resources/test_input_partition | 28 +++++
.../executor/task/ParentTaskDataFetcher.java | 17 ++-
.../executor/task/ParentTaskDataFetcherTest.java | 48 ++++++-
15 files changed, 308 insertions(+), 6 deletions(-)
diff --git a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java
new file mode 100644
index 0000000..61c6b1d
--- /dev/null
+++ b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import com.google.common.collect.Lists;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Per-percentile statistics application.
+ *
+ * <p>Each input line has the form {@code "<id> <percentile> <score>"}. Students are split into
+ * {@code NUM_PARTITIONS} equal-width percentile buckets, and for every bucket the line
+ * {@code "<count> <average score>"} is written under {@code <outputFilePath>_<bucket>}.
+ * A bucket that receives no student produces no output line.
+ */
+public final class PerPercentileAverage {
+  /**
+   * Number of equal-width percentile buckets.
+   */
+  private static final int NUM_PARTITIONS = 10;
+
+  /**
+   * Private Constructor.
+   */
+  private PerPercentileAverage() {
+  }
+
+  /**
+   * Main function for the PerPercentileAverage Beam program.
+   *
+   * @param args arguments: {@code args[0]} is the input file path, {@code args[1]} is the output
+   *             file path prefix (suffixed with {@code _<bucket>} for each bucket).
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("PerPercentileAverage");
+
+    final Pipeline p = Pipeline.create(options);
+
+    final PCollection<Student> students = GenericSourceSink.read(p, inputFilePath)
+        .apply(ParDo.of(new DoFn<String, Student>() {
+          @ProcessElement
+          public void processElement(final ProcessContext c) {
+            final String[] line = c.element().split(" ");
+            c.output(new Student(Integer.parseInt(line[0]), Integer.parseInt(line[1]),
+                Integer.parseInt(line[2])));
+          }
+        }))
+        .setCoder(SerializableCoder.of(Student.class));
+
+    // Buckets may be empty (e.g. the bundled test input has no percentile in 70-79).
+    // Beware that successive WriteFiles may fail on empty PCollections on some runners.
+    final PCollectionList<Student> studentsByPercentile =
+        students.apply(Partition.of(NUM_PARTITIONS, new Partition.PartitionFn<Student>() {
+          @Override
+          public int partitionFor(final Student student, final int numPartitions) {
+            return bucketOf(student.getPercentile(), numPartitions);
+          }
+        }));
+
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      final PCollection<String> result = studentsByPercentile.get(i)
+          .apply(MapElements.via(new SimpleFunction<Student, KV<String, Integer>>() {
+            @Override
+            public KV<String, Integer> apply(final Student student) {
+              // A single constant key gathers the whole bucket into one group.
+              return KV.of("", student.getScore());
+            }
+          }))
+          .apply(GroupByKey.create())
+          .apply(MapElements.via(new SimpleFunction<KV<String, Iterable<Integer>>, String>() {
+            @Override
+            public String apply(final KV<String, Iterable<Integer>> kv) {
+              final List<Integer> scores = Lists.newArrayList(kv.getValue());
+              // Sum in a long so that large buckets cannot overflow int.
+              final long sum = scores.stream().mapToLong(Integer::longValue).sum();
+              return scores.size() + " " + (double) sum / scores.size();
+            }
+          }));
+      GenericSourceSink.write(result, outputFilePath + "_" + i);
+    }
+
+    p.run();
+  }
+
+  /**
+   * Maps a percentile to one of {@code numPartitions} equal-width buckets.
+   *
+   * <p>The percentile is scaled by {@code numPartitions / 100} rather than divided by
+   * {@code numPartitions}, which is only correct when {@code numPartitions == 10}.
+   *
+   * @param percentile    the percentile, expected to lie in [0, 100].
+   * @param numPartitions the number of buckets (positive).
+   * @return the bucket index in [0, numPartitions); percentile 100 falls into the last bucket.
+   *         Negative percentiles are not clamped and yield a negative (invalid) index.
+   */
+  static int bucketOf(final int percentile, final int numPartitions) {
+    return Math.min(percentile * numPartitions / 100, numPartitions - 1);
+  }
+
+  /**
+   * A student record parsed from one input line.
+   */
+  static class Student implements Serializable {
+    private int id;
+    private int percentile;
+    private int score;
+
+    /**
+     * Creates a student record.
+     *
+     * @param id         the student id.
+     * @param percentile the student's percentile.
+     * @param score      the student's score.
+     */
+    Student(final int id, final int percentile, final int score) {
+      this.id = id;
+      this.percentile = percentile;
+      this.score = score;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public void setId(final int id) {
+      this.id = id;
+    }
+
+    public int getPercentile() {
+      return percentile;
+    }
+
+    public void setPercentile(final int percentile) {
+      this.percentile = percentile;
+    }
+
+    public int getScore() {
+      return score;
+    }
+
+    public void setScore(final int score) {
+      this.score = score;
+    }
+  }
+}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java
new file mode 100644
index 0000000..eaeeddc
--- /dev/null
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.test.ArgBuilder;
+import edu.snu.nemo.common.test.ExampleTestUtil;
+import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Runs {@link PerPercentileAverage} through {@link JobLauncher} and checks every per-bucket
+ * output file against the matching {@code expected_output_partition_<i>} resource.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class PerPercentileAverageITCase {
+  private static final int TIMEOUT = 120000;
+  // Number of output buckets; must match the partition count used by PerPercentileAverage.
+  private static final int NUM_PARTITIONS = 10;
+  private static final String FILE_BASE_PATH = System.getProperty("user.dir") + "/../resources/";
+
+  private static final String INPUT_FILE_NAME = "test_input_partition";
+  private static final String OUTPUT_FILE_NAME = "test_output_partition";
+  private static final String EXPECTED_OUTPUT_FILE_NAME = "expected_output_partition";
+  private static final String EXECUTOR_RESOURCE_FILE_NAME = FILE_BASE_PATH + "beam_test_executor_resources.json";
+  private static final String INPUT_FILE_PATH = FILE_BASE_PATH + INPUT_FILE_NAME;
+  private static final String OUTPUT_FILE_PATH = FILE_BASE_PATH + OUTPUT_FILE_NAME;
+
+  private static ArgBuilder builder;
+
+  @Before
+  public void setUp() throws Exception {
+    builder = new ArgBuilder()
+        .addResourceJson(EXECUTOR_RESOURCE_FILE_NAME)
+        .addUserMain(PerPercentileAverage.class.getCanonicalName())
+        .addUserArgs(INPUT_FILE_PATH, OUTPUT_FILE_PATH);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      for (int bucket = 0; bucket < NUM_PARTITIONS; bucket++) {
+        final String suffix = "_" + bucket;
+        ExampleTestUtil.ensureOutputValidity(FILE_BASE_PATH,
+            OUTPUT_FILE_NAME + suffix,
+            EXPECTED_OUTPUT_FILE_NAME + suffix);
+      }
+    } finally {
+      // Every per-bucket output file name starts with OUTPUT_FILE_NAME.
+      ExampleTestUtil.deleteOutputFile(FILE_BASE_PATH, OUTPUT_FILE_NAME);
+    }
+  }
+
+  @Test (timeout = TIMEOUT)
+  public void test() throws Exception {
+    final String[] args = builder
+        .addJobId(PerPercentileAverage.class.getSimpleName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+        .build();
+    JobLauncher.main(args);
+  }
+}
diff --git a/examples/resources/expected_output_partition_0 b/examples/resources/expected_output_partition_0
new file mode 100644
index 0000000..c03e1b0
--- /dev/null
+++ b/examples/resources/expected_output_partition_0
@@ -0,0 +1 @@
+3 13.0
diff --git a/examples/resources/expected_output_partition_1 b/examples/resources/expected_output_partition_1
new file mode 100644
index 0000000..8bfe99f
--- /dev/null
+++ b/examples/resources/expected_output_partition_1
@@ -0,0 +1 @@
+2 22.0
diff --git a/examples/resources/expected_output_partition_2 b/examples/resources/expected_output_partition_2
new file mode 100644
index 0000000..6bd3675
--- /dev/null
+++ b/examples/resources/expected_output_partition_2
@@ -0,0 +1 @@
+2 30.0
diff --git a/examples/resources/expected_output_partition_3 b/examples/resources/expected_output_partition_3
new file mode 100644
index 0000000..c3a9fe4
--- /dev/null
+++ b/examples/resources/expected_output_partition_3
@@ -0,0 +1 @@
+3 36.333333333333336
diff --git a/examples/resources/expected_output_partition_4 b/examples/resources/expected_output_partition_4
new file mode 100644
index 0000000..96f2bca
--- /dev/null
+++ b/examples/resources/expected_output_partition_4
@@ -0,0 +1 @@
+3 46.333333333333336
diff --git a/examples/resources/expected_output_partition_5 b/examples/resources/expected_output_partition_5
new file mode 100644
index 0000000..aed062c
--- /dev/null
+++ b/examples/resources/expected_output_partition_5
@@ -0,0 +1 @@
+2 62.0
diff --git a/examples/resources/expected_output_partition_6 b/examples/resources/expected_output_partition_6
new file mode 100644
index 0000000..86bf440
--- /dev/null
+++ b/examples/resources/expected_output_partition_6
@@ -0,0 +1 @@
+4 67.25
diff --git a/examples/resources/expected_output_partition_7 b/examples/resources/expected_output_partition_7
new file mode 100644
index 0000000..e69de29
diff --git a/examples/resources/expected_output_partition_8 b/examples/resources/expected_output_partition_8
new file mode 100644
index 0000000..d8d34a8
--- /dev/null
+++ b/examples/resources/expected_output_partition_8
@@ -0,0 +1 @@
+4 89.75
diff --git a/examples/resources/expected_output_partition_9 b/examples/resources/expected_output_partition_9
new file mode 100644
index 0000000..15efcb0
--- /dev/null
+++ b/examples/resources/expected_output_partition_9
@@ -0,0 +1 @@
+5 96.0
diff --git a/examples/resources/test_input_partition b/examples/resources/test_input_partition
new file mode 100644
index 0000000..95ee9dc
--- /dev/null
+++ b/examples/resources/test_input_partition
@@ -0,0 +1,28 @@
+1 14 23
+2 49 57
+3 23 29
+5 37 37
+6 41 39
+7 57 63
+8 66 69
+9 91 92
+10 93 93
+11 97 98
+12 96 97
+13 82 90
+14 88 91
+15 7 13
+16 11 21
+17 3 9
+18 81 88
+19 99 100
+20 62 65
+21 83 90
+22 26 31
+23 31 34
+24 39 38
+25 44 43
+26 50 61
+27 61 64
+28 67 71
+29 8 17
\ No newline at end of file
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 68ba2c1..91c80d0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -15,6 +15,8 @@
*/
package edu.snu.nemo.runtime.executor.task;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
@@ -24,6 +26,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
@@ -111,9 +114,17 @@ class ParentTaskDataFetcher extends DataFetcher {
if (currentIteratorIndex == expectedNumOfIterators) {
// Entire fetcher is done
if (noElementAtAll) {
- // This shouldn't normally happen, except for cases such as when Beam's VoidCoder is used.
- noElementAtAll = false;
- return Void.TYPE;
+ final Optional<DecoderFactory> decoderFactory =
+ readersForParentTask.getRuntimeEdge().getPropertyValue(DecoderProperty.class);
+
+ // TODO #173: Properly handle zero-element task outputs. fetchDataElement relies on the decoder's
+ // toString() to pick Void.TYPE; edges without a DecoderProperty are treated as non-Void (null).
+ if (decoderFactory.isPresent() && "VoidCoder".equals(decoderFactory.get().toString())) {
+ noElementAtAll = false;
+ return Void.TYPE;
+ } else {
+ return null;
+ }
} else {
// This whole fetcher's done
return null;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 931e751..3f1fd3d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -15,7 +15,11 @@
*/
package edu.snu.nemo.runtime.executor.task;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
import org.junit.Test;
@@ -31,6 +35,7 @@ import java.util.concurrent.Executors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockingDetails;
import static org.mockito.Mockito.when;
/**
@@ -41,10 +46,11 @@ import static org.mockito.Mockito.when;
public final class ParentTaskDataFetcherTest {
@Test(timeout=5000)
- public void testEmpty() throws Exception {
- // InputReader
+ public void testVoid() throws Exception {
+ // TODO #173: Properly handle zero-element. This test should be updated too.
final List<String> dataElements = new ArrayList<>(0); // empty data
- final InputReader inputReader = generateInputReader(generateCompletableFuture(dataElements.iterator()));
+ final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
+ "VoidCoder");
// Fetcher
final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
@@ -54,6 +60,20 @@ public final class ParentTaskDataFetcherTest {
}
@Test(timeout=5000)
+ public void testEmpty() throws Exception {
+ // TODO #173: Properly handle zero-element. This test should be updated too.
+ final List<String> dataElements = new ArrayList<>(0); // empty data
+ final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
+ "IntCoder");
+
+ // Fetcher
+ final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
+
+ // Should return null: with a non-Void decoder, an empty input means the whole fetcher is done
+ assertEquals(null, fetcher.fetchDataElement());
+ }
+
+ @Test(timeout=5000)
public void testNonEmpty() throws Exception {
// InputReader
final String singleData = "Single";
@@ -111,6 +131,40 @@ public final class ParentTaskDataFetcherTest {
false);
}
+
+ /**
+ * Mocks a DecoderFactory whose toString() returns the given coder name, which is what
+ * ParentTaskDataFetcher inspects (TODO #173).
+ */
+ private DecoderFactory generateCoder(final String coder) {
+ final DecoderFactory mockedFactory = mock(DecoderFactory.class);
+ when(mockedFactory.toString()).thenReturn(coder);
+ return mockedFactory;
+ }
+
+ /**
+ * Builds a RuntimeEdge between two mocked vertices whose only execution property is a
+ * DecoderProperty holding a mocked decoder named {@code coder}.
+ */
+ private RuntimeEdge generateEdge(final String coder) {
+ final String edgeId = "Runtime edge with coder";
+ final ExecutionPropertyMap properties = new ExecutionPropertyMap(edgeId);
+ properties.put(DecoderProperty.of(generateCoder(coder)));
+ return new RuntimeEdge<>(edgeId, properties, mock(IRVertex.class), mock(IRVertex.class), false);
+ }
+
+ /**
+ * Mocks an InputReader that yields the given future and exposes an edge using {@code coder}.
+ */
+ private InputReader generateInputReaderWithCoder(final CompletableFuture completableFuture, final String coder) {
+ // Create the edge first: generateEdge() stubs a mock itself, so it must not run inside when(...).thenReturn(...).
+ final RuntimeEdge edge = generateEdge(coder);
+ final InputReader mockedReader = mock(InputReader.class);
+ when(mockedReader.read()).thenReturn(Arrays.asList(completableFuture));
+ when(mockedReader.getRuntimeEdge()).thenReturn(edge);
+ return mockedReader;
+ }
+
private InputReader generateInputReader(final CompletableFuture completableFuture) {
final InputReader inputReader = mock(InputReader.class);
when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));