Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/09/10 03:54:11 UTC

[GitHub] seojangho closed pull request #114: [NEMO-203] Beam SQL Aggregation fails to match RowCoder in Combine transform

URL: https://github.com/apache/incubator-nemo/pull/114

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
index 259eadd93..0228fefdf 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -21,7 +21,6 @@
 import org.joda.time.Duration;
 
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Beam result.
@@ -47,12 +46,18 @@ public State cancel() throws IOException {
 
   @Override
   public State waitUntilFinish(final Duration duration) {
-    return (State) super.waitUntilJobFinish(duration.getMillis(), TimeUnit.MILLISECONDS);
+    throw new UnsupportedOperationException();
+    // TODO #208: NemoPipelineResult#waitUntilFinish hangs
+    // Previous code that hangs the job:
+    // return (State) super.waitUntilJobFinish(duration.getMillis(), TimeUnit.MILLISECONDS);
   }
 
   @Override
   public State waitUntilFinish() {
-    return (State) super.waitUntilJobFinish();
+    throw new UnsupportedOperationException();
+    // TODO #208: NemoPipelineResult#waitUntilFinish hangs
+    // Previous code that hangs the job:
+    // return (State) super.waitUntilJobFinish();
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index d06ee6759..6ef96bf5b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -35,6 +35,8 @@
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.annotation.*;
 import java.lang.reflect.InvocationTargetException;
@@ -44,6 +46,7 @@
 import java.util.Map;
 import java.util.Stack;
 import java.util.function.BiFunction;
+import java.util.stream.Stream;
 
 /**
  * Converts DAG of Beam pipeline to Nemo IR DAG.
@@ -54,6 +57,8 @@
 public final class PipelineTranslator
     implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
+
   private static final PipelineTranslator INSTANCE = new PipelineTranslator();
 
   private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
@@ -216,11 +221,23 @@ private static void topologicalTranslator(final TranslationContext ctx,
   private static void combineTranslator(final TranslationContext ctx,
                                         final CompositeTransformVertex transformVertex,
                                         final PTransform<?, ?> transform) {
+    // No optimization for Beam SQL transforms that handle Beam Rows.
+    final boolean handlesBeamRow = Stream
+      .concat(transformVertex.getNode().getInputs().values().stream(),
+        transformVertex.getNode().getOutputs().values().stream())
+      .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV
+      .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV
+      .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
+    if (handlesBeamRow) {
+      transformVertex.getDAG().topologicalDo(ctx::translate);
+      return; // return early and give up optimization - TODO #209: Enable Local Combiner for BeamSQL
+    }
+
+    // Local combiner optimization
     final List<TransformVertex> topologicalOrdering = transformVertex.getDAG().getTopologicalSort();
-    final TransformVertex first = topologicalOrdering.get(0);
+    final TransformVertex groupByKeyBeamTransform = topologicalOrdering.get(0);
     final TransformVertex last = topologicalOrdering.get(topologicalOrdering.size() - 1);
-
-    if (first.getNode().getTransform() instanceof GroupByKey) {
+    if (groupByKeyBeamTransform.getNode().getTransform() instanceof GroupByKey) {
       // Translate the given CompositeTransform under OneToOneEdge-enforced context.
       final TranslationContext oneToOneEdgeContext = new TranslationContext(ctx,
           OneToOneCommunicationPatternSelector.INSTANCE);
@@ -229,12 +246,12 @@ private static void combineTranslator(final TranslationContext ctx,
       // Attempt to translate the CompositeTransform again.
       // Add GroupByKey, which is the first transform in the given CompositeTransform.
       // Make sure it consumes the output from the last vertex in OneToOneEdge-translated hierarchy.
-      final IRVertex groupByKey = new OperatorVertex(new GroupByKeyTransform());
-      ctx.addVertex(groupByKey);
+      final IRVertex groupByKeyIRVertex = new OperatorVertex(new GroupByKeyTransform());
+      ctx.addVertex(groupByKeyIRVertex);
       last.getNode().getOutputs().values().forEach(outputFromCombiner
-          -> ctx.addEdgeTo(groupByKey, outputFromCombiner));
-      first.getNode().getOutputs().values()
-          .forEach(outputFromGroupByKey -> ctx.registerMainOutputFrom(groupByKey, outputFromGroupByKey));
+          -> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
+      groupByKeyBeamTransform.getNode().getOutputs().values()
+          .forEach(outputFromGroupByKey -> ctx.registerMainOutputFrom(groupByKeyIRVertex, outputFromGroupByKey));
 
       // Translate the remaining vertices.
       topologicalOrdering.stream().skip(1).forEach(ctx::translate);
@@ -288,6 +305,48 @@ private static void loopTranslator(final TranslationContext ctx,
     Class<? extends PTransform>[] value();
   }
 
+  private static Coder<?> getCoder(final PValue input, final CompositeTransformVertex pipeline) {
+    final Coder<?> coder;
+    if (input instanceof PCollection) {
+      coder = ((PCollection) input).getCoder();
+    } else if (input instanceof PCollectionView) {
+      coder = getCoderForView((PCollectionView) input, pipeline);
+    } else {
+      throw new RuntimeException(String.format("Coder for PValue %s cannot be determined", input));
+    }
+    return coder;
+  }
+
+  /**
+   * Get appropriate coder for {@link PCollectionView}.
+   *
+   * @param view {@link PCollectionView} from the corresponding {@link View.CreatePCollectionView} transform
+   * @param pipeline the root {@link CompositeTransformVertex} of the pipeline, used to find the producer of the view
+   * @return appropriate {@link Coder} for {@link PCollectionView}
+   */
+  private static Coder<?> getCoderForView(final PCollectionView view, final CompositeTransformVertex pipeline) {
+    final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
+    final Coder<?> baseCoder = src.getNode().getInputs().values().stream()
+      .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
+      .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
+      .getCoder();
+    final ViewFn viewFn = view.getViewFn();
+    if (viewFn instanceof PCollectionViews.IterableViewFn) {
+      return IterableCoder.of(baseCoder);
+    } else if (viewFn instanceof PCollectionViews.ListViewFn) {
+      return ListCoder.of(baseCoder);
+    } else if (viewFn instanceof PCollectionViews.MapViewFn) {
+      final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
+      return MapCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+    } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
+      final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
+      return MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()));
+    } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
+      return baseCoder;
+    } else {
+      throw new UnsupportedOperationException(String.format("Unsupported viewFn %s", viewFn.getClass()));
+    }
+  }
+
   /**
    * Translation context.
    */
@@ -422,7 +481,7 @@ private void addEdgeTo(final IRVertex dst, final PValue input) {
       if (input instanceof PCollection) {
         coder = ((PCollection) input).getCoder();
       } else if (input instanceof PCollectionView) {
-        coder = getCoderForView((PCollectionView) input);
+        coder = getCoderForView((PCollectionView) input, pipeline);
       } else {
         coder = null;
       }
@@ -473,36 +532,6 @@ private void registerAdditionalOutputFrom(final IRVertex irVertex, final PValue
       pValueToTag.put(output, tag);
       pValueToProducer.put(output, irVertex);
     }
-
-    /**
-     * Get appropriate coder for {@link PCollectionView}.
-     *
-     * @param view {@link PCollectionView} from the corresponding {@link View.CreatePCollectionView} transform
-     * @return appropriate {@link Coder} for {@link PCollectionView}
-     */
-    private Coder<?> getCoderForView(final PCollectionView view) {
-      final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
-      final Coder<?> baseCoder = src.getNode().getInputs().values().stream()
-          .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
-          .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
-          .getCoder();
-      final ViewFn viewFn = view.getViewFn();
-      if (viewFn instanceof PCollectionViews.IterableViewFn) {
-        return IterableCoder.of(baseCoder);
-      } else if (viewFn instanceof PCollectionViews.ListViewFn) {
-        return ListCoder.of(baseCoder);
-      } else if (viewFn instanceof PCollectionViews.MapViewFn) {
-        final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
-        return MapCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder());
-      } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
-        final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
-        return MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()));
-      } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
-        return baseCoder;
-      } else {
-        throw new UnsupportedOperationException(String.format("Unsupported viewFn %s", viewFn.getClass()));
-      }
-    }
   }
 
   /**
diff --git a/examples/beam/pom.xml b/examples/beam/pom.xml
index 35f31e421..f6fc5e08d 100644
--- a/examples/beam/pom.xml
+++ b/examples/beam/pom.xml
@@ -52,6 +52,11 @@ limitations under the License.
             <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
             <version>${beam.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-sdks-java-extensions-sql</artifactId>
+          <version>${beam.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
new file mode 100644
index 000000000..29976d234
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.*;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A simple SQL application.
+ * (Copied/Refined from the example code in the Beam repository)
+ */
+public final class SimpleSumSQL {
+  /**
+   * Private Constructor.
+   */
+  private SimpleSumSQL() {
+  }
+
+  /**
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String outputFilePath = args[0];
+
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("SimpleSumSQL");
+    final Pipeline p = Pipeline.create(options);
+
+    // define the input row format
+    final Schema schema = Schema.builder()
+      .addInt32Field("c1")
+      .addStringField("c2")
+      .addDoubleField("c3").build();
+
+    // 10 rows with 0 ~ 9.
+    final List<Row> rows = IntStream.range(0, 10)
+      .mapToObj(i -> Row.withSchema(schema).addValues(i, "row", (double) i).build())
+      .collect(Collectors.toList());
+
+    // Create a source PCollection
+    final PCollection<Row> inputTable = PBegin.in(p).apply(Create.of(rows).withCoder(schema.getRowCoder()));
+
+    // Run 2 SQL queries:
+    // filter the rows with c1 > 1, then sum their c3 values grouped by c2
+    final PCollection<Row> firstQueryResult =
+      inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
+    final PCollection<Row> secondQueryResult = PCollectionTuple
+      .of(new TupleTag<>("FIRST_QUERY_RESULT"), firstQueryResult)
+      .apply(SqlTransform.query("select c2, sum(c3) from FIRST_QUERY_RESULT group by c2"));
+
+    // Write results to a file
+    // The result should be 2 + 3 + 4 + ... + 9 = 44
+    GenericSourceSink.write(secondQueryResult.apply(MapElements.via(new SimpleFunction<Row, String>() {
+      @Override
+      public String apply(final Row input) {
+        final String c2 = input.getString(0);
+        final Double c3 = input.getDouble(1);
+        return c2 + " is " + c3;
+      }
+    })), outputFilePath);
+
+    p.run();
+  }
+}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
new file mode 100644
index 000000000..4d55ade5e
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test SimpleSumSQL program with JobLauncher.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class BeamSimpleSumSQLITCase {
+  private static final int TIMEOUT = 180000;
+  private static ArgBuilder builder;
+  private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+
+  private static final String outputFileName = "test_output_simplesql";
+  private static final String expectedOutputFileName = "expected_output_simplesql";
+  private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
+  private static final String outputFilePath = fileBasePath + outputFileName;
+
+  @Before
+  public void setUp() throws Exception {
+    builder = new ArgBuilder()
+        .addUserMain(SimpleSumSQL.class.getCanonicalName())
+        .addUserArgs(outputFilePath)
+        .addResourceJson(executorResourceFileName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+
+  @Test (timeout = TIMEOUT)
+  public void test() throws Exception {
+    JobLauncher.main(builder
+        .addJobId(BeamSimpleSumSQLITCase.class.getSimpleName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+        .build());
+  }
+}
diff --git a/examples/resources/expected_output_simplesql b/examples/resources/expected_output_simplesql
new file mode 100644
index 000000000..6b9caf3e9
--- /dev/null
+++ b/examples/resources/expected_output_simplesql
@@ -0,0 +1 @@
+row is 44.0
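
For reference, the heart of the fix above is the Row check at the top of
combineTranslator: before attempting the local-combiner optimization, the
translator inspects the coders of the composite's inputs and outputs and
bails out when any KV value is a Beam Row. A minimal standalone sketch of
that check (the class and method names here are illustrative, not part of
the PR) would look like:

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.util.Collection;

final class RowDetectionSketch {
  private RowDetectionSketch() {
  }

  // Returns true if any KV value coder among the given coders encodes a Beam Row.
  static boolean handlesBeamRow(final Collection<Coder<?>> combineInputOutputCoders) {
    return combineInputOutputCoders.stream()
        .map(coder -> (KvCoder<?, ?>) coder)                       // Combine I/O is KV-typed
        .map(kv -> kv.getValueCoder().getEncodedTypeDescriptor())  // look at the V in KV<K, V>
        .anyMatch(TypeDescriptor.of(Row.class)::equals);           // any Row value => no local combiner
  }
}

With this guard, Beam SQL aggregations over Rows take the plain translation
path, while ordinary KV combines keep the local-combiner rewrite;
TODO #209 tracks re-enabling the optimization for Rows.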


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services