You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/21 00:15:54 UTC

[GitHub] [beam] apilloud commented on a change in pull request #11682: [BEAM-9946] | added new api in Partition Transform

apilloud commented on a change in pull request #11682:
URL: https://github.com/apache/beam/pull/11682#discussion_r428371418



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##########
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn<T> partitionDoFn) {
 
   private static class PartitionDoFn<X> extends DoFn<X, Void> {
     private final int numPartitions;
-    private final PartitionFn<? super X> partitionFn;
     private final TupleTagList outputTags;
+    private Contextful<Contextful.Fn<X, Integer>> ctxFn;

Review comment:
       nit: this can be `final`

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##########
@@ -85,7 +141,14 @@
    * @throws IllegalArgumentException if {@code numPartitions <= 0}
    */
   public static <T> Partition<T> of(int numPartitions, PartitionFn<? super T> partitionFn) {
-    return new Partition<>(new PartitionDoFn<T>(numPartitions, partitionFn));
+
+    Contextful ctfFn =
+        Contextful.fn(
+            (T element, Contextful.Fn.Context c) ->
+                partitionFn.partitionFor(element, numPartitions),
+            Requirements.empty());
+    Object aClass = partitionFn;

Review comment:
       This is a no-op; please remove it.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##########
@@ -76,6 +93,45 @@
     int partitionFor(T elem, int numPartitions);
   }
 
+  /**
+   * A function object that chooses an output partition for an element.
+   *
+   * @param <T> the type of the elements being partitioned
+   */
+  public interface PartitionWithSideInputsFn<T> extends Serializable {
+    /**
+     * Chooses the partition into which to put the given element.
+     *
+     * @param elem the element to be partitioned
+     * @param numPartitions the total number of partitions ({@code >= 1})
+     * @param c the {@link Contextful.Fn.Context} needed to access sideInputs.
+     * @return index of the selected partition (in the range {@code [0..numPartitions-1]})
+     */
+    int partitionFor(T elem, int numPartitions, Contextful.Fn.Context c);
+  }
+
+  /**
+   * Returns a new {@code Partition} {@code PTransform} that divides its input {@code PCollection}
+   * into the given number of partitions, using the given partitioning function.
+   *
+   * @param numPartitions the number of partitions to divide the input {@code PCollection} into
+   * @param partitionFn the function to invoke on each element to choose its output partition
+   * @param requirements the {@link Requirements} needed to run it.
+   * @throws IllegalArgumentException if {@code numPartitions <= 0}
+   */
+  public static <T> Partition<T> of(
+      int numPartitions,
+      PartitionWithSideInputsFn<? super T> partitionFn,
+      Requirements requirements) {
+    Contextful ctfFn =
+        Contextful.fn(
+            (T element, Contextful.Fn.Context c) ->
+                partitionFn.partitionFor(element, numPartitions, c),
+            requirements);
+    Object aClass = partitionFn;

Review comment:
       This is a no-op; please remove it.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##########
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn<T> partitionDoFn) {
 
   private static class PartitionDoFn<X> extends DoFn<X, Void> {
     private final int numPartitions;
-    private final PartitionFn<? super X> partitionFn;
     private final TupleTagList outputTags;
+    private Contextful<Contextful.Fn<X, Integer>> ctxFn;
+    private Object originalFnForDisplayData;
 
     /**
      * Constructs a PartitionDoFn.
      *
      * @throws IllegalArgumentException if {@code numPartitions <= 0}
      */
-    public PartitionDoFn(int numPartitions, PartitionFn<? super X> partitionFn) {
+    public PartitionDoFn(

Review comment:
       nit: drop the `public`.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##########
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn<T> partitionDoFn) {
 
   private static class PartitionDoFn<X> extends DoFn<X, Void> {
     private final int numPartitions;
-    private final PartitionFn<? super X> partitionFn;
     private final TupleTagList outputTags;
+    private Contextful<Contextful.Fn<X, Integer>> ctxFn;
+    private Object originalFnForDisplayData;

Review comment:
       nit: this can be `final`

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
##########
@@ -124,21 +190,26 @@ private Partition(PartitionDoFn<T> partitionDoFn) {
 
   private static class PartitionDoFn<X> extends DoFn<X, Void> {
     private final int numPartitions;
-    private final PartitionFn<? super X> partitionFn;
     private final TupleTagList outputTags;
+    private Contextful<Contextful.Fn<X, Integer>> ctxFn;
+    private Object originalFnForDisplayData;
 
     /**
      * Constructs a PartitionDoFn.
      *
      * @throws IllegalArgumentException if {@code numPartitions <= 0}
      */
-    public PartitionDoFn(int numPartitions, PartitionFn<? super X> partitionFn) {
+    public PartitionDoFn(
+        int numPartitions,
+        Contextful<Contextful.Fn<X, Integer>> ctxFn,
+        Object originalFnForDisplayData) {

Review comment:
       How about passing just the partition function's class here, instead of the whole `Object originalFnForDisplayData`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org