Posted to commits@nemo.apache.org by ja...@apache.org on 2018/09/10 03:54:11 UTC

[incubator-nemo] branch master updated: [NEMO-203] Beam SQL Aggregation fails to match RowCoder in Combine transform (#114)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 84b4a0c  [NEMO-203] Beam SQL Aggregation fails to match RowCoder in Combine transform (#114)
84b4a0c is described below

commit 84b4a0cb8345bea2fb637f8a955e1d5f2dcd1e0f
Author: John Yang <jo...@gmail.com>
AuthorDate: Mon Sep 10 12:54:09 2018 +0900

    [NEMO-203] Beam SQL Aggregation fails to match RowCoder in Combine transform (#114)
    
    JIRA: [NEMO-203: Beam SQL Aggregation fails to match RowCoder in Combine transform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-203)
    
    **Major changes:**
    - SimpleSumSQL: a new example that runs two BeamSQL queries
    
    **Minor changes to note:**
    - Disable NemoPipelineResult#waitUntilFinish, which hangs the job (filed NEMO-208)
    - Disable local combiner optimization for BeamSQL (filed NEMO-209)
    
    **Tests for the changes:**
    - BeamSimpleSumSQLITCase#test
    
    **Other comments:**
    - N/A
    
    Closes #GITHUB_PR_NUMBER
---
 .../compiler/frontend/beam/NemoPipelineResult.java |  11 ++-
 .../compiler/frontend/beam/PipelineTranslator.java | 107 +++++++++++++--------
 examples/beam/pom.xml                              |   5 +
 .../apache/nemo/examples/beam/SimpleSumSQL.java    |  89 +++++++++++++++++
 .../nemo/examples/beam/BeamSimpleSumSQLITCase.java |  68 +++++++++++++
 examples/resources/expected_output_simplesql       |   1 +
 6 files changed, 239 insertions(+), 42 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
index 259eadd..0228fef 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -21,7 +21,6 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.joda.time.Duration;
 
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Beam result.
@@ -47,12 +46,18 @@ public final class NemoPipelineResult extends ClientEndpoint implements Pipeline
 
   @Override
   public State waitUntilFinish(final Duration duration) {
-    return (State) super.waitUntilJobFinish(duration.getMillis(), TimeUnit.MILLISECONDS);
+    throw new UnsupportedOperationException();
+    // TODO #208: NemoPipelineResult#waitUntilFinish hangs
+    // Previous code that hangs the job:
+    // return (State) super.waitUntilJobFinish(duration.getMillis(), TimeUnit.MILLISECONDS);
   }
 
   @Override
   public State waitUntilFinish() {
-    return (State) super.waitUntilJobFinish();
+    throw new UnsupportedOperationException();
+    // TODO #208: NemoPipelineResult#waitUntilFinish hangs
+    // Previous code that hangs the job:
+    // return (State) super.waitUntilJobFinish();
   }
 
   @Override
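
Since both waitUntilFinish overloads now throw UnsupportedOperationException until NEMO-208 is resolved, user code should avoid blocking on job completion. A minimal, hypothetical caller-side sketch (not part of this commit; 'pipeline' is an assumed Beam Pipeline variable):

    // Hypothetical caller-side guard, assuming the Nemo runner's
    // PipelineResult keeps throwing until NEMO-208 is fixed.
    final PipelineResult result = pipeline.run();
    try {
      result.waitUntilFinish();
    } catch (final UnsupportedOperationException e) {
      // NEMO-208: waitUntilFinish hangs on Nemo and is disabled for now,
      // so do not block on job completion here.
    }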
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index d06ee67..6ef96bf 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -35,6 +35,8 @@ import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.annotation.*;
 import java.lang.reflect.InvocationTargetException;
@@ -44,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 import java.util.function.BiFunction;
+import java.util.stream.Stream;
 
 /**
  * Converts DAG of Beam pipeline to Nemo IR DAG.
@@ -54,6 +57,8 @@ import java.util.function.BiFunction;
 public final class PipelineTranslator
     implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
+
   private static final PipelineTranslator INSTANCE = new PipelineTranslator();
 
   private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
@@ -216,11 +221,23 @@ public final class PipelineTranslator
   private static void combineTranslator(final TranslationContext ctx,
                                         final CompositeTransformVertex transformVertex,
                                         final PTransform<?, ?> transform) {
+    // Skip the local combiner optimization for combines that handle Beam 'Row's (i.e., BeamSQL).
+    final boolean handlesBeamRow = Stream
+      .concat(transformVertex.getNode().getInputs().values().stream(),
+        transformVertex.getNode().getOutputs().values().stream())
+      .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV
+      .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV
+      .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
+    if (handlesBeamRow) {
+      transformVertex.getDAG().topologicalDo(ctx::translate);
+      return; // return early and give up optimization - TODO #209: Enable Local Combiner for BeamSQL
+    }
+
+    // Local combiner optimization
     final List<TransformVertex> topologicalOrdering = transformVertex.getDAG().getTopologicalSort();
-    final TransformVertex first = topologicalOrdering.get(0);
+    final TransformVertex groupByKeyBeamTransform = topologicalOrdering.get(0);
     final TransformVertex last = topologicalOrdering.get(topologicalOrdering.size() - 1);
-
-    if (first.getNode().getTransform() instanceof GroupByKey) {
+    if (groupByKeyBeamTransform.getNode().getTransform() instanceof GroupByKey) {
       // Translate the given CompositeTransform under OneToOneEdge-enforced context.
       final TranslationContext oneToOneEdgeContext = new TranslationContext(ctx,
           OneToOneCommunicationPatternSelector.INSTANCE);
@@ -229,12 +246,12 @@ public final class PipelineTranslator
       // Attempt to translate the CompositeTransform again.
       // Add GroupByKey, which is the first transform in the given CompositeTransform.
       // Make sure it consumes the output from the last vertex in OneToOneEdge-translated hierarchy.
-      final IRVertex groupByKey = new OperatorVertex(new GroupByKeyTransform());
-      ctx.addVertex(groupByKey);
+      final IRVertex groupByKeyIRVertex = new OperatorVertex(new GroupByKeyTransform());
+      ctx.addVertex(groupByKeyIRVertex);
       last.getNode().getOutputs().values().forEach(outputFromCombiner
-          -> ctx.addEdgeTo(groupByKey, outputFromCombiner));
-      first.getNode().getOutputs().values()
-          .forEach(outputFromGroupByKey -> ctx.registerMainOutputFrom(groupByKey, outputFromGroupByKey));
+          -> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
+      groupByKeyBeamTransform.getNode().getOutputs().values()
+          .forEach(outputFromGroupByKey -> ctx.registerMainOutputFrom(groupByKeyIRVertex, outputFromGroupByKey));
 
       // Translate the remaining vertices.
       topologicalOrdering.stream().skip(1).forEach(ctx::translate);
@@ -288,6 +305,48 @@ public final class PipelineTranslator
     Class<? extends PTransform>[] value();
   }
 
+  /**
+   * @param input the {@link PValue} to get the coder of
+   * @param pipeline the enclosing {@link CompositeTransformVertex}, for looking up coders of views
+   * @return the {@link Coder} of the given {@link PValue}
+   */
+  private static Coder<?> getCoder(final PValue input, final CompositeTransformVertex pipeline) {
+    final Coder<?> coder;
+    if (input instanceof PCollection) {
+      coder = ((PCollection) input).getCoder();
+    } else if (input instanceof PCollectionView) {
+      coder = getCoderForView((PCollectionView) input, pipeline);
+    } else {
+      throw new RuntimeException(String.format("Coder for PValue %s cannot be determined", input));
+    }
+    return coder;
+  }
+
+  /**
+   * Get appropriate coder for {@link PCollectionView}.
+   *
+   * @param view {@link PCollectionView} from the corresponding {@link View.CreatePCollectionView} transform
+   * @param pipeline the enclosing {@link CompositeTransformVertex}, for looking up the producer of the view
+   * @return appropriate {@link Coder} for {@link PCollectionView}
+   */
+  private static Coder<?> getCoderForView(final PCollectionView view, final CompositeTransformVertex pipeline) {
+    final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
+    final Coder<?> baseCoder = src.getNode().getInputs().values().stream()
+      .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
+      .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
+      .getCoder();
+    final ViewFn viewFn = view.getViewFn();
+    if (viewFn instanceof PCollectionViews.IterableViewFn) {
+      return IterableCoder.of(baseCoder);
+    } else if (viewFn instanceof PCollectionViews.ListViewFn) {
+      return ListCoder.of(baseCoder);
+    } else if (viewFn instanceof PCollectionViews.MapViewFn) {
+      final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
+      return MapCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+    } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
+      final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
+      return MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()));
+    } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
+      return baseCoder;
+    } else {
+      throw new UnsupportedOperationException(String.format("Unsupported viewFn %s", viewFn.getClass()));
+    }
+  }
+
   /**
    * Translation context.
    */
@@ -422,7 +481,7 @@ public final class PipelineTranslator
       if (input instanceof PCollection) {
         coder = ((PCollection) input).getCoder();
       } else if (input instanceof PCollectionView) {
-        coder = getCoderForView((PCollectionView) input);
+        coder = getCoderForView((PCollectionView) input, pipeline);
       } else {
         coder = null;
       }
@@ -473,36 +532,6 @@ public final class PipelineTranslator
       pValueToTag.put(output, tag);
       pValueToProducer.put(output, irVertex);
     }
-
-    /**
-     * Get appropriate coder for {@link PCollectionView}.
-     *
-     * @param view {@link PCollectionView} from the corresponding {@link View.CreatePCollectionView} transform
-     * @return appropriate {@link Coder} for {@link PCollectionView}
-     */
-    private Coder<?> getCoderForView(final PCollectionView view) {
-      final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
-      final Coder<?> baseCoder = src.getNode().getInputs().values().stream()
-          .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
-          .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
-          .getCoder();
-      final ViewFn viewFn = view.getViewFn();
-      if (viewFn instanceof PCollectionViews.IterableViewFn) {
-        return IterableCoder.of(baseCoder);
-      } else if (viewFn instanceof PCollectionViews.ListViewFn) {
-        return ListCoder.of(baseCoder);
-      } else if (viewFn instanceof PCollectionViews.MapViewFn) {
-        final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
-        return MapCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder());
-      } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
-        final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
-        return MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()));
-      } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
-        return baseCoder;
-      } else {
-        throw new UnsupportedOperationException(String.format("Unsupported viewFn %s", viewFn.getClass()));
-      }
-    }
   }
 
   /**
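
For readers of the combineTranslator change above: the Row detection hinges on the value coder of the combine's KV-typed inputs and outputs. A minimal sketch of that check in isolation (assuming 'coder' is the KvCoder of one combine input or output; not part of this commit):

    // Combine inputs and outputs are KV-coded, so the value coder tells us
    // whether the combine handles Beam Rows (i.e., comes from BeamSQL).
    final KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) coder;
    final boolean handlesBeamRow = TypeDescriptor.of(Row.class)
        .equals(kvCoder.getValueCoder().getEncodedTypeDescriptor());
    if (handlesBeamRow) {
      // NEMO-203: local combining of Rows fails to match RowCoder, so the
      // translator gives up the optimization here (to be re-enabled in NEMO-209).
    }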
diff --git a/examples/beam/pom.xml b/examples/beam/pom.xml
index 35f31e4..f6fc5e0 100644
--- a/examples/beam/pom.xml
+++ b/examples/beam/pom.xml
@@ -53,6 +53,11 @@ limitations under the License.
             <version>${beam.version}</version>
         </dependency>
         <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-sdks-java-extensions-sql</artifactId>
+          <version>${beam.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>${hadoop.version}</version>
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
new file mode 100644
index 0000000..29976d2
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.*;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A simple SQL application.
+ * (Copied and refined from the example code in the Beam repository)
+ */
+public final class SimpleSumSQL {
+  /**
+   * Private Constructor.
+   */
+  private SimpleSumSQL() {
+  }
+
+  /**
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String outputFilePath = args[0];
+
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("SimpleSumSQL");
+    final Pipeline p = Pipeline.create(options);
+
+    // define the input row format
+    final Schema schema = Schema.builder()
+      .addInt32Field("c1")
+      .addStringField("c2")
+      .addDoubleField("c3").build();
+
+    // 10 rows with 0 ~ 9.
+    final List<Row> rows = IntStream.range(0, 10)
+      .mapToObj(i -> Row.withSchema(schema).addValues(i, "row", (double) i).build())
+      .collect(Collectors.toList());
+
+    // Create a source PCollection
+    final PCollection<Row> inputTable = PBegin.in(p).apply(Create.of(rows).withCoder(schema.getRowCoder()));
+
+    // Run 2 SQL queries: the first filters rows with c1 > 1,
+    // the second sums c3 (grouped by c2) over the filtered rows.
+    final PCollection<Row> firstQueryResult =
+      inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
+    final PCollection<Row> secondQueryResult = PCollectionTuple
+      .of(new TupleTag<>("FIRST_QUERY_RESULT"), firstQueryResult)
+      .apply(SqlTransform.query("select c2, sum(c3) from FIRST_QUERY_RESULT group by c2"));
+
+    // Write results to a file
+    // The result should be 2 + 3 + 4 + ... + 9 = 44
+    GenericSourceSink.write(secondQueryResult.apply(MapElements.via(new SimpleFunction<Row, String>() {
+      @Override
+      public String apply(final Row input) {
+        final String c2 = input.getString(0);
+        final Double c3 = input.getDouble(1);
+        return c2 + " is " + c3;
+      }
+    })), outputFilePath);
+
+    p.run();
+  }
+}
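
As a quick sanity check on the expected output file below ("row is 44.0"), the same arithmetic in plain Java (illustrative only, not part of this commit):

    // Rows carry c3 = 0.0 .. 9.0; the first query keeps c1 > 1 and the
    // second sums c3, so the total is 2.0 + 3.0 + ... + 9.0 = 44.0.
    final double expected = java.util.stream.IntStream.range(0, 10)
        .filter(i -> i > 1)
        .mapToDouble(i -> (double) i)
        .sum(); // 44.0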
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
new file mode 100644
index 0000000..4d55ade
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test SimpleSumSQL program with JobLauncher.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class BeamSimpleSumSQLITCase {
+  private static final int TIMEOUT = 180000;
+  private static ArgBuilder builder;
+  private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+
+  private static final String outputFileName = "test_output_simplesql";
+  private static final String expectedOutputFileName = "expected_output_simplesql";
+  private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
+  private static final String outputFilePath = fileBasePath + outputFileName;
+
+  @Before
+  public void setUp() throws Exception {
+    builder = new ArgBuilder()
+        .addUserMain(SimpleSumSQL.class.getCanonicalName())
+        .addUserArgs(outputFilePath)
+        .addResourceJson(executorResourceFileName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+
+  @Test (timeout = TIMEOUT)
+  public void test() throws Exception {
+    JobLauncher.main(builder
+        .addJobId(BeamSimpleSumSQLITCase.class.getSimpleName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+        .build());
+  }
+}
diff --git a/examples/resources/expected_output_simplesql b/examples/resources/expected_output_simplesql
new file mode 100644
index 0000000..6b9caf3
--- /dev/null
+++ b/examples/resources/expected_output_simplesql
@@ -0,0 +1 @@
+row is 44.0