You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/15 06:27:18 UTC

[beam] branch master updated: [BEAM-6427] INTERSECT ALL is not compatible with the SQL standard.

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d7a0a3f  [BEAM-6427] INTERSECT ALL is not compatible with the SQL standard.
d7a0a3f is described below

commit d7a0a3fd9d5a75410bd37fc92036edb2a2bf0448
Author: amaliujia <am...@gmail.com>
AuthorDate: Mon Jan 14 15:10:19 2019 -0800

    [BEAM-6427] INTERSECT ALL is not compatible with the SQL standard.
---
 .../sql/impl/transform/BeamSetOperatorsTransforms.java      | 10 ++++++++--
 .../sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java   | 13 ++-----------
 2 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
index 4827e2d..7353152 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
@@ -81,8 +81,14 @@ public abstract class BeamSetOperatorsTransforms {
         case INTERSECT:
           if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
             if (all) {
-              for (Row leftRow : leftRows) {
-                ctx.output(leftRow);
+              int leftCount = Iterators.size(leftRows.iterator());
+              int rightCount = Iterators.size(rightRows.iterator());
+
+              // If row R occurs m times on the left and n times on the right,
+              // INTERSECT ALL outputs MIN(m, n) instances of R.
+              Iterator<Row> iter = (leftCount <= rightCount) ? leftRows.iterator() : rightRows.iterator();
+              while (iter.hasNext()) {
+                ctx.output(iter.next());
               }
             } else {
               ctx.output(ctx.element().getKey());
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index 238da34..1bcbed4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -108,7 +108,7 @@ public class BeamIntersectRelTest extends BaseRelTest {
             + "FROM ORDER_DETAILS2 ";
 
     PCollection<Row> rows = compilePipeline(sql, pipeline);
-    PAssert.that(rows).satisfies(new CheckSize(3));
+    PAssert.that(rows).satisfies(new CheckSize(2));
 
     PAssert.that(rows)
         .containsInAnyOrder(
@@ -116,16 +116,7 @@ public class BeamIntersectRelTest extends BaseRelTest {
                     Schema.FieldType.INT64, "order_id",
                     Schema.FieldType.INT32, "site_id",
                     Schema.FieldType.DECIMAL, "price")
-                .addRows(
-                    1L,
-                    1,
-                    new BigDecimal(1.0),
-                    1L,
-                    1,
-                    new BigDecimal(1.0),
-                    2L,
-                    2,
-                    new BigDecimal(2.0))
+                .addRows(1L, 1, new BigDecimal(1.0), 2L, 2, new BigDecimal(2.0))
                 .getRows());
 
     pipeline.run();