You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/09/10 03:54:11 UTC
[incubator-nemo] branch master updated: [NEMO-203] Beam SQL
Aggregation fails to match RowCoder in Combine transform (#114)
This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 84b4a0c [NEMO-203] Beam SQL Aggregation fails to match RowCoder in Combine transform (#114)
84b4a0c is described below
commit 84b4a0cb8345bea2fb637f8a955e1d5f2dcd1e0f
Author: John Yang <jo...@gmail.com>
AuthorDate: Mon Sep 10 12:54:09 2018 +0900
[NEMO-203] Beam SQL Aggregation fails to match RowCoder in Combine transform (#114)
JIRA: [NEMO-203: Beam SQL Aggregation fails to match RowCoder in Combine transform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-203)
**Major changes:**
- SimpleSumSQL: a new example that runs two BeamSQL queries
**Minor changes to note:**
- Disable NemoPipelineResult#waitUntilFinish, which hangs the job (filed NEMO-208)
- Disable local combiner optimization for BeamSQL (filed NEMO-209)
**Tests for the changes:**
- BeamSimpleSumSQLITCase#test
**Other comments:**
- N/A
Closes #114
---
.../compiler/frontend/beam/NemoPipelineResult.java | 11 ++-
.../compiler/frontend/beam/PipelineTranslator.java | 107 +++++++++++++--------
examples/beam/pom.xml | 5 +
.../apache/nemo/examples/beam/SimpleSumSQL.java | 89 +++++++++++++++++
.../nemo/examples/beam/BeamSimpleSumSQLITCase.java | 68 +++++++++++++
examples/resources/expected_output_simplesql | 1 +
6 files changed, 239 insertions(+), 42 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
index 259eadd..0228fef 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -21,7 +21,6 @@ import org.apache.beam.sdk.metrics.MetricResults;
import org.joda.time.Duration;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
/**
* Beam result.
@@ -47,12 +46,18 @@ public final class NemoPipelineResult extends ClientEndpoint implements Pipeline
@Override
public State waitUntilFinish(final Duration duration) {
- return (State) super.waitUntilJobFinish(duration.getMillis(), TimeUnit.MILLISECONDS);
+ // TODO #208: NemoPipelineResult#waitUntilFinish hangs the job, so it is disabled for now.
+ // Previous code that hangs the job (restoring it requires re-importing java.util.concurrent.TimeUnit):
+ // return (State) super.waitUntilJobFinish(duration.getMillis(), TimeUnit.MILLISECONDS);
+ throw new UnsupportedOperationException("waitUntilFinish(Duration) is not supported yet (TODO #208)");
}
@Override
public State waitUntilFinish() {
- return (State) super.waitUntilJobFinish();
+ // TODO #208: NemoPipelineResult#waitUntilFinish hangs the job, so it is disabled for now.
+ // Previous code that hangs the job:
+ // return (State) super.waitUntilJobFinish();
+ throw new UnsupportedOperationException("waitUntilFinish() is not supported yet (TODO #208)");
}
@Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index d06ee67..6ef96bf 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -35,6 +35,8 @@ import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.annotation.*;
import java.lang.reflect.InvocationTargetException;
@@ -44,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.function.BiFunction;
+import java.util.stream.Stream;
/**
* Converts DAG of Beam pipeline to Nemo IR DAG.
@@ -54,6 +57,8 @@ import java.util.function.BiFunction;
public final class PipelineTranslator
implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
+ private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
+
private static final PipelineTranslator INSTANCE = new PipelineTranslator();
private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
@@ -216,11 +221,23 @@ public final class PipelineTranslator
private static void combineTranslator(final TranslationContext ctx,
final CompositeTransformVertex transformVertex,
final PTransform<?, ?> transform) {
+ // No optimization for BeamSQL that handles Beam 'Row's (detected via the 'Value' coder of KV inputs/outputs).
+ final boolean handlesBeamRow = Stream
+ .concat(transformVertex.getNode().getInputs().values().stream(),
+ transformVertex.getNode().getOutputs().values().stream())
+ .map(pValue -> getCoder(pValue, ctx.pipeline))
+ .filter(coder -> coder instanceof KvCoder) // Skip non-KV PValues such as side-input views
+ .map(coder -> ((KvCoder<?, ?>) coder).getValueCoder().getEncodedTypeDescriptor()) // Only the 'Value' matters
+ .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
+ if (handlesBeamRow) {
+ transformVertex.getDAG().topologicalDo(ctx::translate);
+ return; // return early and give up optimization - TODO #209: Enable Local Combiner for BeamSQL
+ }
+
+ // Local combiner optimization
final List<TransformVertex> topologicalOrdering = transformVertex.getDAG().getTopologicalSort();
- final TransformVertex first = topologicalOrdering.get(0);
+ final TransformVertex groupByKeyBeamTransform = topologicalOrdering.get(0);
final TransformVertex last = topologicalOrdering.get(topologicalOrdering.size() - 1);
-
- if (first.getNode().getTransform() instanceof GroupByKey) {
+ if (groupByKeyBeamTransform.getNode().getTransform() instanceof GroupByKey) {
// Translate the given CompositeTransform under OneToOneEdge-enforced context.
final TranslationContext oneToOneEdgeContext = new TranslationContext(ctx,
OneToOneCommunicationPatternSelector.INSTANCE);
@@ -229,12 +246,12 @@ public final class PipelineTranslator
// Attempt to translate the CompositeTransform again.
// Add GroupByKey, which is the first transform in the given CompositeTransform.
// Make sure it consumes the output from the last vertex in OneToOneEdge-translated hierarchy.
- final IRVertex groupByKey = new OperatorVertex(new GroupByKeyTransform());
- ctx.addVertex(groupByKey);
+ final IRVertex groupByKeyIRVertex = new OperatorVertex(new GroupByKeyTransform());
+ ctx.addVertex(groupByKeyIRVertex);
last.getNode().getOutputs().values().forEach(outputFromCombiner
- -> ctx.addEdgeTo(groupByKey, outputFromCombiner));
- first.getNode().getOutputs().values()
- .forEach(outputFromGroupByKey -> ctx.registerMainOutputFrom(groupByKey, outputFromGroupByKey));
+ -> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
+ groupByKeyBeamTransform.getNode().getOutputs().values()
+ .forEach(outputFromGroupByKey -> ctx.registerMainOutputFrom(groupByKeyIRVertex, outputFromGroupByKey));
// Translate the remaining vertices.
topologicalOrdering.stream().skip(1).forEach(ctx::translate);
@@ -288,6 +305,48 @@ public final class PipelineTranslator
Class<? extends PTransform>[] value();
}
+ /**
+ * Get the coder of the given {@link PValue}.
+ * NOTE(review): unlike the TranslationContext variant, which yields null for other PValues, this throws.
+ *
+ * @param input {@link PValue} whose coder is requested; expected to be a {@link PCollection} or a
+ * {@link PCollectionView}
+ * @param pipeline the pipeline, used to find the producer of a {@link PCollectionView}
+ * @return the {@link Coder} of the input
+ * @throws RuntimeException if the input is neither a {@link PCollection} nor a {@link PCollectionView}
+ */
+ private static Coder<?> getCoder(final PValue input, final CompositeTransformVertex pipeline) {
+ final Coder<?> coder;
+ if (input instanceof PCollection) {
+ coder = ((PCollection) input).getCoder();
+ } else if (input instanceof PCollectionView) {
+ coder = getCoderForView((PCollectionView) input, pipeline);
+ } else {
+ throw new RuntimeException(String.format("Coder for PValue %s cannot be determined", input));
+ }
+ return coder;
+ }
+
+ /**
+ * Get appropriate coder for {@link PCollectionView}.
+ *
+ * @param view {@link PCollectionView} from the corresponding {@link View.CreatePCollectionView} transform
+ * @param pipeline the pipeline, used to look up the primitive transform that produces the view
+ * @return appropriate {@link Coder} for {@link PCollectionView}
+ * @throws RuntimeException if the producer of the view consumes no {@link PCollection}
+ * @throws UnsupportedOperationException if the view's {@link ViewFn} is not one of the handled kinds
+ */
+ private static Coder<?> getCoderForView(final PCollectionView view, final CompositeTransformVertex pipeline) {
+ final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
+ // The element coder of the first PCollection consumed by the view's producer.
+ final Coder<?> baseCoder = src.getNode().getInputs().values().stream()
+ .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
+ .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
+ .getCoder();
+ // Wrap the element coder according to how the view materializes its elements.
+ final ViewFn viewFn = view.getViewFn();
+ if (viewFn instanceof PCollectionViews.IterableViewFn) {
+ return IterableCoder.of(baseCoder);
+ } else if (viewFn instanceof PCollectionViews.ListViewFn) {
+ return ListCoder.of(baseCoder);
+ } else if (viewFn instanceof PCollectionViews.MapViewFn) {
+ final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
+ return MapCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+ } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
+ final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
+ return MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()));
+ } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
+ return baseCoder;
+ } else {
+ throw new UnsupportedOperationException(String.format("Unsupported viewFn %s", viewFn.getClass()));
+ }
+ }
+
/**
* Translation context.
*/
@@ -422,7 +481,7 @@ public final class PipelineTranslator
if (input instanceof PCollection) {
coder = ((PCollection) input).getCoder();
} else if (input instanceof PCollectionView) {
- coder = getCoderForView((PCollectionView) input);
+ coder = getCoderForView((PCollectionView) input, pipeline);
} else {
coder = null;
}
@@ -473,36 +532,6 @@ public final class PipelineTranslator
pValueToTag.put(output, tag);
pValueToProducer.put(output, irVertex);
}
-
- /**
- * Get appropriate coder for {@link PCollectionView}.
- *
- * @param view {@link PCollectionView} from the corresponding {@link View.CreatePCollectionView} transform
- * @return appropriate {@link Coder} for {@link PCollectionView}
- */
- private Coder<?> getCoderForView(final PCollectionView view) {
- final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
- final Coder<?> baseCoder = src.getNode().getInputs().values().stream()
- .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
- .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
- .getCoder();
- final ViewFn viewFn = view.getViewFn();
- if (viewFn instanceof PCollectionViews.IterableViewFn) {
- return IterableCoder.of(baseCoder);
- } else if (viewFn instanceof PCollectionViews.ListViewFn) {
- return ListCoder.of(baseCoder);
- } else if (viewFn instanceof PCollectionViews.MapViewFn) {
- final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
- return MapCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder());
- } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
- final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
- return MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()));
- } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
- return baseCoder;
- } else {
- throw new UnsupportedOperationException(String.format("Unsupported viewFn %s", viewFn.getClass()));
- }
- }
}
/**
diff --git a/examples/beam/pom.xml b/examples/beam/pom.xml
index 35f31e4..f6fc5e0 100644
--- a/examples/beam/pom.xml
+++ b/examples/beam/pom.xml
@@ -53,6 +53,11 @@ limitations under the License.
<version>${beam.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-sql</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
new file mode 100644
index 0000000..29976d2
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.*;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A simple SQL application that sums a column of in-memory rows with two chained BeamSQL queries.
+ * (Copied/Refined from the example code in the Beam repository)
+ */
+public final class SimpleSumSQL {
+ /**
+ * Private Constructor.
+ */
+ private SimpleSumSQL() {
+ }
+
+ /**
+ * Runs two BeamSQL queries over 10 in-memory rows and writes the aggregated result to a file.
+ *
+ * @param args arguments: args[0] is the output file path.
+ * @throws IllegalArgumentException if the output file path is not given.
+ */
+ public static void main(final String[] args) {
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Usage: SimpleSumSQL <outputFilePath>");
+ }
+ final String outputFilePath = args[0];
+
+ final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+ options.setRunner(NemoPipelineRunner.class);
+ options.setJobName("SimpleSumSQL");
+ final Pipeline p = Pipeline.create(options);
+
+ // define the input row format
+ final Schema schema = Schema.builder()
+ .addInt32Field("c1")
+ .addStringField("c2")
+ .addDoubleField("c3").build();
+
+ // 10 rows with 0 ~ 9.
+ final List<Row> rows = IntStream.range(0, 10)
+ .mapToObj(i -> Row.withSchema(schema).addValues(i, "row", (double) i).build())
+ .collect(Collectors.toList());
+
+ // Create a source PCollection
+ final PCollection<Row> inputTable = PBegin.in(p).apply(Create.of(rows).withCoder(schema.getRowCoder()));
+
+ // Run 2 SQL queries
+ // ==> Sum of c3 over the rows whose c1 is larger than 1, grouped by c2
+ final PCollection<Row> firstQueryResult =
+ inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
+ final PCollection<Row> secondQueryResult = PCollectionTuple
+ .of(new TupleTag<>("FIRST_QUERY_RESULT"), firstQueryResult)
+ .apply(SqlTransform.query("select c2, sum(c3) from FIRST_QUERY_RESULT group by c2"));
+
+ // Write results to a file
+ // The result should be 2 + 3 + 4 + ... + 9 = 44
+ GenericSourceSink.write(secondQueryResult.apply(MapElements.via(new SimpleFunction<Row, String>() {
+ @Override
+ public String apply(final Row input) {
+ final String c2 = input.getString(0);
+ final Double c3 = input.getDouble(1);
+ return c2 + " is " + c3;
+ }
+ })), outputFilePath);
+
+ p.run();
+ }
+}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
new file mode 100644
index 0000000..4d55ade
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test SimpleSumSQL program with JobLauncher.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class BeamSimpleSumSQLITCase {
+ private static final int TIMEOUT = 180000; // in milliseconds, used as the JUnit @Test timeout
+ private static ArgBuilder builder;
+ private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+
+ private static final String outputFileName = "test_output_simplesql";
+ private static final String expectedOutputFileName = "expected_output_simplesql";
+ private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
+ private static final String outputFilePath = fileBasePath + outputFileName;
+
+ @Before
+ public void setUp() throws Exception {
+ // SimpleSumSQL takes the output file path as its only user argument.
+ builder = new ArgBuilder()
+ .addUserMain(SimpleSumSQL.class.getCanonicalName())
+ .addUserArgs(outputFilePath)
+ .addResourceJson(executorResourceFileName);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // Compare the produced output with the expected file, and always delete the output afterwards.
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
+ }
+
+ @Test (timeout = TIMEOUT)
+ public void test() throws Exception {
+ // Run SimpleSumSQL through JobLauncher with the DefaultPolicyParallelismFive optimization policy.
+ JobLauncher.main(builder
+ .addJobId(BeamSimpleSumSQLITCase.class.getSimpleName())
+ .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
+}
diff --git a/examples/resources/expected_output_simplesql b/examples/resources/expected_output_simplesql
new file mode 100644
index 0000000..6b9caf3
--- /dev/null
+++ b/examples/resources/expected_output_simplesql
@@ -0,0 +1 @@
+row is 44.0