You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/01/03 21:28:01 UTC
[3/4] beam git commit: Remove .named from Combine
Remove .named from Combine
Introduces a NameOverride interface that allows some classes to define
custom behavior for getting the name. This is necessary for
parameterized CombineFns to expose details about their parameter values.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16b26673
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16b26673
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16b26673
Branch: refs/heads/master
Commit: 16b266738ed46966400bf0ad1807359a5f763419
Parents: e5a3f75
Author: bchambers <bc...@google.com>
Authored: Thu Dec 29 13:10:13 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Jan 3 13:15:45 2017 -0800
----------------------------------------------------------------------
.../core/UnboundedReadFromBoundedSource.java | 2 +-
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../DataflowUnboundedReadFromBoundedSource.java | 9 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 2 +-
.../main/java/org/apache/beam/sdk/io/Read.java | 4 +-
.../org/apache/beam/sdk/transforms/Combine.java | 175 ++++++++++---------
.../org/apache/beam/sdk/transforms/Count.java | 4 +-
.../org/apache/beam/sdk/transforms/Max.java | 20 +--
.../org/apache/beam/sdk/transforms/Mean.java | 4 +-
.../org/apache/beam/sdk/transforms/Min.java | 20 +--
.../org/apache/beam/sdk/transforms/ParDo.java | 14 +-
.../org/apache/beam/sdk/transforms/Sum.java | 12 +-
.../org/apache/beam/sdk/transforms/Top.java | 27 +--
.../org/apache/beam/sdk/util/NameUtils.java | 40 ++++-
.../apache/beam/sdk/transforms/CombineTest.java | 23 +--
.../apache/beam/sdk/transforms/CountTest.java | 2 +-
.../org/apache/beam/sdk/transforms/MaxTest.java | 14 +-
.../apache/beam/sdk/transforms/MeanTest.java | 5 +-
.../org/apache/beam/sdk/transforms/MinTest.java | 15 +-
.../org/apache/beam/sdk/transforms/SumTest.java | 12 +-
.../org/apache/beam/sdk/transforms/TopTest.java | 13 +-
.../org/apache/beam/sdk/util/NameUtilsTest.java | 33 ++--
22 files changed, 239 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 645a411..3073076 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -100,7 +100,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
@Override
public String getKindString() {
- return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 69c9c18..03e5dfc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2308,7 +2308,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public String getKindString() {
- return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
}
static {
@@ -2784,7 +2784,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
? "streaming" : "batch";
String name =
transform == null
- ? NameUtils.approximateSimpleName(doFn.getClass())
+ ? NameUtils.approximateSimpleName(doFn)
: NameUtils.approximatePTransformName(transform.getClass());
throw new UnsupportedOperationException(
String.format("The DataflowRunner in %s mode does not support %s.", mode, name));
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index db87e21..a2ae799 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -105,14 +105,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
@Override
public String getKindString() {
- String sourceName;
- if (source.getClass().isAnonymousClass()) {
- sourceName = "AnonymousSource";
- } else {
- sourceName = NameUtils.approximateSimpleName(source.getClass());
- }
-
- return String.format("Read(%s)", sourceName);
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index ac84c5e..8b63bfd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -117,7 +117,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
@Override
public String getKindString() {
- return "Read(" + NameUtils.approximateSimpleName(source.getClass()) + ")";
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 7404cba..0e269a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -117,7 +117,7 @@ public class Read {
@Override
public String getKindString() {
- return "Read(" + NameUtils.approximateSimpleName(source.getClass()) + ")";
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
}
@Override
@@ -184,7 +184,7 @@ public class Read {
@Override
public String getKindString() {
- return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3b07260..92c04ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -58,6 +58,8 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -473,56 +475,72 @@ public class Combine {
@Override
public <K> KeyedCombineFn<K, InputT, AccumT, OutputT> asKeyedFn() {
// The key, an object, is never even looked at.
- return new KeyedCombineFn<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key) {
- return CombineFn.this.createAccumulator();
- }
+ return new KeyIgnoringCombineFn<>(this);
+ }
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input) {
- return CombineFn.this.addInput(accumulator, input);
- }
+ private static class KeyIgnoringCombineFn<K, InputT, AccumT, OutputT>
+ extends KeyedCombineFn<K, InputT, AccumT, OutputT>
+ implements NameOverride {
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
- return CombineFn.this.mergeAccumulators(accumulators);
- }
+ private final CombineFn<InputT, AccumT, OutputT> fn;
- @Override
- public OutputT extractOutput(K key, AccumT accumulator) {
- return CombineFn.this.extractOutput(accumulator);
- }
+ private KeyIgnoringCombineFn(CombineFn<InputT, AccumT, OutputT> fn) {
+ this.fn = fn;
+ }
- @Override
- public AccumT compact(K key, AccumT accumulator) {
- return CombineFn.this.compact(accumulator);
- }
+ @Override
+ public AccumT createAccumulator(K key) {
+ return fn.createAccumulator();
+ }
- @Override
- public Coder<AccumT> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return CombineFn.this.getAccumulatorCoder(registry, inputCoder);
- }
+ @Override
+ public AccumT addInput(K key, AccumT accumulator, InputT input) {
+ return fn.addInput(accumulator, input);
+ }
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return CombineFn.this.getDefaultOutputCoder(registry, inputCoder);
- }
+ @Override
+ public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
+ return fn.mergeAccumulators(accumulators);
+ }
- @Override
- public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
- return CombineFn.this;
- }
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator) {
+ return fn.extractOutput(accumulator);
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(CombineFn.this);
- }
- };
+ @Override
+ public AccumT compact(K key, AccumT accumulator) {
+ return fn.compact(accumulator);
+ }
+
+ @Override
+ public Coder<AccumT> getAccumulatorCoder(
+ CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
+ throws CannotProvideCoderException {
+ return fn.getAccumulatorCoder(registry, inputCoder);
+ }
+
+ @Override
+ public Coder<OutputT> getDefaultOutputCoder(
+ CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
+ throws CannotProvideCoderException {
+ return fn.getDefaultOutputCoder(registry, inputCoder);
+ }
+
+ @Override
+ public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
+ return fn;
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.delegate(fn);
+ }
+
+ @Override
+ public String getNameOverride() {
+ return NameUtils.approximateSimpleName(fn);
+ }
}
}
@@ -1338,20 +1356,9 @@ public class Combine {
this.sideInputs = ImmutableList.of();
}
- private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
- DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
- super(name);
- this.fn = fn;
- this.fnDisplayData = fnDisplayData;
- this.insertDefault = insertDefault;
- this.fanout = fanout;
- this.sideInputs = ImmutableList.of();
- }
-
- private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
+ private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
List<PCollectionView<?>> sideInputs) {
- super(name);
this.fn = fn;
this.fnDisplayData = fnDisplayData;
this.insertDefault = insertDefault;
@@ -1359,12 +1366,9 @@ public class Combine {
this.sideInputs = sideInputs;
}
- /**
- * Return a new {@code Globally} transform that's like this transform but with the
- * specified name. Does not modify this transform.
- */
- public Globally<InputT, OutputT> named(String name) {
- return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout);
+ @Override
+ protected String getKindString() {
+ return String.format("Combine.globally(%s)", NameUtils.approximateSimpleName(fn));
}
/**
@@ -1384,7 +1388,7 @@ public class Combine {
* is not globally windowed and the output is not being used as a side input.
*/
public Globally<InputT, OutputT> withoutDefaults() {
- return new Globally<>(name, fn, fnDisplayData, false, fanout);
+ return new Globally<>(fn, fnDisplayData, false, fanout);
}
/**
@@ -1395,7 +1399,7 @@ public class Combine {
* that will be used.
*/
public Globally<InputT, OutputT> withFanout(int fanout) {
- return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout);
+ return new Globally<>(fn, fnDisplayData, insertDefault, fanout);
}
/**
@@ -1405,7 +1409,7 @@ public class Combine {
public Globally<InputT, OutputT> withSideInputs(
Iterable<? extends PCollectionView<?>> sideInputs) {
checkState(fn instanceof RequiresContextInternal);
- return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout,
+ return new Globally<>(fn, fnDisplayData, insertDefault, fanout,
ImmutableList.copyOf(sideInputs));
}
@@ -1613,7 +1617,9 @@ public class Combine {
* {@link #perKey(SerializableFunction)}, and
* {@link #groupedValues(SerializableFunction)}.
*/
- public static class IterableCombineFn<V> extends CombineFn<V, List<V>, V> {
+ public static class IterableCombineFn<V>
+ extends CombineFn<V, List<V>, V>
+ implements NameOverride {
/**
* Returns a {@code CombineFn} that uses the given
* {@code SerializableFunction} to combine values.
@@ -1693,6 +1699,11 @@ public class Combine {
singleton.add(combiner.apply(values));
return singleton;
}
+
+ @Override
+ public String getNameOverride() {
+ return NameUtils.approximateSimpleName(combiner);
+ }
}
/**
@@ -1774,33 +1785,19 @@ public class Combine {
this.sideInputs = ImmutableList.of();
}
- private PerKey(String name,
+ private PerKey(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
boolean fewKeys, List<PCollectionView<?>> sideInputs) {
- super(name);
this.fn = fn;
this.fnDisplayData = fnDisplayData;
this.fewKeys = fewKeys;
this.sideInputs = sideInputs;
}
- private PerKey(
- String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
- super(name);
- this.fn = fn;
- this.fnDisplayData = fnDisplayData;
- this.fewKeys = fewKeys;
- this.sideInputs = ImmutableList.of();
- }
-
- /**
- * Return a new {@code Globally} transform that's like this transform but with the
- * specified name. Does not modify this transform.
- */
- public PerKey<K, InputT, OutputT> named(String name) {
- return new PerKey<>(name, fn, fnDisplayData, fewKeys);
+ @Override
+ protected String getKindString() {
+ return String.format("Combine.perKey(%s)", NameUtils.approximateSimpleName(fn));
}
/**
@@ -1810,7 +1807,7 @@ public class Combine {
public PerKey<K, InputT, OutputT> withSideInputs(
Iterable<? extends PCollectionView<?>> sideInputs) {
checkState(fn instanceof RequiresContextInternal);
- return new PerKey<>(name, fn, fnDisplayData, fewKeys,
+ return new PerKey<>(fn, fnDisplayData, fewKeys,
ImmutableList.copyOf(sideInputs));
}
@@ -1827,7 +1824,7 @@ public class Combine {
*/
public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(
SerializableFunction<? super K, Integer> hotKeyFanout) {
- return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData, hotKeyFanout);
+ return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData, hotKeyFanout);
}
/**
@@ -1835,7 +1832,7 @@ public class Combine {
* constant value for every key.
*/
public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
- return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData,
+ return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData,
new SimpleFunction<K, Integer>() {
@Override
public void populateDisplayData(Builder builder) {
@@ -1890,17 +1887,21 @@ public class Combine {
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final SerializableFunction<? super K, Integer> hotKeyFanout;
- private PerKeyWithHotKeyFanout(String name,
+ private PerKeyWithHotKeyFanout(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
SerializableFunction<? super K, Integer> hotKeyFanout) {
- super(name);
this.fn = fn;
this.fnDisplayData = fnDisplayData;
this.hotKeyFanout = hotKeyFanout;
}
@Override
+ protected String getKindString() {
+ return String.format("Combine.perKeyWithFanout(%s)", NameUtils.approximateSimpleName(fn));
+ }
+
+ @Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
return applyHelper(input);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 9101996..d164978 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -51,7 +51,7 @@ public class Count {
* its input {@link PCollection}.
*/
public static <T> Combine.Globally<T, Long> globally() {
- return Combine.globally(new CountFn<T>()).named("Count.Globally");
+ return Combine.globally(new CountFn<T>());
}
/**
@@ -59,7 +59,7 @@ public class Count {
* associated with each key of its input {@link PCollection}.
*/
public static <K, V> Combine.PerKey<K, V, Long> perKey() {
- return Combine.<K, V, Long>perKey(new CountFn<V>()).named("Count.PerKey");
+ return Combine.<K, V, Long>perKey(new CountFn<V>());
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index c44d9b6..0990ca4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -52,7 +52,7 @@ public class Max {
* elements, or {@code Integer.MIN_VALUE} if there are no elements.
*/
public static Combine.Globally<Integer, Integer> integersGlobally() {
- return Combine.globally(new MaxIntegerFn()).named("Max.Globally");
+ return Combine.globally(new MaxIntegerFn());
}
/**
@@ -64,7 +64,7 @@ public class Max {
* <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
- return Combine.<K, Integer, Integer>perKey(new MaxIntegerFn()).named("Max.PerKey");
+ return Combine.<K, Integer, Integer>perKey(new MaxIntegerFn());
}
/**
@@ -73,7 +73,7 @@ public class Max {
* or {@code Long.MIN_VALUE} if there are no elements.
*/
public static Combine.Globally<Long, Long> longsGlobally() {
- return Combine.globally(new MaxLongFn()).named("Max.Globally");
+ return Combine.globally(new MaxLongFn());
}
/**
@@ -85,7 +85,7 @@ public class Max {
* <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
- return Combine.<K, Long, Long>perKey(new MaxLongFn()).named("Max.PerKey");
+ return Combine.<K, Long, Long>perKey(new MaxLongFn());
}
/**
@@ -94,7 +94,7 @@ public class Max {
* elements, or {@code Double.NEGATIVE_INFINITY} if there are no elements.
*/
public static Combine.Globally<Double, Double> doublesGlobally() {
- return Combine.globally(new MaxDoubleFn()).named("Max.Globally");
+ return Combine.globally(new MaxDoubleFn());
}
/**
@@ -106,7 +106,7 @@ public class Max {
* <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
- return Combine.<K, Double, Double>perKey(new MaxDoubleFn()).named("Max.PerKey");
+ return Combine.<K, Double, Double>perKey(new MaxDoubleFn());
}
/**
@@ -116,7 +116,7 @@ public class Max {
*/
public static <T extends Comparable<? super T>>
Combine.Globally<T, T> globally() {
- return Combine.<T, T>globally(MaxFn.<T>naturalOrder()).named("Max.Globally");
+ return Combine.<T, T>globally(MaxFn.<T>naturalOrder());
}
/**
@@ -129,7 +129,7 @@ public class Max {
*/
public static <K, T extends Comparable<? super T>>
Combine.PerKey<K, T, T> perKey() {
- return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder()).named("Max.PerKey");
+ return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder());
}
/**
@@ -139,7 +139,7 @@ public class Max {
*/
public static <T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(MaxFn.of(comparator)).named("Max.Globally");
+ return Combine.<T, T>globally(MaxFn.of(comparator));
}
/**
@@ -151,7 +151,7 @@ public class Max {
*/
public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(MaxFn.of(comparator)).named("Max.PerKey");
+ return Combine.<K, T, T>perKey(MaxFn.of(comparator));
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 9eea3a0..cb77ba3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -64,7 +64,7 @@ public class Mean {
* @param <NumT> the type of the {@code Number}s being combined
*/
public static <NumT extends Number> Combine.Globally<NumT, Double> globally() {
- return Combine.<NumT, Double>globally(new MeanFn<>()).named("Mean.Globally");
+ return Combine.<NumT, Double>globally(new MeanFn<>());
}
/**
@@ -81,7 +81,7 @@ public class Mean {
* @param <NumT> the type of the {@code Number}s being combined
*/
public static <K, NumT extends Number> Combine.PerKey<K, NumT, Double> perKey() {
- return Combine.<K, NumT, Double>perKey(new MeanFn<>()).named("Mean.PerKey");
+ return Combine.<K, NumT, Double>perKey(new MeanFn<>());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index f046779..5003594 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -52,7 +52,7 @@ public class Min {
* {@code PCollection}'s elements, or {@code Integer.MAX_VALUE} if there are no elements.
*/
public static Combine.Globally<Integer, Integer> integersGlobally() {
- return Combine.globally(new MinIntegerFn()).named("Min.Globally");
+ return Combine.globally(new MinIntegerFn());
}
/**
@@ -64,7 +64,7 @@ public class Min {
* <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
- return Combine.<K, Integer, Integer>perKey(new MinIntegerFn()).named("Min.PerKey");
+ return Combine.<K, Integer, Integer>perKey(new MinIntegerFn());
}
/**
@@ -73,7 +73,7 @@ public class Min {
* or {@code Long.MAX_VALUE} if there are no elements.
*/
public static Combine.Globally<Long, Long> longsGlobally() {
- return Combine.globally(new MinLongFn()).named("Min.Globally");
+ return Combine.globally(new MinLongFn());
}
/**
@@ -85,7 +85,7 @@ public class Min {
* <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
- return Combine.<K, Long, Long>perKey(new MinLongFn()).named("Min.PerKey");
+ return Combine.<K, Long, Long>perKey(new MinLongFn());
}
/**
@@ -94,7 +94,7 @@ public class Min {
* elements, or {@code Double.POSITIVE_INFINITY} if there are no elements.
*/
public static Combine.Globally<Double, Double> doublesGlobally() {
- return Combine.globally(new MinDoubleFn()).named("Min.Globally");
+ return Combine.globally(new MinDoubleFn());
}
/**
@@ -106,7 +106,7 @@ public class Min {
* <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
*/
public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
- return Combine.<K, Double, Double>perKey(new MinDoubleFn()).named("Min.PerKey");
+ return Combine.<K, Double, Double>perKey(new MinDoubleFn());
}
/**
@@ -116,7 +116,7 @@ public class Min {
*/
public static <T extends Comparable<? super T>>
Combine.Globally<T, T> globally() {
- return Combine.<T, T>globally(MinFn.<T>naturalOrder()).named("Min.Globally");
+ return Combine.<T, T>globally(MinFn.<T>naturalOrder());
}
/**
@@ -129,7 +129,7 @@ public class Min {
*/
public static <K, T extends Comparable<? super T>>
Combine.PerKey<K, T, T> perKey() {
- return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder()).named("Min.PerKey");
+ return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder());
}
/**
@@ -139,7 +139,7 @@ public class Min {
*/
public static <T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.Globally<T, T> globally(ComparatorT comparator) {
- return Combine.<T, T>globally(MinFn.of(comparator)).named("Min.Globally");
+ return Combine.<T, T>globally(MinFn.of(comparator));
}
/**
@@ -151,7 +151,7 @@ public class Min {
*/
public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
- return Combine.<K, T, T>perKey(MinFn.of(comparator)).named("Min.PerKey");
+ return Combine.<K, T, T>perKey(MinFn.of(comparator));
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 7e54a54..5b4fa19 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -762,12 +762,7 @@ public class ParDo {
@Override
protected String getKindString() {
- Class<?> clazz = getFn().getClass();
- if (clazz.isAnonymousClass()) {
- return "AnonymousParDo";
- } else {
- return String.format("ParDo(%s)", NameUtils.approximateSimpleName(clazz));
- }
+ return String.format("ParDo(%s)", NameUtils.approximateSimpleName(getFn()));
}
/**
@@ -976,12 +971,7 @@ public class ParDo {
@Override
protected String getKindString() {
- Class<?> clazz = getFn().getClass();
- if (clazz.isAnonymousClass()) {
- return "AnonymousParMultiDo";
- } else {
- return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(clazz));
- }
+ return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(getFn()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 27c5ced..48eafc3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -50,7 +50,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Integer, Integer> integersGlobally() {
- return Combine.globally(new SumIntegerFn()).named("Sum.Globally");
+ return Combine.globally(new SumIntegerFn());
}
/**
@@ -62,7 +62,7 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
- return Combine.<K, Integer, Integer>perKey(new SumIntegerFn()).named("Sum.PerKey");
+ return Combine.<K, Integer, Integer>perKey(new SumIntegerFn());
}
/**
@@ -73,7 +73,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Long, Long> longsGlobally() {
- return Combine.globally(new SumLongFn()).named("Sum.Globally");
+ return Combine.globally(new SumLongFn());
}
/**
@@ -85,7 +85,7 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
- return Combine.<K, Long, Long>perKey(new SumLongFn()).named("Sum.PerKey");
+ return Combine.<K, Long, Long>perKey(new SumLongFn());
}
/**
@@ -96,7 +96,7 @@ public class Sum {
* {@code 0} if there are no elements.
*/
public static Combine.Globally<Double, Double> doublesGlobally() {
- return Combine.globally(new SumDoubleFn()).named("Sum.Globally");
+ return Combine.globally(new SumDoubleFn());
}
/**
@@ -108,7 +108,7 @@ public class Sum {
* that key in the input {@code PCollection}.
*/
public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
- return Combine.<K, Double, Double>perKey(new SumDoubleFn()).named("Sum.PerKey");
+ return Combine.<K, Double, Double>perKey(new SumDoubleFn());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 992a341..47be9b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -39,6 +39,8 @@ import org.apache.beam.sdk.transforms.Combine.PerKey;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -97,7 +99,7 @@ public class Top {
*/
public static <T, ComparatorT extends Comparator<T> & Serializable>
Combine.Globally<T, List<T>> of(int count, ComparatorT compareFn) {
- return Combine.globally(new TopCombineFn<>(count, compareFn)).named("Top.Globally");
+ return Combine.globally(new TopCombineFn<>(count, compareFn));
}
/**
@@ -141,8 +143,7 @@ public class Top {
* {@code KV}s and return the top values associated with each key.
*/
public static <T extends Comparable<T>> Combine.Globally<T, List<T>> smallest(int count) {
- return Combine.globally(new TopCombineFn<>(count, new Smallest<T>()))
- .named("Smallest.Globally");
+ return Combine.globally(new TopCombineFn<>(count, new Smallest<T>()));
}
/**
@@ -186,7 +187,7 @@ public class Top {
* {@code KV}s and return the top values associated with each key.
*/
public static <T extends Comparable<T>> Combine.Globally<T, List<T>> largest(int count) {
- return Combine.globally(new TopCombineFn<>(count, new Largest<T>())).named("Largest.Globally");
+ return Combine.globally(new TopCombineFn<>(count, new Largest<T>()));
}
/**
@@ -233,8 +234,7 @@ public class Top {
public static <K, V, ComparatorT extends Comparator<V> & Serializable>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
perKey(int count, ComparatorT compareFn) {
- return Combine.perKey(
- new TopCombineFn<>(count, compareFn).<K>asKeyedFn()).named("Top.PerKey");
+ return Combine.perKey(new TopCombineFn<>(count, compareFn).<K>asKeyedFn());
}
/**
@@ -280,8 +280,7 @@ public class Top {
public static <K, V extends Comparable<V>>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
smallestPerKey(int count) {
- return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn())
- .named("Smallest.PerKey");
+ return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn());
}
/**
@@ -327,9 +326,7 @@ public class Top {
public static <K, V extends Comparable<V>>
PerKey<K, V, List<V>>
largestPerKey(int count) {
- return Combine.perKey(
-new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
- .named("Largest.PerKey");
+ return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn());
}
/**
@@ -368,7 +365,8 @@ new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
* @param <T> type of element being compared
*/
public static class TopCombineFn<T, ComparatorT extends Comparator<T> & Serializable>
- extends AccumulatingCombineFn<T, BoundedHeap<T, ComparatorT>, List<T>> {
+ extends AccumulatingCombineFn<T, BoundedHeap<T, ComparatorT>, List<T>>
+ implements NameOverride {
private final int count;
private final ComparatorT compareFn;
@@ -380,6 +378,11 @@ new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
}
@Override
+ public String getNameOverride() {
+ return String.format("Top(%s)", NameUtils.approximateSimpleName(compareFn));
+ }
+
+ @Override
public BoundedHeap<T, ComparatorT> createAccumulator() {
return new BoundedHeap<>(count, compareFn, new ArrayList<T>());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
index 60a0e41..1c59af7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
@@ -29,12 +29,18 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
/**
- * Helpers for extracting the name of objects (most commonly {@link DoFn} and {@link CombineFn}).
+ * Helpers for extracting the name of objects and classes.
*/
public class NameUtils {
+ /** Classes may implement this interface to change how names are generated for their instances. */
+ public interface NameOverride {
+ /** Return the name to use for this instance. */
+ String getNameOverride();
+ }
+
private static final String[] STANDARD_NAME_SUFFIXES =
- new String[]{"OldDoFn", "DoFn", "Fn"};
+ new String[]{"OldDoFn", "DoFn", "CombineFn", "Fn"};
/**
* Pattern to match a non-anonymous inner class.
@@ -87,7 +93,16 @@ public class NameUtils {
}
/**
- * Returns a simple name for a class.
+ * As {@link #approximateSimpleName(Object, String)} but returning {@code "Anonymous"} when
+ * {@code object} is an instance of an anonymous class.
+ */
+ public static String approximateSimpleName(Object object) {
+ return approximateSimpleName(object, "Anonymous");
+ }
+
+ /**
+ * Returns a simple name describing a class that is being used as a function (e.g., a
+ * {@link DoFn} or {@link CombineFn}).
*
* <p>Note: this is non-invertible - the name may be simplified to an
* extent that it cannot be mapped back to the original class.
@@ -96,15 +111,28 @@ public class NameUtils {
* removes the package and outer classes from the name,
* and removes common suffixes.
*
+ * <p>If the object is an instance of {@link NameOverride}, the result of
+ * {@link NameOverride#getNameOverride()} is returned. This allows classes that act as wrappers to
+ * override the handling of names by delegating to the objects they wrap.
+ *
+ * <p>If the class is anonymous, the string {@code anonymousValue} is returned.
+ *
* <p>Examples:
* <ul>
* <li>{@code some.package.Word.SummaryDoFn} becomes "Summary"
* <li>{@code another.package.PairingFn} becomes "Pairing"
* </ul>
- *
- * @throws IllegalArgumentException if the class is anonymous
*/
- public static String approximateSimpleName(Class<?> clazz) {
+ public static String approximateSimpleName(Object object, String anonymousValue) {
+ if (object instanceof NameOverride) {
+ return ((NameOverride) object).getNameOverride();
+ }
+
+ Class<?> clazz = object.getClass();
+ if (clazz.isAnonymousClass()) {
+ return anonymousValue;
+ }
+
return approximateSimpleName(clazz, /* dropOuterClassNames */ true);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index f783928..fef47fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -24,6 +24,7 @@ import static org.apache.beam.sdk.TestUtils.checkCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -470,8 +471,7 @@ public class CombineTest implements Serializable {
runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.<KV<String, Double>>emptyList());
}
- // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine),
- // provide their own top-level name.
+ // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine) have good names.
@Test
public void testCombinerNames() {
Combine.PerKey<String, Integer, Integer> min = Min.integersPerKey();
@@ -479,10 +479,10 @@ public class CombineTest implements Serializable {
Combine.PerKey<String, Integer, Double> mean = Mean.perKey();
Combine.PerKey<String, Integer, Integer> sum = Sum.integersPerKey();
- assertThat(min.getName(), Matchers.startsWith("Min"));
- assertThat(max.getName(), Matchers.startsWith("Max"));
- assertThat(mean.getName(), Matchers.startsWith("Mean"));
- assertThat(sum.getName(), Matchers.startsWith("Sum"));
+ assertThat(min.getName(), equalTo("Combine.perKey(MinInteger)"));
+ assertThat(max.getName(), equalTo("Combine.perKey(MaxInteger)"));
+ assertThat(mean.getName(), equalTo("Combine.perKey(Mean)"));
+ assertThat(sum.getName(), equalTo("Combine.perKey(SumInteger)"));
}
private static final SerializableFunction<String, Integer> hotKeyFanout =
@@ -635,18 +635,13 @@ public class CombineTest implements Serializable {
@Test
public void testCombineGetName() {
- assertEquals("Combine.Globally", Combine.globally(new SumInts()).getName());
- assertEquals(
- "MyCombineGlobally", Combine.globally(new SumInts()).named("MyCombineGlobally").getName());
+ assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName());
assertEquals(
"Combine.GloballyAsSingletonView",
Combine.globally(new SumInts()).asSingletonView().getName());
- assertEquals("Combine.PerKey", Combine.perKey(new TestKeyedCombineFn()).getName());
- assertEquals(
- "MyCombinePerKey",
- Combine.perKey(new TestKeyedCombineFn()).named("MyCombinePerKey").getName());
+ assertEquals("Combine.perKey(TestKeyed)", Combine.perKey(new TestKeyedCombineFn()).getName());
assertEquals(
- "Combine.PerKeyWithHotKeyFanout",
+ "Combine.perKeyWithFanout(TestKeyed)",
Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
index eafb12d..dca0542 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
@@ -110,6 +110,6 @@ public class CountTest {
@Test
public void testCountGetName() {
assertEquals("Count.PerElement", Count.perElement().getName());
- assertEquals("Count.Globally", Count.globally().getName());
+ assertEquals("Combine.globally(Count)", Count.globally().getName());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 5c78b3f..4aa39a3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -34,13 +34,13 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class MaxTest {
@Test
- public void testMeanGetNames() {
- assertEquals("Max.Globally", Max.integersGlobally().getName());
- assertEquals("Max.Globally", Max.doublesGlobally().getName());
- assertEquals("Max.Globally", Max.longsGlobally().getName());
- assertEquals("Max.PerKey", Max.integersPerKey().getName());
- assertEquals("Max.PerKey", Max.doublesPerKey().getName());
- assertEquals("Max.PerKey", Max.longsPerKey().getName());
+ public void testMaxGetNames() {
+ assertEquals("Combine.globally(MaxInteger)", Max.integersGlobally().getName());
+ assertEquals("Combine.globally(MaxDouble)", Max.doublesGlobally().getName());
+ assertEquals("Combine.globally(MaxLong)", Max.longsGlobally().getName());
+ assertEquals("Combine.perKey(MaxInteger)", Max.integersPerKey().getName());
+ assertEquals("Combine.perKey(MaxDouble)", Max.doublesPerKey().getName());
+ assertEquals("Combine.perKey(MaxLong)", Max.longsPerKey().getName());
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
index 1c94e35..84741ee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
@@ -36,10 +36,11 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class MeanTest {
+
@Test
public void testMeanGetNames() {
- assertEquals("Mean.Globally", Mean.globally().getName());
- assertEquals("Mean.PerKey", Mean.perKey().getName());
+ assertEquals("Combine.globally(Mean)", Mean.globally().getName());
+ assertEquals("Combine.perKey(Mean)", Mean.perKey().getName());
}
private static final Coder<CountSum<Number>> TEST_CODER = new CountSumCoder<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index a0eca07..4334ed9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -35,15 +35,14 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class MinTest {
@Test
- public void testMeanGetNames() {
- assertEquals("Min.Globally", Min.integersGlobally().getName());
- assertEquals("Min.Globally", Min.doublesGlobally().getName());
- assertEquals("Min.Globally", Min.longsGlobally().getName());
- assertEquals("Min.PerKey", Min.integersPerKey().getName());
- assertEquals("Min.PerKey", Min.doublesPerKey().getName());
- assertEquals("Min.PerKey", Min.longsPerKey().getName());
+ public void testMinGetNames() {
+ assertEquals("Combine.globally(MinInteger)", Min.integersGlobally().getName());
+ assertEquals("Combine.globally(MinDouble)", Min.doublesGlobally().getName());
+ assertEquals("Combine.globally(MinLong)", Min.longsGlobally().getName());
+ assertEquals("Combine.perKey(MinInteger)", Min.integersPerKey().getName());
+ assertEquals("Combine.perKey(MinDouble)", Min.doublesPerKey().getName());
+ assertEquals("Combine.perKey(MinLong)", Min.longsPerKey().getName());
}
-
@Test
public void testMinIntegerFn() {
checkCombineFn(
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
index b4f723d..04c0186 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
@@ -41,12 +41,12 @@ public class SumTest {
@Test
public void testSumGetNames() {
- assertEquals("Sum.Globally", Sum.integersGlobally().getName());
- assertEquals("Sum.Globally", Sum.doublesGlobally().getName());
- assertEquals("Sum.Globally", Sum.longsGlobally().getName());
- assertEquals("Sum.PerKey", Sum.integersPerKey().getName());
- assertEquals("Sum.PerKey", Sum.doublesPerKey().getName());
- assertEquals("Sum.PerKey", Sum.longsPerKey().getName());
+ assertEquals("Combine.globally(SumInteger)", Sum.integersGlobally().getName());
+ assertEquals("Combine.globally(SumDouble)", Sum.doublesGlobally().getName());
+ assertEquals("Combine.globally(SumLong)", Sum.longsGlobally().getName());
+ assertEquals("Combine.perKey(SumInteger)", Sum.integersPerKey().getName());
+ assertEquals("Combine.perKey(SumDouble)", Sum.doublesPerKey().getName());
+ assertEquals("Combine.perKey(SumLong)", Sum.longsPerKey().getName());
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index d011197..89e0076 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -234,12 +234,13 @@ public class TopTest {
@Test
public void testTopGetNames() {
- assertEquals("Top.Globally", Top.of(1, new OrderByLength()).getName());
- assertEquals("Smallest.Globally", Top.smallest(1).getName());
- assertEquals("Largest.Globally", Top.largest(2).getName());
- assertEquals("Top.PerKey", Top.perKey(1, new IntegerComparator()).getName());
- assertEquals("Smallest.PerKey", Top.<String, Integer>smallestPerKey(1).getName());
- assertEquals("Largest.PerKey", Top.<String, Integer>largestPerKey(2).getName());
+ assertEquals("Combine.globally(Top(OrderByLength))", Top.of(1, new OrderByLength()).getName());
+ assertEquals("Combine.globally(Top(Smallest))", Top.smallest(1).getName());
+ assertEquals("Combine.globally(Top(Largest))", Top.largest(2).getName());
+ assertEquals("Combine.perKey(Top(IntegerComparator))",
+ Top.perKey(1, new IntegerComparator()).getName());
+ assertEquals("Combine.perKey(Top(Smallest))", Top.<String, Integer>smallestPerKey(1).getName());
+ assertEquals("Combine.perKey(Top(Largest))", Top.<String, Integer>largestPerKey(2).getName());
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index b35e942..b81aa36 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PDone;
import org.junit.Rule;
@@ -111,16 +112,12 @@ public class NameUtilsTest {
@Test
public void testSimpleName() {
- assertEquals("Embedded", NameUtils.approximateSimpleName(EmbeddedOldDoFn.class));
+ assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedOldDoFn()));
}
@Test
public void testAnonSimpleName() throws Exception {
- thrown.expect(IllegalArgumentException.class);
-
- EmbeddedOldDoFn anon = new EmbeddedOldDoFn(){};
-
- NameUtils.approximateSimpleName(anon.getClass());
+ assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedOldDoFn() {}));
}
@Test
@@ -128,7 +125,7 @@ public class NameUtilsTest {
EmbeddedOldDoFn fn = new EmbeddedOldDoFn();
EmbeddedOldDoFn inner = fn.getEmbedded();
- assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner.getClass()));
+ assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner));
}
@Test
@@ -160,9 +157,25 @@ public class NameUtilsTest {
};
assertEquals("NamedInnerClass",
- NameUtils.approximateSimpleName(anonymousClassObj.getInnerClassInstance().getClass()));
+ NameUtils.approximateSimpleName(anonymousClassObj.getInnerClassInstance()));
assertEquals("NameUtilsTest.NamedInnerClass",
- NameUtils.approximatePTransformName(
- anonymousClassObj.getInnerClassInstance().getClass()));
+ NameUtils.approximatePTransformName(anonymousClassObj.getInnerClassInstance().getClass()));
+ }
+
+ @Test
+ public void testApproximateSimpleNameOverride() {
+ Object overriddenName = new NameOverride() {
+ @Override
+ public String getNameOverride() {
+ return "CUSTOM_NAME";
+ }
+ };
+ assertEquals("CUSTOM_NAME", NameUtils.approximateSimpleName(overriddenName));
+ }
+
+ @Test
+ public void testApproximateSimpleNameCustomAnonymous() {
+ Object overriddenName = new Object() {};
+ assertEquals("CUSTOM_NAME", NameUtils.approximateSimpleName(overriddenName, "CUSTOM_NAME"));
}
}