You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/06/02 03:55:16 UTC

[beam] branch master updated: [BEAM-9825] | Implement Intersect, Union, Except transforms (#11610)

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 01c11e7  [BEAM-9825] | Implement Intersect,Union,Except transforms (#11610)
01c11e7 is described below

commit 01c11e7211937bde3c238fe3639f9dfe7774d093
Author: darshanj <da...@thoughtworks.com>
AuthorDate: Tue Jun 2 11:54:54 2020 +0800

    [BEAM-9825] | Implement Intersect,Union,Except transforms (#11610)
    
    * [BEAM-9825] | Implement Intersect,Union,Except transforms
---
 .../java/org/apache/beam/sdk/transforms/Sets.java  | 680 +++++++++++++++++++++
 .../org/apache/beam/sdk/transforms/SetsTest.java   | 324 ++++++++++
 .../sql/impl/rel/BeamSetOperatorRelBase.java       |  43 +-
 3 files changed, 1027 insertions(+), 20 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sets.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sets.java
new file mode 100644
index 0000000..16a7281
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sets.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/**
+ * The {@code PTransform}s that allow to compute different set functions across {@link
+ * PCollection}s.
+ *
+ * <p>They come in two variants. 1. Between two {@link PCollection} 2. Between two or more {@link
+ * PCollection} in a {@link PCollectionList}.
+ *
+ * <p>Following {@code PTransform}s follows SET DISTINCT semantics: intersectDistinct,
+ * expectDistinct, unionDistinct
+ *
+ * <p>Following {@code PTransform}s follows SET ALL semantics: intersectAll, expectAll, unionAll
+ *
+ * <p>For example, the following demonstrates intersectDistinct between two collections {@link
+ * PCollection}s.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> left = p.apply(Create.of("1", "2", "3", "3", "4", "5"));
+ * PCollection<String> right = p.apply(Create.of("1", "3", "4", "4", "6"));
+ *
+ * PCollection<String> results =
+ *     left.apply(SetFns.intersectDistinct(right)); // results will be PCollection<String> containing: "1","3","4"
+ *
+ * }</pre>
+ *
+ * <p>For example, the following demonstrates intersectDistinct between three collections {@link
+ * PCollection}s in a {@link PCollectionList}.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> first = p.apply(Create.of("1", "2", "3", "3", "4", "5"));
+ * PCollection<String> second = p.apply(Create.of("1", "3", "4", "4", "6"));
+ * PCollection<String> third = p.apply(Create.of("3", "4", "4"));
+ *
+ * // Following example will perform (first intersect second) intersect third.
+ * PCollection<String> results =
+ *     PCollectionList.of(first).and(second).and(third)
+ *     .apply(SetFns.intersectDistinct()); // results will be PCollection<String> containing: "3","4"
+ *
+ * }</pre>
+ */
+public class Sets {
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET DISTINCT semantics to compute the
+   * intersection with provided {@code PCollection<T>}.
+   *
+   * <p>The argument should not be modified after this is called.
+   *
+   * <p>The elements of the output {@link PCollection} will all distinct elements that present in
+   * both pipeline is constructed and provided {@link PCollection}.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}). Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the input {@code PCollection<T>}
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   *
+   * PCollection<String> left = p.apply(Create.of("1", "2", "3", "3", "4", "5"));
+   * PCollection<String> right = p.apply(Create.of("1", "3", "4", "4", "6"));
+   *
+   * PCollection<String> results =
+   *     left.apply(SetFns.intersectDistinct(right)); // results will be PCollection<String> containing: "1","3","4"
+   *
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input and output {@code PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> intersectDistinct(
+      PCollection<T> rightCollection) {
+    checkNotNull(rightCollection, "rightCollection argument is null");
+    return new SetImpl<>(rightCollection, intersectDistinct());
+  }
+
+  /**
+   * Returns a {@code PTransform} that takes a {@code PCollectionList<PCollection<T>>} and returns a
+   * {@code PCollection<T>} containing the intersection of collections done in order for all
+   * collections in {@code PCollectionList<T>}.
+   *
+   * <p>Returns a new {@code PTransform} transform that follows SET DISTINCT semantics which takes a
+   * {@code PCollectionList<PCollection<T>>} and returns a {@code PCollection<T>} containing the
+   * intersection of collections done in order for all collections in {@code PCollectionList<T>}.
+   *
+   * <p>The elements of the output {@link PCollection} will have all distinct elements that are
+   * present in both pipeline is constructed and next {@link PCollection} in the list and applied to
+   * all collections in order.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the first {@code PCollection<T>} in {@code PCollectionList<T>}.
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   *
+   * PCollection<String> first = p.apply(Create.of("1", "2", "3", "3", "4", "5"));
+   * PCollection<String> second = p.apply(Create.of("1", "3", "4", "4", "6"));
+   * PCollection<String> third = p.apply(Create.of("3", "4", "4"));
+   *
+   * // Following example will perform (first intersect second) intersect third.
+   * PCollection<String> results =
+   *     PCollectionList.of(first).and(second).and(third)
+   *     .apply(SetFns.intersectDistinct()); // results will be PCollection<String> containing: "3","4"
+   *
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input {@code PCollectionList<T>} and output {@code
+   *     PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollectionList<T>, PCollection<T>> intersectDistinct() {
+    SerializableBiFunction<Long, Long, Long> intersectFn =
+        (numberOfElementsinLeft, numberOfElementsinRight) ->
+            (numberOfElementsinLeft > 0 && numberOfElementsinRight > 0) ? 1L : 0L;
+    return new SetImplCollections<>(intersectFn);
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET ALL semantics to compute the
+   * intersection with provided {@code PCollection<T>}.
+   *
+   * <p>The argument should not be modified after this is called.
+   *
+   * <p>The elements of the output {@link PCollection} which will follow INTESECT_ALL Semantics as
+   * follows: Given there are m elements on pipeline which is constructed {@link PCollection} (left)
+   * and n elements on in provided {@link PCollection} (right): - it will output MIN(m - n, 0)
+   * elements of left for all elements which are present in both left and right.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the input {@code PCollection<T>}
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   *
+   * PCollection<String> left = p.apply(Create.of("1", "1", "1", "2", "3", "3", "4", "5"));
+   * PCollection<String> right = p.apply(Create.of("1", "1", "3", "4", "4", "6"));
+   *
+   * PCollection<String> results =
+   *     left.apply(SetFns.intersectAll(right)); // results will be PCollection<String> containing: "1","1","3","4"
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input and output {@code PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> intersectAll(
+      PCollection<T> rightCollection) {
+    checkNotNull(rightCollection, "rightCollection argument is null");
+    return new SetImpl<>(rightCollection, intersectAll());
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET ALL semantics which takes a {@code
+   * PCollectionList<PCollection<T>>} and returns a {@code PCollection<T>} containing the
+   * intersection all of collections done in order for all collections in {@code
+   * PCollectionList<T>}.
+   *
+   * <p>The elements of the output {@link PCollection} which will follow INTERSECT_ALL semantics.
+   * Output is calculated as follows: Given there are m elements on pipeline which is constructed
+   * {@link PCollection} (left) and n elements on in provided {@link PCollection} (right): - it will
+   * output MIN(m - n, 0) elements of left for all elements which are present in both left and
+   * right.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the first {@code PCollection<T>} in {@code PCollectionList<T>}.
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   * PCollection<String> first = p.apply(Create.of("1", "1", "1", "2", "3", "3", "4", "5"));
+   * PCollection<String> second = p.apply(Create.of("1", "1", "3", "4", "4", "6"));
+   * PCollection<String> third = p.apply(Create.of("1", "5"));
+   *
+   * // Following example will perform (first intersect second) intersect third.
+   * PCollection<String> results =
+   *     PCollectionList.of(first).and(second).and(third)
+   *     .apply(SetFns.intersectAll()); // results will be PCollection<String> containing: "1"
+   *
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input {@code PCollectionList<T>} and output {@code
+   *     PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollectionList<T>, PCollection<T>> intersectAll() {
+    return new SetImplCollections<>(Math::min);
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET DISTINCT semantics to compute the
+   * difference (except) with provided {@code PCollection<T>}.
+   *
+   * <p>The argument should not be modified after this is called.
+   *
+   * <p>The elements of the output {@link PCollection} will all distinct elements that present in
+   * pipeline is constructed but not present in provided {@link PCollection}.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the input {@code PCollection<T>}
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   *
+   * PCollection<String> left = p.apply(Create.of("1", "1", "1", "2", "3", "3","4", "5"));
+   * PCollection<String> right = p.apply(Create.of("1", "1", "3", "4", "4", "6"));
+   *
+   * PCollection<String> results =
+   *     left.apply(SetFns.exceptDistinct(right)); // results will be PCollection<String> containing: "2","5"
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input and output {@code PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> exceptDistinct(
+      PCollection<T> rightCollection) {
+    checkNotNull(rightCollection, "rightCollection argument is null");
+    return new SetImpl<>(rightCollection, exceptDistinct());
+  }
+
+  /**
+   * Returns a {@code PTransform} that takes a {@code PCollectionList<PCollection<T>>} and returns a
+   * {@code PCollection<T>} containing the difference (except) of collections done in order for all
+   * collections in {@code PCollectionList<T>}.
+   *
+   * <p>Returns a new {@code PTransform} transform that follows SET DISTINCT semantics which takes a
+   * {@code PCollectionList<PCollection<T>>} and returns a {@code PCollection<T>} containing the
+   * difference (except) of collections done in order for all collections in {@code
+   * PCollectionList<T>}.
+   *
+   * <p>The elements of the output {@link PCollection} will have all distinct elements that are
+   * present in pipeline is constructed but not present in next {@link PCollection} in the list and
+   * applied to all collections in order.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the first {@code PCollection<T>} in {@code PCollectionList<T>}.
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   * PCollection<String> first = p.apply(Create.of("1", "1", "1", "2", "3", "3", "4", "5"));
+   * PCollection<String> second = p.apply(Create.of("1", "1", "3", "4", "4", "6"));
+   *
+   * PCollection<String> third = p.apply(Create.of("1", "2", "2"));
+   *
+   * // Following example will perform (first intersect second) intersect third.
+   * PCollection<String> results =
+   *     PCollectionList.of(first).and(second).and(third)
+   *     .apply(SetFns.exceptDistinct()); // results will be PCollection<String> containing: "5"
+   *
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input {@code PCollectionList<T>} and output {@code
+   *     PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollectionList<T>, PCollection<T>> exceptDistinct() {
+    SerializableBiFunction<Long, Long, Long> exceptFn =
+        (numberOfElementsinLeft, numberOfElementsinRight) ->
+            numberOfElementsinLeft > 0 && numberOfElementsinRight == 0 ? 1L : 0L;
+    return new SetImplCollections<>(exceptFn);
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET ALL semantics to compute the
+   * difference all (exceptAll) with provided {@code PCollection<T>}.
+   *
+   * <p>The argument should not be modified after this is called.
+   *
+   * <p>The elements of the output {@link PCollection} which will follow EXCEPT_ALL Semantics as
+   * follows: Given there are m elements on pipeline which is constructed {@link PCollection} (left)
+   * and n elements on in provided {@link PCollection} (right): - it will output m elements of left
+   * for all elements which are present in left but not in right. - it will output MAX(m - n, 0)
+   * elements of left for all elements which are present in both left and right.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the input {@code PCollection<T>}
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   *
+   * PCollection<String> left = p.apply(Create.of("1", "1", "1", "2", "3", "3", "3", "4", "5"));
+   * PCollection<String> right = p.apply(Create.of("1", "3", "4", "4", "6"));
+   *
+   * PCollection<String> results =
+   *     left.apply(SetFns.exceptAll(right)); // results will be PCollection<String> containing: "1","1","2","3","3","5"
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input and output {@code PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> exceptAll(
+      PCollection<T> rightCollection) {
+    checkNotNull(rightCollection, "rightCollection argument is null");
+    return new SetImpl<>(rightCollection, exceptAll());
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET ALL semantics which takes a {@code
+   * PCollectionList<PCollection<T>>} and returns a {@code PCollection<T>} containing the difference
+   * all (exceptAll) of collections done in order for all collections in {@code PCollectionList<T>}.
+   *
+   * <p>The elements of the output {@link PCollection} which will follow EXCEPT_ALL semantics.
+   * Output is calculated as follows: Given there are m elements on pipeline which is constructed
+   * {@link PCollection} (left) and n elements on in provided {@link PCollection} (right): - it will
+   * output m elements of left for all elements which are present in left but not in right. - it
+   * will output MAX(m - n, 0) elements of left for all elements which are present in both left and
+   * right.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the first {@code PCollection<T>} in {@code PCollectionList<T>}.
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   * PCollection<String> first = p.apply(Create.of("1", "1", "1", "2", "3", "3", "3", "4", "5"));
+   * PCollection<String> second = p.apply(Create.of("1", "3", "4", "4", "6"));
+   * PCollection<String> third = p.apply(Create.of("1", "5"));
+   *
+   * // Following example will perform (first intersect second) intersect third.
+   * PCollection<String> results =
+   *     PCollectionList.of(first).and(second).and(third)
+   *     .apply(SetFns.exceptAll()); // results will be PCollection<String> containing: "1","2","3","3"
+   *
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input {@code PCollectionList<T>} and output {@code
+   *     PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollectionList<T>, PCollection<T>> exceptAll() {
+    SerializableBiFunction<Long, Long, Long> exceptFn =
+        (numberOfElementsinLeft, numberOfElementsinRight) ->
+            Math.max(numberOfElementsinLeft - numberOfElementsinRight, 0L);
+    return new SetImplCollections<>(exceptFn);
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET DISTINCT semantics to compute the
+   * union with provided {@code PCollection<T>}.
+   *
+   * <p>The argument should not be modified after this is called.
+   *
+   * <p>The elements of the output {@link PCollection} will all distinct elements that present in
+   * pipeline is constructed or present in provided {@link PCollection}.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the input {@code PCollection<T>}
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   *
+   * PCollection<String> left = p.apply(Create.of("1", "1", "2"));
+   * PCollection<String> right = p.apply(Create.of("1", "3", "4", "4"));
+   *
+   * PCollection<String> results =
+   *     left.apply(SetFns.unionDistinct(right)); // results will be PCollection<String> containing: "1","2","3","4"
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input and output {@code PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> unionDistinct(
+      PCollection<T> rightCollection) {
+    checkNotNull(rightCollection, "rightCollection argument is null");
+    return new SetImpl<>(rightCollection, unionDistinct());
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET DISTINCT semantics which takes a
+   * {@code PCollectionList<PCollection<T>>} and returns a {@code PCollection<T>} containing the
+   * union of collections done in order for all collections in {@code PCollectionList<T>}.
+   *
+   * <p>The elements of the output {@link PCollection} will have all distinct elements that are
+   * present in pipeline is constructed or present in next {@link PCollection} in the list and
+   * applied to all collections in order.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the first {@code PCollection<T>} in {@code PCollectionList<T>}.
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   * PCollection<String> first = p.apply(Create.of("1", "1", "2"));
+   * PCollection<String> second = p.apply(Create.of("1", "3", "4", "4"));
+   *
+   * PCollection<String> third = p.apply(Create.of("1", "5"));
+   *
+   * // Following example will perform (first intersect second) intersect third.
+   * PCollection<String> results =
+   *     PCollectionList.of(first).and(second).and(third)
+   *     .apply(SetFns.unionDistinct()); // results will be PCollection<String> containing: "1","2","3","4","5"
+   *
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input {@code PCollectionList<T>} and output {@code
+   *     PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollectionList<T>, PCollection<T>> unionDistinct() {
+    SerializableBiFunction<Long, Long, Long> unionFn =
+        (numberOfElementsinLeft, numberOfElementsinRight) -> 1L;
+    return new SetImplCollections<>(unionFn);
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET ALL semantics to compute the
+   * unionAll with provided {@code PCollection<T>}.
+   *
+   * <p>The argument should not be modified after this is called.
+   *
+   * <p>The elements of the output {@link PCollection} which will follow UNION_ALL semantics as
+   * follows: Given there are m elements on pipeline which is constructed {@link PCollection} (left)
+   * and n elements on in provided {@link PCollection} (right): - it will output m elements of left
+   * and m elements of right.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all {@code PCollection<T>}
+   * to be deterministic (see {@link Coder#verifyDeterministic()}). If the collection {@code Coder}
+   * is not deterministic, an exception is thrown at pipeline construction time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the input {@code PCollection<T>}
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   *
+   * PCollection<String> left = p.apply(Create.of("1", "1", "2"));
+   * PCollection<String> right = p.apply(Create.of("1", "3", "4", "4"));
+   *
+   * PCollection<String> results =
+   *     left.apply(SetFns.unionAll(right)); // results will be PCollection<String> containing: "1","1","1","2","3","4","4"
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input and output {@code PCollection<T>}s.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> unionAll(
+      PCollection<T> rightCollection) {
+    checkNotNull(rightCollection, "rightCollection argument is null");
+    return new SetImpl<>(rightCollection, unionAll());
+  }
+
+  /**
+   * Returns a new {@code PTransform} transform that follows SET ALL semantics which takes a {@code
+   * PCollectionList<PCollection<T>>} and returns a {@code PCollection<T>} containing the unionAll
+   * of collections done in order for all collections in {@code PCollectionList<T>}.
+   *
+   * <p>The elements of the output {@link PCollection} which will follow UNION_ALL semantics. Output
+   * is calculated as follows: Given there are m elements on pipeline which is constructed {@link
+   * PCollection} (left) and n elements on in provided {@link PCollection} (right): - it will output
+   * m elements of left and m elements of right.
+   *
+   * <p>Note that this transform requires that the {@code Coder} of the all inputs {@code
+   * PCollection<T>} to be deterministic (see {@link Coder#verifyDeterministic()}). If the
+   * collection {@code Coder} is not deterministic, an exception is thrown at pipeline construction
+   * time.
+   *
+   * <p>All inputs must have equal {@link WindowFn}s and compatible triggers (see {@link
+   * Trigger#isCompatible(Trigger)}).Triggers with multiple firings may lead to nondeterministic
+   * results since the this {@code PTransform} is only computed over each individual firing.
+   *
+   * <p>By default, the output {@code PCollection<T>} encodes its elements using the same {@code
+   * Coder} as that of the first {@code PCollection<T>} in {@code PCollectionList<T>}.
+   *
+   * <pre>{@code
+   * Pipeline p = ...;
+   * PCollection<String> first = p.apply(Create.of("1", "1", "2"));
+   * PCollection<String> second = p.apply(Create.of("1", "3", "4", "4"));
+   * PCollection<String> third = p.apply(Create.of("1", "5"));
+   *
+   * // Following example will perform (first intersect second) intersect third.
+   * PCollection<String> results =
+   *     PCollectionList.of(first).and(second).and(third)
+   *     .apply(SetFns.unionAll()); // results will be PCollection<String> containing: "1","1","1","1","2","3","4","4","5"
+   *
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input {@code PCollectionList<T>} and output {@code
+   *     PCollection<T>}s.
+   */
+  public static <T> Flatten.PCollections<T> unionAll() {
+    return Flatten.pCollections();
+  }
+
+  private static class SetImpl<T> extends PTransform<PCollection<T>, PCollection<T>> {
+
+    private final transient PCollection<T> rightCollection;
+    private final PTransform<PCollectionList<T>, PCollection<T>> listTransformFn;
+
+    private SetImpl(
+        PCollection<T> rightCollection,
+        PTransform<PCollectionList<T>, PCollection<T>> listTransformFn) {
+      this.rightCollection = rightCollection;
+      this.listTransformFn = listTransformFn;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> leftCollection) {
+      return PCollectionList.of(leftCollection).and(rightCollection).apply(listTransformFn);
+    }
+  }
+
+  private static class SetImplCollections<T>
+      extends PTransform<PCollectionList<T>, PCollection<T>> {
+
+    private final SerializableBiFunction<Long, Long, Long> fn;
+
+    private SetImplCollections(SerializableBiFunction<Long, Long, Long> fn) {
+      this.fn = fn;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollectionList<T> input) {
+      List<PCollection<T>> all = input.getAll();
+      MapElements<T, KV<T, Void>> elementToVoid =
+          MapElements.via(
+              new SimpleFunction<T, KV<T, Void>>() {
+                @Override
+                public KV<T, Void> apply(T element) {
+                  return KV.of(element, null);
+                }
+              });
+
+      checkArgument(all.size() > 1, "must have at least two input to a PCollectionList");
+
+      PCollection<T> first = all.get(0);
+      Pipeline pipeline = first.getPipeline();
+      String firstName = first.getName();
+
+      List<TupleTag<Void>> allTags = new ArrayList<>();
+      KeyedPCollectionTuple<T> keyedPCollectionTuple = KeyedPCollectionTuple.empty(pipeline);
+
+      for (PCollection<T> col : all) {
+        TupleTag<Void> tag = new TupleTag<>();
+
+        PCollection<KV<T, Void>> kvOfElementAndVoid =
+            col.apply("PrepareKVs" + col.getName(), elementToVoid);
+
+        allTags.add(tag);
+        keyedPCollectionTuple = keyedPCollectionTuple.and(tag, kvOfElementAndVoid);
+      }
+
+      PCollection<KV<T, CoGbkResult>> coGbkResults =
+          keyedPCollectionTuple.apply("CBKAll" + firstName, CoGroupByKey.create());
+
+      // TODO: lift combiners through the CoGBK.
+      PCollection<T> results =
+          coGbkResults.apply(
+              "FilterSetElement" + firstName,
+              ParDo.of(
+                  new DoFn<KV<T, CoGbkResult>, T>() {
+
+                    @ProcessElement
+                    public void processElement(ProcessContext c) {
+                      KV<T, CoGbkResult> elementGroups = c.element();
+                      CoGbkResult value = elementGroups.getValue();
+                      T element = elementGroups.getKey();
+
+                      long numberOfOutputs = Iterables.size(value.getAll(allTags.get(0)));
+                      List<TupleTag<Void>> tail = allTags.subList(1, allTags.size());
+
+                      for (TupleTag<Void> tag : tail) {
+                        long nextSize = Iterables.size(value.getAll(tag));
+                        numberOfOutputs = fn.apply(numberOfOutputs, nextSize);
+                      }
+                      for (long i = 0L; i < numberOfOutputs; i++) {
+                        c.output(element);
+                      }
+                    }
+                  }));
+
+      return results.setCoder(first.getCoder());
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SetsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SetsTest.java
new file mode 100644
index 0000000..ed07a65
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SetsTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SetsTest {
+
+  @Rule public final TestPipeline p = TestPipeline.create();
+
+  Schema schema = Schema.builder().addStringField("alphabet").build();
+
+  static PCollection<String> first;
+  static PCollection<String> second;
+  static PCollection<Row> firstRows;
+  static PCollection<Row> secondRows;
+
+  private Iterable<Row> toRows(String... values) {
+    return Iterables.transform(
+        Arrays.asList(values), (elem) -> Row.withSchema(schema).addValues(elem).build());
+  }
+
+  @Before
+  public void setup() {
+    final String[] firstData = {"a", "a", "a", "b", "b", "c", "d", "d", "g", "g", "h", "h"};
+    final String[] secondData = {"a", "a", "b", "b", "b", "c", "d", "d", "e", "e", "f", "f"};
+
+    first = p.apply("first", Create.of(Arrays.asList(firstData)));
+    second = p.apply("second", Create.of(Arrays.asList(secondData)));
+
+    firstRows = p.apply("firstRows", Create.of(toRows(firstData)).withRowSchema(schema));
+    secondRows = p.apply("secondRows", Create.of(toRows(secondData)).withRowSchema(schema));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testIntersection() {
+    PAssert.that(first.apply("strings", Sets.intersectDistinct(second)))
+        .containsInAnyOrder("a", "b", "c", "d");
+
+    PCollection<Row> results = firstRows.apply("rows", Sets.intersectDistinct(secondRows));
+
+    PAssert.that(results).containsInAnyOrder(toRows("a", "b", "c", "d"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testIntersectionCollectionList() {
+
+    PCollection<String> third = p.apply("third", Create.of(Arrays.asList("b", "b", "c", "f")));
+    PCollection<Row> thirdRows = p.apply("thirdRows", Create.of(toRows("b", "b", "c", "f")));
+
+    PAssert.that(
+            PCollectionList.of(first)
+                .and(second)
+                .and(third)
+                .apply("stringsCols", Sets.intersectDistinct()))
+        .containsInAnyOrder("b", "c");
+
+    PCollection<Row> results =
+        PCollectionList.of(firstRows)
+            .and(secondRows)
+            .and(thirdRows)
+            .apply("rowCols", Sets.intersectDistinct());
+
+    PAssert.that(results).containsInAnyOrder(toRows("b", "c"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testIntersectionAll() {
+
+    PAssert.that(first.apply("strings", Sets.intersectAll(second)))
+        .containsInAnyOrder("a", "a", "b", "b", "c", "d", "d");
+
+    PCollection<Row> results = firstRows.apply("rows", Sets.intersectAll(secondRows));
+
+    PAssert.that(results).containsInAnyOrder(toRows("a", "a", "b", "b", "c", "d", "d"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testIntersectionAllCollectionList() {
+    PCollection<String> third = p.apply("third", Create.of(Arrays.asList("a", "b", "f")));
+    PCollection<Row> thirdRows = p.apply("thirdRows", Create.of(toRows("a", "b", "f")));
+
+    PAssert.that(
+            PCollectionList.of(first)
+                .and(second)
+                .and(third)
+                .apply("stringsCols", Sets.intersectAll()))
+        .containsInAnyOrder("a", "b");
+
+    PCollection<Row> results =
+        PCollectionList.of(firstRows)
+            .and(secondRows)
+            .and(thirdRows)
+            .apply("rowCols", Sets.intersectAll());
+
+    PAssert.that(results).containsInAnyOrder(toRows("a", "b"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExcept() {
+
+    PAssert.that(first.apply("strings", Sets.exceptDistinct(second))).containsInAnyOrder("g", "h");
+
+    PCollection<Row> results = firstRows.apply("rows", Sets.exceptDistinct(secondRows));
+
+    PAssert.that(results).containsInAnyOrder(toRows("g", "h"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExceptCollectionList() {
+    PCollection<String> third = p.apply("third", Create.of(Arrays.asList("a", "b", "b", "g", "g")));
+    PCollection<Row> thirdRows = p.apply("thirdRows", Create.of(toRows("a", "b", "b", "g", "g")));
+
+    PAssert.that(
+            PCollectionList.of(first)
+                .and(second)
+                .and(third)
+                .apply("stringsCols", Sets.exceptDistinct()))
+        .containsInAnyOrder("h");
+
+    PCollection<Row> results =
+        PCollectionList.of(firstRows)
+            .and(secondRows)
+            .and(thirdRows)
+            .apply("rowCols", Sets.exceptDistinct());
+
+    PAssert.that(results).containsInAnyOrder(toRows("h"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExceptAll() {
+
+    PAssert.that(first.apply("strings", Sets.exceptAll(second)))
+        .containsInAnyOrder("a", "g", "g", "h", "h");
+
+    PCollection<Row> results = firstRows.apply("rows", Sets.exceptAll(secondRows));
+
+    PAssert.that(results).containsInAnyOrder(toRows("a", "g", "g", "h", "h"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExceptAllCollectionList() {
+    PCollection<String> third = p.apply("third", Create.of(Arrays.asList("a", "b", "b", "g", "f")));
+    PCollection<Row> thirdRows = p.apply("thirdRows", Create.of(toRows("a", "b", "b", "g")));
+
+    PAssert.that(
+            PCollectionList.of(first).and(second).and(third).apply("stringsCols", Sets.exceptAll()))
+        .containsInAnyOrder("g", "h", "h");
+
+    PCollection<Row> results =
+        PCollectionList.of(firstRows)
+            .and(secondRows)
+            .and(thirdRows)
+            .apply("rowCols", Sets.exceptAll());
+
+    PAssert.that(results).containsInAnyOrder(toRows("g", "h", "h"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testUnion() {
+
+    PAssert.that(first.apply("strings", Sets.unionDistinct(second)))
+        .containsInAnyOrder("a", "b", "c", "d", "e", "f", "g", "h");
+
+    PCollection<Row> results = firstRows.apply("rows", Sets.unionDistinct(secondRows));
+
+    PAssert.that(results).containsInAnyOrder(toRows("a", "b", "c", "d", "e", "f", "g", "h"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testUnionCollectionList() {
+    PCollection<String> third = p.apply("third", Create.of(Arrays.asList("a", "k", "k")));
+    PCollection<Row> thirdRows = p.apply("thirdRows", Create.of(toRows("a", "k", "k")));
+
+    PAssert.that(
+            PCollectionList.of(first)
+                .and(second)
+                .and(third)
+                .apply("stringsCols", Sets.unionDistinct()))
+        .containsInAnyOrder("a", "b", "c", "d", "e", "f", "g", "h", "k");
+
+    PCollection<Row> results =
+        PCollectionList.of(firstRows)
+            .and(secondRows)
+            .and(thirdRows)
+            .apply("rowCols", Sets.unionDistinct());
+
+    PAssert.that(results).containsInAnyOrder(toRows("a", "b", "c", "d", "e", "f", "g", "h", "k"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testUnionAll() {
+
+    PAssert.that(first.apply("strings", Sets.unionAll(second)))
+        .containsInAnyOrder(
+            "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "c", "c", "d", "d", "d", "d", "e",
+            "e", "f", "f", "g", "g", "h", "h");
+
+    PCollection<Row> results = firstRows.apply("rows", Sets.unionAll(secondRows));
+
+    PAssert.that(results)
+        .containsInAnyOrder(
+            toRows(
+                "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "c", "c", "d", "d", "d", "d", "e",
+                "e", "f", "f", "g", "g", "h", "h"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testUnionAllCollections() {
+
+    PCollection<String> third = p.apply("third", Create.of(Arrays.asList("a", "b", "b", "k", "k")));
+    PCollection<Row> thirdRows = p.apply("thirdRows", Create.of(toRows("a", "b", "b", "k", "k")));
+
+    PAssert.that(
+            PCollectionList.of(first).and(second).and(third).apply("stringsCols", Sets.unionAll()))
+        .containsInAnyOrder(
+            "a", "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "b", "b", "c", "c", "d", "d",
+            "d", "d", "e", "e", "f", "f", "g", "g", "h", "h", "k", "k");
+
+    PCollection<Row> results =
+        PCollectionList.of(firstRows)
+            .and(secondRows)
+            .and(thirdRows)
+            .apply("rowCols", Sets.unionAll());
+
+    PAssert.that(results)
+        .containsInAnyOrder(
+            toRows(
+                "a", "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "b", "b", "c", "c", "d", "d",
+                "d", "d", "e", "e", "f", "f", "g", "g", "h", "h", "k", "k"));
+
+    assertEquals(schema, results.getSchema());
+
+    p.run();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index ab76a7b..bdc5f4c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -20,15 +20,11 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.schemas.transforms.CoGroup;
-import org.apache.beam.sdk.schemas.transforms.CoGroup.By;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sets;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 
 /**
@@ -75,20 +71,27 @@ public class BeamSetOperatorRelBase extends PTransform<PCollectionList<Row>, PCo
               + rightWindow);
     }
 
-    // TODO: We may want to preaggregate the counts first using Group instead of calling CoGroup and
-    // measuring the
-    // iterable size. If on average there are duplicates in the input, this will be faster.
-    final String lhsTag = "lhs";
-    final String rhsTag = "rhs";
-    PCollection<Row> joined =
-        PCollectionTuple.of(lhsTag, leftRows, rhsTag, rightRows)
-            .apply("CoGroup", CoGroup.join(By.fieldNames("*")));
-    return joined
-        .apply(
-            "FilterResults",
-            ParDo.of(
-                new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(
-                    lhsTag, rhsTag, opType, all)))
-        .setRowSchema(joined.getSchema().getField("key").getType().getRowSchema());
+    switch (opType) {
+      case UNION:
+        if (all) {
+          return leftRows.apply(Sets.unionAll(rightRows));
+        } else {
+          return leftRows.apply(Sets.unionDistinct(rightRows));
+        }
+      case INTERSECT:
+        if (all) {
+          return leftRows.apply(Sets.intersectAll(rightRows));
+        } else {
+          return leftRows.apply(Sets.intersectDistinct(rightRows));
+        }
+      case MINUS:
+        if (all) {
+          return leftRows.apply(Sets.exceptAll(rightRows));
+        } else {
+          return leftRows.apply(Sets.exceptDistinct(rightRows));
+        }
+      default:
+        throw new IllegalStateException("Unexpected set operation value: " + opType);
+    }
   }
 }