You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/03/15 22:05:58 UTC

[beam] branch master updated: [BEAM-6814] toListRow in BeamEnumerableConverter.

This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b2fb1c  [BEAM-6814] toListRow in BeamEnumerableConverter.
     new 652adec  Merge pull request #8044 from amaliujia/rw_add_list_row
5b2fb1c is described below

commit 5b2fb1c854706369472a7afbde2e8bcabd4df659
Author: amaliujia <am...@163.com>
AuthorDate: Tue Mar 12 15:51:17 2019 -0700

    [BEAM-6814] toListRow in BeamEnumerableConverter.
---
 .../sql/impl/rel/BeamEnumerableConverter.java      | 63 ++++++++++++++++------
 .../sql/impl/rel/BeamEnumerableConverterTest.java  | 17 ++++++
 2 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 3fb9a67..755d589 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -129,6 +129,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
     return options;
   }
 
+  static List<Row> toRowList(PipelineOptions options, BeamRelNode node) {
+    if (node instanceof BeamIOSinkRel) { // sink nodes are rejected up front
+      throw new UnsupportedOperationException("Does not support BeamIOSinkRel in toRowList.");
+    } else if (isLimitQuery(node)) { // so is anything isLimitQuery flags as a LIMIT query
+      throw new UnsupportedOperationException("Does not support queries with LIMIT in toRowList.");
+    }
+    return collectRowList(options, node); // everything else is run via collectRowList
+  }
+
   static Enumerable<Object> toEnumerable(PipelineOptions options, BeamRelNode node) {
     if (node instanceof BeamIOSinkRel) {
       return count(options, node);
@@ -143,7 +152,7 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
       PipelineOptions options,
       BeamRelNode node,
       DoFn<Row, Void> doFn,
-      Queue<Object[]> values,
+      Queue<Row> values,
       int limitCount) {
     options.as(DirectOptions.class).setBlockOnRun(false);
     Pipeline pipeline = Pipeline.create(options);
@@ -174,9 +183,36 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
     return result;
   }
 
+  /** Builds a pipeline for {@code node}, attaches a {@link Collector}, and runs it to the end. */
+  private static void runCollector(PipelineOptions options, BeamRelNode node) {
+    Pipeline collectorPipeline = Pipeline.create(options);
+    BeamSqlRelUtils.toPCollection(collectorPipeline, node).apply(ParDo.of(new Collector()));
+    // Wait on the run so the Collector's output is only read after the pipeline has finished.
+    collectorPipeline.run().waitUntilFinish();
+  }
+
+  private static List<Row> collectRowList(PipelineOptions options, BeamRelNode node) {
+    long id = options.getOptionsId();
+    Queue<Row> values = new ConcurrentLinkedQueue<>();
+    // Rows come back through the static Collector.globalValues map: direct runner only.
+    checkArgument(
+        options
+            .getRunner()
+            .getCanonicalName()
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "collectRowList is only available in direct runner.");
+    Collector.globalValues.put(id, values);
+    try {
+      runCollector(options, node);
+    } finally {
+      Collector.globalValues.remove(id); // always unregister, even if the pipeline run throws
+    }
+    return values.stream().collect(Collectors.toList());
+  }
+
   private static Enumerable<Object> collect(PipelineOptions options, BeamRelNode node) {
     long id = options.getOptionsId();
-    Queue<Object[]> values = new ConcurrentLinkedQueue<>();
+    Queue<Row> values = new ConcurrentLinkedQueue<>();
 
     checkArgument(
         options
@@ -187,20 +223,16 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
 
     Collector.globalValues.put(id, values);
 
-    Pipeline pipeline = Pipeline.create(options);
-    PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
-    resultCollection.apply(ParDo.of(new Collector()));
-    PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    runCollector(options, node);
 
     Collector.globalValues.remove(id);
 
-    return Linq4j.asEnumerable(unboxValues(values));
+    return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values));
   }
 
   private static Enumerable<Object> limitCollect(PipelineOptions options, BeamRelNode node) {
     long id = options.getOptionsId();
-    ConcurrentLinkedQueue<Object[]> values = new ConcurrentLinkedQueue<>();
+    ConcurrentLinkedQueue<Row> values = new ConcurrentLinkedQueue<>();
 
     checkArgument(
         options
@@ -220,15 +252,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
       values.remove();
     }
 
-    return Linq4j.asEnumerable(unboxValues(values));
+    return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values));
   }
 
   private static class Collector extends DoFn<Row, Void> {
 
     // This will only work on the direct runner.
-    private static final Map<Long, Queue<Object[]>> globalValues = new ConcurrentHashMap<>();
+    private static final Map<Long, Queue<Row>> globalValues = new ConcurrentHashMap<>();
 
-    @Nullable private volatile Queue<Object[]> values;
+    @Nullable private volatile Queue<Row> values;
 
     @StartBundle
     public void startBundle(StartBundleContext context) {
@@ -238,14 +270,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
 
     @ProcessElement
     public void processElement(ProcessContext context) {
-      values.add(rowToAvatica(context.element()));
+      values.add(context.element());
     }
   }
 
-  private static List<Object> unboxValues(Queue<Object[]> values) {
+  private static List<Object> rowToAvaticaAndUnboxValues(Queue<Row> values) {
     return values.stream()
         .map(
-            objects -> {
+            row -> {
+              Object[] objects = rowToAvatica(row);
               if (objects.length == 1) {
                 // if objects.length == 1, that means input Row contains only 1 column/element,
                 // then an Object instead of Object[] should be returned because of
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
index 5011aa3..76e952e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
 import java.math.BigDecimal;
+import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -97,6 +98,22 @@ public class BeamEnumerableConverterTest {
   }
 
   @Test
+  public void testToListRow_collectMultiple() {
+    Schema schema = Schema.builder().addInt64Field("id").addInt64Field("otherid").build();
+    RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);
+    ImmutableList<ImmutableList<RexLiteral>> tuples =
+        ImmutableList.of(
+            ImmutableList.of(
+                rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+                rexBuilder.makeBigintLiteral(BigDecimal.ONE)));
+    BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null);
+    // A single two-column tuple should be collected as exactly one equivalent Row.
+    List<Row> rowList = BeamEnumerableConverter.toRowList(options, node);
+    assertEquals(1, rowList.size());
+    assertEquals(Row.withSchema(schema).addValues(0L, 1L).build(), rowList.get(0));
+  }
+
+  @Test
   public void testToEnumerable_collectNullValue() {
     Schema schema = Schema.builder().addNullableField("id", FieldType.INT64).build();
     RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);