You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/24 16:11:01 UTC
[07/14] incubator-beam git commit: Move shared DelegatingAggregator
out of OldDoFn
Move shared DelegatingAggregator out of OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/139437bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/139437bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/139437bd
Branch: refs/heads/master
Commit: 139437bdca8872a11f6a87a9f54347985523faf2
Parents: 0d500ef
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:45:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/DelegatingAggregator.java | 125 +++++++++++++++++++
.../org/apache/beam/sdk/transforms/DoFn.java | 30 ++---
.../org/apache/beam/sdk/transforms/OldDoFn.java | 97 --------------
.../DoFnDelegatingAggregatorTest.java | 5 +-
4 files changed, 142 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
new file mode 100644
index 0000000..d92bb71
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.MoreObjects;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator.
+ *
+ * <p>This {@link Aggregator} is designed to be constructed without a delegate, at pipeline
+ * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which it
+ * submits values must be provided by the runner at execution time via {@link #setDelegate}.
+ *
+ * @param <AggInputT> the type of input element
+ * @param <AggOutputT> the type of output element
+ */
+class DelegatingAggregator<AggInputT, AggOutputT>
+ implements Aggregator<AggInputT, AggOutputT>, Serializable {
+ private final UUID id;
+
+ private final String name;
+
+ private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
+
+ private Aggregator<AggInputT, ?> delegate;
+
+ public DelegatingAggregator(String name,
+ CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+ this.id = UUID.randomUUID();
+ this.name = checkNotNull(name, "name cannot be null");
+ // Safe contravariant cast: combiner is declared to accept a supertype of AggInputT
+ @SuppressWarnings("unchecked")
+ CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
+ (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
+ this.combineFn = specificCombiner;
+ }
+
+ @Override
+ public void addValue(AggInputT value) {
+ if (delegate == null) {
+ throw new IllegalStateException(
+ String.format(
+ "addValue cannot be called on Aggregator outside of the execution of a %s.",
+ DoFn.class.getSimpleName()));
+ } else {
+ delegate.addValue(value);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
+ return combineFn;
+ }
+
+ /**
+ * Sets the current delegate of the Aggregator; {@link #addValue} throws until one is set.
+ *
+ * @param delegate the aggregator that subsequent {@link #addValue} calls are forwarded to
+ */
+ public void setDelegate(Aggregator<AggInputT, ?> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("name", name)
+ .add("combineFn", combineFn)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, combineFn.getClass());
+ }
+
+ /**
+ * Indicates whether some other object is "equal to" this one.
+ *
+ * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
+ * CombineFns are the same class, and they have identical IDs. The delegate is not compared.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null) {
+ return false;
+ }
+ if (o instanceof DelegatingAggregator) {
+ DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
+ return Objects.equals(this.id, that.id)
+ && Objects.equals(this.name, that.name)
+ && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 8b3aaf8..0531cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -775,31 +775,31 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
}
/**
- * Returns an {@link Aggregator} with aggregation logic specified by the
- * {@link CombineFn} argument. The name provided must be unique across
- * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created
- * during pipeline construction.
+ * Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn}
+ * argument. The name provided must be unique across {@link Aggregator}s created within the {@link
+ * DoFn}. Aggregators can only be created during pipeline construction.
*
* @param name the name of the aggregator
* @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this {@link DoFn}
+ * @return an aggregator for the provided name and combiner in the scope of this {@link DoFn}
* @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
+ * @throws IllegalArgumentException if the given name collides with another aggregator in this
+ * scope
* @throws IllegalStateException if called during pipeline execution.
*/
- public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+ public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
checkNotNull(name, "name cannot be null");
checkNotNull(combiner, "combiner cannot be null");
- checkArgument(!aggregators.containsKey(name),
+ checkArgument(
+ !aggregators.containsKey(name),
"Cannot create aggregator with name %s."
- + " An Aggregator with that name already exists within this scope.",
+ + " An Aggregator with that name already exists within this scope.",
name);
- checkState(!aggregatorsAreFinal,
+ checkState(
+ !aggregatorsAreFinal,
"Cannot create an aggregator during pipeline execution."
- + " Aggregators should be registered during pipeline construction.");
+ + " Aggregators should be registered during pipeline construction.");
DelegatingAggregator<AggInputT, AggOutputT> aggregator =
new DelegatingAggregator<>(name, combiner);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 72c2965..b269f47 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -21,14 +21,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -505,100 +502,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
}
/**
- * An {@link Aggregator} that delegates calls to addValue to another
- * aggregator.
- *
- * @param <AggInputT> the type of input element
- * @param <AggOutputT> the type of output element
- */
- static class DelegatingAggregator<AggInputT, AggOutputT> implements
- Aggregator<AggInputT, AggOutputT>, Serializable {
- private final UUID id;
-
- private final String name;
-
- private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
- private Aggregator<AggInputT, ?> delegate;
-
- public DelegatingAggregator(String name,
- CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
- this.id = UUID.randomUUID();
- this.name = checkNotNull(name, "name cannot be null");
- // Safe contravariant cast
- @SuppressWarnings("unchecked")
- CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
- (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
- this.combineFn = specificCombiner;
- }
-
- @Override
- public void addValue(AggInputT value) {
- if (delegate == null) {
- throw new IllegalStateException(
- "addValue cannot be called on Aggregator outside of the execution of a OldDoFn.");
- } else {
- delegate.addValue(value);
- }
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
- return combineFn;
- }
-
- /**
- * Sets the current delegate of the Aggregator.
- *
- * @param delegate the delegate to set in this aggregator
- */
- public void setDelegate(Aggregator<AggInputT, ?> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("name", name)
- .add("combineFn", combineFn)
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, name, combineFn.getClass());
- }
-
- /**
- * Indicates whether some other object is "equal to" this one.
- *
- * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
- * CombineFns are the same class, and they have identical IDs.
- */
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (o == null) {
- return false;
- }
- if (o instanceof DelegatingAggregator) {
- DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.name, that.name)
- && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
- }
- return false;
- }
- }
-
- /**
* A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
*/
private class AdaptedContext extends Context {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
index c072fd7..f51a6b0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -35,7 +34,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
- * Tests for {@link OldDoFn.DelegatingAggregator}.
+ * Tests for {@link DelegatingAggregator}.
*/
@RunWith(JUnit4.class)
public class DoFnDelegatingAggregatorTest {
@@ -63,7 +62,7 @@ public class DoFnDelegatingAggregatorTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("cannot be called");
- thrown.expectMessage("OldDoFn");
+ thrown.expectMessage("DoFn");
aggregator.addValue(21.2);
}