You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/03/15 22:05:58 UTC
[beam] branch master updated: [BEAM-6814] toListRow in BeamEnumerableConverter.
This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5b2fb1c [BEAM-6814] toListRow in BeamEnumerableConverter.
new 652adec Merge pull request #8044 from amaliujia/rw_add_list_row
5b2fb1c is described below
commit 5b2fb1c854706369472a7afbde2e8bcabd4df659
Author: amaliujia <am...@163.com>
AuthorDate: Tue Mar 12 15:51:17 2019 -0700
[BEAM-6814] toListRow in BeamEnumerableConverter.
---
.../sql/impl/rel/BeamEnumerableConverter.java | 63 ++++++++++++++++------
.../sql/impl/rel/BeamEnumerableConverterTest.java | 17 ++++++
2 files changed, 65 insertions(+), 15 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 3fb9a67..755d589 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -129,6 +129,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
return options;
}
+ static List<Row> toRowList(PipelineOptions options, BeamRelNode node) {
+ if (node instanceof BeamIOSinkRel) {
+ throw new UnsupportedOperationException("Does not support BeamIOSinkRel in toRowList.");
+ } else if (isLimitQuery(node)) {
+ throw new UnsupportedOperationException("Does not support queries with LIMIT in toRowList.");
+ }
+ return collectRowList(options, node);
+ }
+
static Enumerable<Object> toEnumerable(PipelineOptions options, BeamRelNode node) {
if (node instanceof BeamIOSinkRel) {
return count(options, node);
@@ -143,7 +152,7 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
PipelineOptions options,
BeamRelNode node,
DoFn<Row, Void> doFn,
- Queue<Object[]> values,
+ Queue<Row> values,
int limitCount) {
options.as(DirectOptions.class).setBlockOnRun(false);
Pipeline pipeline = Pipeline.create(options);
@@ -174,9 +183,36 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
return result;
}
+ private static void runCollector(PipelineOptions options, BeamRelNode node) {
+ Pipeline pipeline = Pipeline.create(options);
+ PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
+ resultCollection.apply(ParDo.of(new Collector()));
+ PipelineResult result = pipeline.run();
+ result.waitUntilFinish();
+ }
+
+ private static List<Row> collectRowList(PipelineOptions options, BeamRelNode node) {
+ long id = options.getOptionsId();
+ Queue<Row> values = new ConcurrentLinkedQueue<>();
+
+ checkArgument(
+ options
+ .getRunner()
+ .getCanonicalName()
+ .equals("org.apache.beam.runners.direct.DirectRunner"),
+ "collectRowList is only available in direct runner.");
+
+ Collector.globalValues.put(id, values);
+
+ runCollector(options, node);
+
+ Collector.globalValues.remove(id);
+ return values.stream().collect(Collectors.toList());
+ }
+
private static Enumerable<Object> collect(PipelineOptions options, BeamRelNode node) {
long id = options.getOptionsId();
- Queue<Object[]> values = new ConcurrentLinkedQueue<>();
+ Queue<Row> values = new ConcurrentLinkedQueue<>();
checkArgument(
options
@@ -187,20 +223,16 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
Collector.globalValues.put(id, values);
- Pipeline pipeline = Pipeline.create(options);
- PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
- resultCollection.apply(ParDo.of(new Collector()));
- PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ runCollector(options, node);
Collector.globalValues.remove(id);
- return Linq4j.asEnumerable(unboxValues(values));
+ return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values));
}
private static Enumerable<Object> limitCollect(PipelineOptions options, BeamRelNode node) {
long id = options.getOptionsId();
- ConcurrentLinkedQueue<Object[]> values = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Row> values = new ConcurrentLinkedQueue<>();
checkArgument(
options
@@ -220,15 +252,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
values.remove();
}
- return Linq4j.asEnumerable(unboxValues(values));
+ return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values));
}
private static class Collector extends DoFn<Row, Void> {
// This will only work on the direct runner.
- private static final Map<Long, Queue<Object[]>> globalValues = new ConcurrentHashMap<>();
+ private static final Map<Long, Queue<Row>> globalValues = new ConcurrentHashMap<>();
- @Nullable private volatile Queue<Object[]> values;
+ @Nullable private volatile Queue<Row> values;
@StartBundle
public void startBundle(StartBundleContext context) {
@@ -238,14 +270,15 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable
@ProcessElement
public void processElement(ProcessContext context) {
- values.add(rowToAvatica(context.element()));
+ values.add(context.element());
}
}
- private static List<Object> unboxValues(Queue<Object[]> values) {
+ private static List<Object> rowToAvaticaAndUnboxValues(Queue<Row> values) {
return values.stream()
.map(
- objects -> {
+ row -> {
+ Object[] objects = rowToAvatica(row);
if (objects.length == 1) {
// if objects.length == 1, that means input Row contains only 1 column/element,
// then an Object instead of Object[] should be returned because of
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
index 5011aa3..76e952e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
import java.math.BigDecimal;
+import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -97,6 +98,22 @@ public class BeamEnumerableConverterTest {
}
@Test
+ public void testToListRow_collectMultiple() {
+ Schema schema = Schema.builder().addInt64Field("id").addInt64Field("otherid").build();
+ RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);
+ ImmutableList<ImmutableList<RexLiteral>> tuples =
+ ImmutableList.of(
+ ImmutableList.of(
+ rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+ rexBuilder.makeBigintLiteral(BigDecimal.ONE)));
+ BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null);
+
+ List<Row> rowList = BeamEnumerableConverter.toRowList(options, node);
+ assertTrue(rowList.size() == 1);
+ assertEquals(Row.withSchema(schema).addValues(0L, 1L).build(), rowList.get(0));
+ }
+
+ @Test
public void testToEnumerable_collectNullValue() {
Schema schema = Schema.builder().addNullableField("id", FieldType.INT64).build();
RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);