You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:38 UTC

[20/50] incubator-beam git commit: Move shared DelegatingAggregator out of OldDoFn

Move shared DelegatingAggregator out of OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/139437bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/139437bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/139437bd

Branch: refs/heads/apex-runner
Commit: 139437bdca8872a11f6a87a9f54347985523faf2
Parents: 0d500ef
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:45:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/DelegatingAggregator.java    | 125 +++++++++++++++++++
 .../org/apache/beam/sdk/transforms/DoFn.java    |  30 ++---
 .../org/apache/beam/sdk/transforms/OldDoFn.java |  97 --------------
 .../DoFnDelegatingAggregatorTest.java           |   5 +-
 4 files changed, 142 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
new file mode 100644
index 0000000..d92bb71
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.MoreObjects;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator.
+ *
+ * <p>This {@link Aggregator} is designed to be constructed without a delegate, at pipeline
+ * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which it
+ * submits values must be provided by the runner at execution time.
+ *
+ * @param <AggInputT> the type of input element
+ * @param <AggOutputT> the type of output element
+ */
+class DelegatingAggregator<AggInputT, AggOutputT>
+    implements Aggregator<AggInputT, AggOutputT>, Serializable {
+  private final UUID id; // random per instance; compared in equals and hashed in hashCode
+
+  private final String name;
+
+  private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
+
+  private Aggregator<AggInputT, ?> delegate; // null until setDelegate is called
+
+  public DelegatingAggregator(String name,
+      CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+    this.id = UUID.randomUUID();
+    this.name = checkNotNull(name, "name cannot be null");
+    // Safe contravariant cast: combiner accepts a supertype of AggInputT, hence AggInputT too.
+    @SuppressWarnings("unchecked")
+    CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
+        (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
+    this.combineFn = specificCombiner;
+  }
+
+  @Override
+  public void addValue(AggInputT value) {
+    if (delegate == null) { // no delegate has been set: fail rather than drop the value
+      throw new IllegalStateException(
+          String.format(
+              "addValue cannot be called on Aggregator outside of the execution of a %s.",
+              DoFn.class.getSimpleName()));
+    } else {
+      delegate.addValue(value);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
+    return combineFn;
+  }
+
+  /**
+   * Sets the aggregator that subsequent {@link #addValue} calls are forwarded to.
+   *
+   * @param delegate the delegate to forward to; while it is null, {@link #addValue} throws
+   */
+  public void setDelegate(Aggregator<AggInputT, ?> delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("name", name)
+        .add("combineFn", combineFn)
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, name, combineFn.getClass()); // same three components as equals
+  }
+
+  /**
+   * Indicates whether some other object is "equal to" this one.
+   *
+   * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
+   * CombineFns are the same class, and they have identical IDs. The delegate is not compared.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null) {
+      return false;
+    }
+    if (o instanceof DelegatingAggregator) {
+      DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
+      return Objects.equals(this.id, that.id)
+          && Objects.equals(this.name, that.name)
+          && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 8b3aaf8..0531cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -775,31 +775,31 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   }
 
   /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the
-   * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created
-   * during pipeline construction.
+   * Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn}
+   * argument. The name provided must be unique across {@link Aggregator}s created within the {@link
+   * DoFn}. Aggregators can only be created during pipeline construction.
    *
    * @param name the name of the aggregator
    * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this {@link DoFn}
+   * @return an aggregator for the provided name and combiner in the scope of this {@link DoFn}
    * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
+   * @throws IllegalArgumentException if the given name collides with another aggregator in this
+   *     scope
    * @throws IllegalStateException if called during pipeline execution.
    */
-  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+      String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
     checkNotNull(name, "name cannot be null");
     checkNotNull(combiner, "combiner cannot be null");
-    checkArgument(!aggregators.containsKey(name),
+    checkArgument(
+        !aggregators.containsKey(name),
         "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
+            + " An Aggregator with that name already exists within this scope.",
         name);
-    checkState(!aggregatorsAreFinal,
+    checkState(
+        !aggregatorsAreFinal,
         "Cannot create an aggregator during pipeline execution."
-        + " Aggregators should be registered during pipeline construction.");
+            + " Aggregators should be registered during pipeline construction.");
 
     DelegatingAggregator<AggInputT, AggOutputT> aggregator =
         new DelegatingAggregator<>(name, combiner);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 72c2965..b269f47 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -21,14 +21,11 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.MoreObjects;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -505,100 +502,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   }
 
   /**
-   * An {@link Aggregator} that delegates calls to addValue to another
-   * aggregator.
-   *
-   * @param <AggInputT> the type of input element
-   * @param <AggOutputT> the type of output element
-   */
-  static class DelegatingAggregator<AggInputT, AggOutputT> implements
-      Aggregator<AggInputT, AggOutputT>, Serializable {
-    private final UUID id;
-
-    private final String name;
-
-    private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
-    private Aggregator<AggInputT, ?> delegate;
-
-    public DelegatingAggregator(String name,
-        CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-      this.id = UUID.randomUUID();
-      this.name = checkNotNull(name, "name cannot be null");
-      // Safe contravariant cast
-      @SuppressWarnings("unchecked")
-      CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
-          (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
-      this.combineFn = specificCombiner;
-    }
-
-    @Override
-    public void addValue(AggInputT value) {
-      if (delegate == null) {
-        throw new IllegalStateException(
-            "addValue cannot be called on Aggregator outside of the execution of a OldDoFn.");
-      } else {
-        delegate.addValue(value);
-      }
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
-      return combineFn;
-    }
-
-    /**
-     * Sets the current delegate of the Aggregator.
-     *
-     * @param delegate the delegate to set in this aggregator
-     */
-    public void setDelegate(Aggregator<AggInputT, ?> delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("name", name)
-          .add("combineFn", combineFn)
-          .toString();
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(id, name, combineFn.getClass());
-    }
-
-    /**
-     * Indicates whether some other object is "equal to" this one.
-     *
-     * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
-     * CombineFns are the same class, and they have identical IDs.
-     */
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o == null) {
-        return false;
-      }
-      if (o instanceof DelegatingAggregator) {
-        DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
-        return Objects.equals(this.id, that.id)
-            && Objects.equals(this.name, that.name)
-            && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
-      }
-      return false;
-    }
-  }
-
-  /**
    * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
    */
   private class AdaptedContext extends Context {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
index c072fd7..f51a6b0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,7 +34,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 /**
- * Tests for {@link OldDoFn.DelegatingAggregator}.
+ * Tests for {@link DelegatingAggregator}.
  */
 @RunWith(JUnit4.class)
 public class DoFnDelegatingAggregatorTest {
@@ -63,7 +62,7 @@ public class DoFnDelegatingAggregatorTest {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("cannot be called");
-    thrown.expectMessage("OldDoFn");
+    thrown.expectMessage("DoFn");
 
     aggregator.addValue(21.2);
   }