You are viewing a plain-text version of this content. The canonical link for it is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/15 06:26:49 UTC

[beam] branch master updated: [BEAM-6430] Fix EXCEPT.

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 761240f  [BEAM-6430] Fix EXCEPT.
761240f is described below

commit 761240fb526f1b0a770abe1a11a6eedd9cd2c42e
Author: amaliujia <am...@gmail.com>
AuthorDate: Mon Jan 14 15:42:06 2019 -0800

    [BEAM-6430] Fix EXCEPT.
---
 .../sql/impl/transform/BeamSetOperatorsTransforms.java | 18 ++++++++++++++++++
 .../sdk/extensions/sql/impl/rel/BeamMinusRelTest.java  | 15 ++++++++++++---
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
index 581fb08..4827e2d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
 
 /** Collections of {@code PTransform} and {@code DoFn} used to perform Set operations. */
 public abstract class BeamSetOperatorsTransforms {
@@ -89,6 +90,8 @@ public abstract class BeamSetOperatorsTransforms {
           }
           break;
         case MINUS:
+          // Say for Row R, there are m instances on left and n instances on right,
+          // EXCEPT ALL outputs MAX(m - n, 0) instances of R.
           if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
             Iterator<Row> iter = leftRows.iterator();
             if (all) {
@@ -100,6 +103,21 @@ public abstract class BeamSetOperatorsTransforms {
               // only output one
               ctx.output(iter.next());
             }
+          } else if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
+            int leftCount = Iterators.size(leftRows.iterator());
+            int rightCount = Iterators.size(rightRows.iterator());
+
+            int outputCount = leftCount - rightCount;
+            if (outputCount > 0) {
+              if (all) {
+                while (outputCount > 0) {
+                  outputCount--;
+                  ctx.output(ctx.element().getKey());
+                }
+              } else {
+                ctx.output(ctx.element().getKey());
+              }
+            }
           }
       }
     }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index 322f8fb..5ac5bce 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -93,7 +93,7 @@ public class BeamMinusRelTest extends BaseRelTest {
                     Schema.FieldType.INT64, "order_id",
                     Schema.FieldType.INT32, "site_id",
                     Schema.FieldType.DECIMAL, "price")
-                .addRows(4L, 4, new BigDecimal(4.0))
+                .addRows(1L, 1, new BigDecimal(1.0), 4L, 4, new BigDecimal(4.0))
                 .getRows());
 
     pipeline.run();
@@ -110,7 +110,7 @@ public class BeamMinusRelTest extends BaseRelTest {
             + "FROM ORDER_DETAILS2 ";
 
     PCollection<Row> rows = compilePipeline(sql, pipeline);
-    PAssert.that(rows).satisfies(new CheckSize(2));
+    PAssert.that(rows).satisfies(new CheckSize(3));
 
     PAssert.that(rows)
         .containsInAnyOrder(
@@ -118,7 +118,16 @@ public class BeamMinusRelTest extends BaseRelTest {
                     Schema.FieldType.INT64, "order_id",
                     Schema.FieldType.INT32, "site_id",
                     Schema.FieldType.DECIMAL, "price")
-                .addRows(4L, 4, new BigDecimal(4.0), 4L, 4, new BigDecimal(4.0))
+                .addRows(
+                    1L,
+                    1,
+                    new BigDecimal(1.0),
+                    4L,
+                    4,
+                    new BigDecimal(4.0),
+                    4L,
+                    4,
+                    new BigDecimal(4.0))
                 .getRows());
 
     pipeline.run();