You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/15 23:54:53 UTC
[2/3] incubator-beam git commit: Add DisplayData for combine
transforms
Add DisplayData for combine transforms
If more than one combineFn have the same namespace, add a sequential suffix.
This is necessary because each namespace/key pair must be unique within
the transform.
Add a `JavaClass` wrapper around a name/simple-name for a class. This is
necessary in cases where the class may be serialized to support
accessing `DisplayData` since `Class` is not serializable in some cases.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0baa4c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0baa4c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0baa4c9
Branch: refs/heads/master
Commit: b0baa4c9d66750b1cbdbb0dc7f02e62385436bc2
Parents: d440d94
Author: Scott Wegner <sw...@google.com>
Authored: Mon Apr 11 09:08:23 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Fri Apr 15 14:26:51 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/ApproximateQuantiles.java | 8 +
.../beam/sdk/transforms/ApproximateUnique.java | 44 ++++++
.../org/apache/beam/sdk/transforms/Combine.java | 155 +++++++++++++++----
.../beam/sdk/transforms/CombineFnBase.java | 27 +++-
.../apache/beam/sdk/transforms/CombineFns.java | 65 ++++++++
.../beam/sdk/transforms/CombineWithContext.java | 6 +
.../org/apache/beam/sdk/transforms/Max.java | 6 +
.../org/apache/beam/sdk/transforms/Min.java | 6 +
.../org/apache/beam/sdk/transforms/Sample.java | 14 ++
.../org/apache/beam/sdk/transforms/Top.java | 8 +
.../sdk/transforms/display/ClassForDisplay.java | 93 +++++++++++
.../sdk/transforms/display/DisplayData.java | 111 +++++++++++--
.../org/apache/beam/sdk/util/CombineFnUtil.java | 13 ++
.../transforms/ApproximateQuantilesTest.java | 13 ++
.../sdk/transforms/ApproximateUniqueTest.java | 17 ++
.../beam/sdk/transforms/CombineFnsTest.java | 69 ++++++++-
.../apache/beam/sdk/transforms/CombineTest.java | 22 ++-
.../org/apache/beam/sdk/transforms/MaxTest.java | 13 +-
.../org/apache/beam/sdk/transforms/MinTest.java | 13 +-
.../apache/beam/sdk/transforms/SampleTest.java | 14 ++
.../org/apache/beam/sdk/transforms/TopTest.java | 13 ++
.../transforms/display/ClassForDisplayTest.java | 66 ++++++++
.../transforms/display/DisplayDataMatchers.java | 51 +++---
.../sdk/transforms/display/DisplayDataTest.java | 18 ++-
.../display/ClassForDisplayJava8Test.java | 46 ++++++
.../beam/sdk/transforms/CombineJava8Test.java | 42 +++++
26 files changed, 878 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 2ed7a85..c58c736 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WeightedValue;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
@@ -359,6 +360,13 @@ public class ApproximateQuantiles {
CoderRegistry registry, Coder<T> elementCoder) {
return new QuantileStateCoder<>(compareFn, elementCoder);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder
+ .add("numQuantiles", numQuantiles)
+ .add("comparer", compareFn.getClass());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 4f9dfc4..175897b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -31,6 +32,8 @@ import com.google.common.hash.Hashing;
import com.google.common.hash.HashingOutputStream;
import com.google.common.io.ByteStreams;
+import org.apache.avro.reflect.Nullable;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
@@ -168,6 +171,12 @@ public class ApproximateUnique {
private final long sampleSize;
/**
+ * The desired maximum estimation error or null if not specified.
+ */
+ @Nullable
+ private final Double maximumEstimationError;
+
+ /**
* @see ApproximateUnique#globally(int)
*/
public Globally(int sampleSize) {
@@ -178,7 +187,9 @@ public class ApproximateUnique {
+ "In general, the estimation "
+ "error is about 2 / sqrt(sampleSize).");
}
+
this.sampleSize = sampleSize;
+ this.maximumEstimationError = null;
}
/**
@@ -190,7 +201,9 @@ public class ApproximateUnique {
"ApproximateUnique needs an "
+ "estimation error between 1% (0.01) and 50% (0.5).");
}
+
this.sampleSize = sampleSizeFromEstimationError(maximumEstimationError);
+ this.maximumEstimationError = maximumEstimationError;
}
@Override
@@ -200,6 +213,11 @@ public class ApproximateUnique {
Combine.globally(
new ApproximateUniqueCombineFn<>(sampleSize, coder)));
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ ApproximateUnique.populateDisplayData(builder, sampleSize, maximumEstimationError);
+ }
}
/**
@@ -213,9 +231,19 @@ public class ApproximateUnique {
static class PerKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> {
+ /**
+ * The number of entries in the statistical sample; the higher this number,
+ * the more accurate the estimate will be.
+ */
private final long sampleSize;
/**
+ * The the desired maximum estimation error or null if not specified.
+ */
+ @Nullable
+ private final Double maximumEstimationError;
+
+ /**
* @see ApproximateUnique#perKey(int)
*/
public PerKey(int sampleSize) {
@@ -225,7 +253,9 @@ public class ApproximateUnique {
+ "sampleSize >= 16 for an estimation error <= 50%. In general, "
+ "the estimation error is about 2 / sqrt(sampleSize).");
}
+
this.sampleSize = sampleSize;
+ this.maximumEstimationError = null;
}
/**
@@ -237,7 +267,9 @@ public class ApproximateUnique {
"ApproximateUnique.PerKey needs an "
+ "estimation error between 1% (0.01) and 50% (0.5).");
}
+
this.sampleSize = sampleSizeFromEstimationError(estimationError);
+ this.maximumEstimationError = estimationError;
}
@Override
@@ -254,6 +286,11 @@ public class ApproximateUnique {
Combine.perKey(new ApproximateUniqueCombineFn<>(
sampleSize, coder).<K>asKeyedFn()));
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ ApproximateUnique.populateDisplayData(builder, sampleSize, maximumEstimationError);
+ }
}
@@ -418,4 +455,11 @@ public class ApproximateUnique {
static long sampleSizeFromEstimationError(double estimationError) {
return Math.round(Math.ceil(4.0 / Math.pow(estimationError, 2.0)));
}
+
+ private static void populateDisplayData(
+ DisplayData.Builder builder, long sampleSize, Double maxEstimationError) {
+ builder
+ .add("sampleSize", sampleSize)
+ .addIfNotNull("maximumEstimationError", maxEstimationError);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 28bbeed..3566fa5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -37,6 +36,9 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
+import org.apache.beam.sdk.transforms.display.ClassForDisplay;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -101,9 +103,10 @@ public class Combine {
*/
public static <V> Globally<V, V> globally(
SerializableFunction<Iterable<V>, V> combiner) {
- return globally(IterableCombineFn.of(combiner));
+ return globally(IterableCombineFn.of(combiner), ClassForDisplay.fromInstance(combiner));
}
+
/**
* Returns a {@link Globally Combine.Globally} {@code PTransform}
* that uses the given {@code GloballyCombineFn} to combine all
@@ -121,7 +124,12 @@ public class Combine {
*/
public static <InputT, OutputT> Globally<InputT, OutputT> globally(
GlobalCombineFn<? super InputT, ?, OutputT> fn) {
- return new Globally<>(fn, true, 0);
+ return globally(fn, ClassForDisplay.fromInstance(fn));
+ }
+
+ private static <InputT, OutputT> Globally<InputT, OutputT> globally(
+ GlobalCombineFn<? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
+ return new Globally<>(fn, fnClass, true, 0);
}
/**
@@ -142,7 +150,7 @@ public class Combine {
*/
public static <K, V> PerKey<K, V, V> perKey(
SerializableFunction<Iterable<V>, V> fn) {
- return perKey(Combine.IterableCombineFn.of(fn));
+ return perKey(IterableCombineFn.of(fn).<K>asKeyedFn(), ClassForDisplay.fromInstance(fn));
}
/**
@@ -163,7 +171,7 @@ public class Combine {
*/
public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
GlobalCombineFn<? super InputT, ?, OutputT> fn) {
- return perKey(fn.<K>asKeyedFn());
+ return perKey(fn.<K>asKeyedFn(), ClassForDisplay.fromInstance(fn));
}
/**
@@ -184,7 +192,12 @@ public class Combine {
*/
public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
- return new PerKey<>(fn, false /*fewKeys*/);
+ return perKey(fn, ClassForDisplay.fromInstance(fn));
+ }
+
+ private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
+ PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
+ return new PerKey<>(fn, fnClass, false /*fewKeys*/);
}
/**
@@ -192,8 +205,8 @@ public class Combine {
* in {@link GroupByKey}.
*/
private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
- return new PerKey<>(fn, true /*fewKeys*/);
+ PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
+ return new PerKey<>(fn, fnClass, true /*fewKeys*/);
}
/**
@@ -219,7 +232,7 @@ public class Combine {
*/
public static <K, V> GroupedValues<K, V, V> groupedValues(
SerializableFunction<Iterable<V>, V> fn) {
- return groupedValues(IterableCombineFn.of(fn));
+ return groupedValues(IterableCombineFn.of(fn).<K>asKeyedFn(), ClassForDisplay.fromInstance(fn));
}
/**
@@ -245,7 +258,7 @@ public class Combine {
*/
public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
GlobalCombineFn<? super InputT, ?, OutputT> fn) {
- return groupedValues(fn.<K>asKeyedFn());
+ return groupedValues(fn.<K>asKeyedFn(), ClassForDisplay.fromInstance(fn));
}
/**
@@ -271,9 +284,13 @@ public class Combine {
*/
public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
- return new GroupedValues<>(fn);
+ return groupedValues(fn, ClassForDisplay.fromInstance(fn));
}
+ private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
+ PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
+ return new GroupedValues<>(fn, fnClass);
+ }
/////////////////////////////////////////////////////////////////////////////
@@ -495,6 +512,11 @@ public class Combine {
public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
return CombineFn.this;
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ CombineFn.this.populateDisplayData(builder);
+ }
};
}
}
@@ -1168,6 +1190,11 @@ public class Combine {
CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
return KeyedCombineFn.this.getDefaultOutputCoder(registry, keyCoder, inputCoder);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ KeyedCombineFn.this.populateDisplayData(builder);
+ }
};
}
@@ -1233,31 +1260,36 @@ public class Combine {
extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
+ private final ClassForDisplay fnClass;
private final boolean insertDefault;
private final int fanout;
private final List<PCollectionView<?>> sideInputs;
- private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn,
+ private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn, ClassForDisplay fnClass,
boolean insertDefault, int fanout) {
this.fn = fn;
+ this.fnClass = fnClass;
this.insertDefault = insertDefault;
this.fanout = fanout;
this.sideInputs = ImmutableList.<PCollectionView<?>>of();
}
private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
- boolean insertDefault, int fanout) {
+ ClassForDisplay fnClass, boolean insertDefault, int fanout) {
super(name);
this.fn = fn;
+ this.fnClass = fnClass;
this.insertDefault = insertDefault;
this.fanout = fanout;
this.sideInputs = ImmutableList.<PCollectionView<?>>of();
}
private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
- boolean insertDefault, int fanout, List<PCollectionView<?>> sideInputs) {
+ ClassForDisplay fnClass, boolean insertDefault, int fanout,
+ List<PCollectionView<?>> sideInputs) {
super(name);
this.fn = fn;
+ this.fnClass = fnClass;
this.insertDefault = insertDefault;
this.fanout = fanout;
this.sideInputs = sideInputs;
@@ -1268,7 +1300,7 @@ public class Combine {
* specified name. Does not modify this transform.
*/
public Globally<InputT, OutputT> named(String name) {
- return new Globally<>(name, fn, insertDefault, fanout);
+ return new Globally<>(name, fn, fnClass, insertDefault, fanout);
}
/**
@@ -1279,7 +1311,7 @@ public class Combine {
* to an empty input set will be returned.
*/
public GloballyAsSingletonView<InputT, OutputT> asSingletonView() {
- return new GloballyAsSingletonView<>(fn, insertDefault, fanout);
+ return new GloballyAsSingletonView<>(fn, fnClass, insertDefault, fanout);
}
/**
@@ -1288,7 +1320,7 @@ public class Combine {
* is not globally windowed and the output is not being used as a side input.
*/
public Globally<InputT, OutputT> withoutDefaults() {
- return new Globally<>(name, fn, false, fanout);
+ return new Globally<>(name, fn, fnClass, false, fanout);
}
/**
@@ -1299,7 +1331,7 @@ public class Combine {
* that will be used.
*/
public Globally<InputT, OutputT> withFanout(int fanout) {
- return new Globally<>(name, fn, insertDefault, fanout);
+ return new Globally<>(name, fn, fnClass, insertDefault, fanout);
}
/**
@@ -1309,7 +1341,7 @@ public class Combine {
public Globally<InputT, OutputT> withSideInputs(
Iterable<? extends PCollectionView<?>> sideInputs) {
Preconditions.checkState(fn instanceof RequiresContextInternal);
- return new Globally<InputT, OutputT>(name, fn, insertDefault, fanout,
+ return new Globally<InputT, OutputT>(name, fn, fnClass, insertDefault, fanout,
ImmutableList.<PCollectionView<?>>copyOf(sideInputs));
}
@@ -1320,7 +1352,7 @@ public class Combine {
.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
Combine.PerKey<Void, InputT, OutputT> combine =
- Combine.<Void, InputT, OutputT>fewKeys(fn.asKeyedFn());
+ Combine.<Void, InputT, OutputT>fewKeys(fn.asKeyedFn(), fnClass);
if (!sideInputs.isEmpty()) {
combine = combine.withSideInputs(sideInputs);
}
@@ -1344,6 +1376,12 @@ public class Combine {
}
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Combine.populateDisplayData(builder, fn, fnClass);
+ Combine.populateGlobalDisplayData(builder, fanout, insertDefault);
+ }
+
private PCollection<OutputT> insertDefaultValueIfEmpty(PCollection<OutputT> maybeEmpty) {
final PCollectionView<Iterable<OutputT>> maybeEmptyView = maybeEmpty.apply(
View.<OutputT>asIterable());
@@ -1370,6 +1408,20 @@ public class Combine {
}
}
+ private static void populateDisplayData(
+ DisplayData.Builder builder, HasDisplayData fn, ClassForDisplay fnClass) {
+ builder
+ .include(fn, fnClass)
+ .add("combineFn", fnClass);
+ }
+
+ private static void populateGlobalDisplayData(
+ DisplayData.Builder builder, int fanout, boolean insertDefault) {
+ builder
+ .addIfNotDefault("fanout", fanout, 0)
+ .add("emitDefaultOnEmptyInput", insertDefault);
+ }
+
/**
* {@code Combine.GloballyAsSingletonView<InputT, OutputT>} takes a {@code PCollection<InputT>}
* and returns a {@code PCollectionView<OutputT>} whose elements are the result of
@@ -1413,12 +1465,15 @@ public class Combine {
extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
+ private final ClassForDisplay fnClass;
private final boolean insertDefault;
private final int fanout;
private GloballyAsSingletonView(
- GlobalCombineFn<? super InputT, ?, OutputT> fn, boolean insertDefault, int fanout) {
+ GlobalCombineFn<? super InputT, ?, OutputT> fn, ClassForDisplay fnClass,
+ boolean insertDefault, int fanout) {
this.fn = fn;
+ this.fnClass = fnClass;
this.insertDefault = insertDefault;
this.fanout = fanout;
}
@@ -1449,6 +1504,12 @@ public class Combine {
public GlobalCombineFn<? super InputT, ?, OutputT> getCombineFn() {
return fn;
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Combine.populateDisplayData(builder, fn, fnClass);
+ Combine.populateGlobalDisplayData(builder, fanout, insertDefault);
+ }
}
/**
@@ -1528,6 +1589,11 @@ public class Combine {
return accumulator.size() > 1 ? mergeToSingleton(accumulator) : accumulator;
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("combineFn", combiner.getClass());
+ }
+
private List<V> mergeToSingleton(Iterable<V> values) {
List<V> singleton = new ArrayList<>();
singleton.add(combiner.apply(values));
@@ -1601,30 +1667,35 @@ public class Combine {
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
private final transient PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final ClassForDisplay fnClass;
private final boolean fewKeys;
private final List<PCollectionView<?>> sideInputs;
private PerKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, boolean fewKeys) {
+ PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass,
+ boolean fewKeys) {
this.fn = fn;
+ this.fnClass = fnClass;
this.fewKeys = fewKeys;
this.sideInputs = ImmutableList.of();
}
private PerKey(String name,
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass,
boolean fewKeys, List<PCollectionView<?>> sideInputs) {
super(name);
this.fn = fn;
+ this.fnClass = fnClass;
this.fewKeys = fewKeys;
this.sideInputs = sideInputs;
}
private PerKey(
String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- boolean fewKeys) {
+ ClassForDisplay fnClass, boolean fewKeys) {
super(name);
this.fn = fn;
+ this.fnClass = fnClass;
this.fewKeys = fewKeys;
this.sideInputs = ImmutableList.of();
}
@@ -1634,7 +1705,7 @@ public class Combine {
* specified name. Does not modify this transform.
*/
public PerKey<K, InputT, OutputT> named(String name) {
- return new PerKey<K, InputT, OutputT>(name, fn, fewKeys);
+ return new PerKey<K, InputT, OutputT>(name, fn, fnClass, fewKeys);
}
/**
@@ -1644,7 +1715,7 @@ public class Combine {
public PerKey<K, InputT, OutputT> withSideInputs(
Iterable<? extends PCollectionView<?>> sideInputs) {
Preconditions.checkState(fn instanceof RequiresContextInternal);
- return new PerKey<K, InputT, OutputT>(name, fn, fewKeys,
+ return new PerKey<K, InputT, OutputT>(name, fn, fnClass, fewKeys,
ImmutableList.<PCollectionView<?>>copyOf(sideInputs));
}
@@ -1661,7 +1732,7 @@ public class Combine {
*/
public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(
SerializableFunction<? super K, Integer> hotKeyFanout) {
- return new PerKeyWithHotKeyFanout<K, InputT, OutputT>(name, fn, hotKeyFanout);
+ return new PerKeyWithHotKeyFanout<K, InputT, OutputT>(name, fn, fnClass, hotKeyFanout);
}
/**
@@ -1669,7 +1740,7 @@ public class Combine {
* constant value for every key.
*/
public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
- return new PerKeyWithHotKeyFanout<K, InputT, OutputT>(name, fn,
+ return new PerKeyWithHotKeyFanout<K, InputT, OutputT>(name, fn, fnClass,
new SerializableFunction<K, Integer>(){
@Override
public Integer apply(K unused) {
@@ -1698,6 +1769,11 @@ public class Combine {
.apply(GroupByKey.<K, InputT>create(fewKeys))
.apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Combine.populateDisplayData(builder, fn, fnClass);
+ }
}
/**
@@ -1707,13 +1783,16 @@ public class Combine {
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
private final transient PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final ClassForDisplay fnClass;
private final SerializableFunction<? super K, Integer> hotKeyFanout;
private PerKeyWithHotKeyFanout(String name,
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ ClassForDisplay fnClass,
SerializableFunction<? super K, Integer> hotKeyFanout) {
super(name);
this.fn = fn;
+ this.fnClass = fnClass;
this.hotKeyFanout = hotKeyFanout;
}
@@ -1996,6 +2075,12 @@ public class Combine {
.apply("PostCombine", Combine.perKey(postCombine));
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Combine.populateDisplayData(builder, fn, fnClass);
+ builder.add("fanoutFn", hotKeyFanout.getClass());
+ }
+
/**
* Used to store either an input or accumulator value, for flattening
* the hot and cold key paths.
@@ -2137,23 +2222,28 @@ public class Combine {
PCollection<KV<K, OutputT>>> {
private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final ClassForDisplay fnClass;
private final List<PCollectionView<?>> sideInputs;
- private GroupedValues(PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
+ private GroupedValues(
+ PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
this.fn = SerializableUtils.clone(fn);
+ this.fnClass = fnClass;
this.sideInputs = ImmutableList.<PCollectionView<?>>of();
}
private GroupedValues(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ ClassForDisplay fnClass,
List<PCollectionView<?>> sideInputs) {
this.fn = SerializableUtils.clone(fn);
+ this.fnClass = fnClass;
this.sideInputs = sideInputs;
}
public GroupedValues<K, InputT, OutputT> withSideInputs(
Iterable<? extends PCollectionView<?>> sideInputs) {
- return new GroupedValues<>(fn, ImmutableList.<PCollectionView<?>>copyOf(sideInputs));
+ return new GroupedValues<>(fn, fnClass, ImmutableList.<PCollectionView<?>>copyOf(sideInputs));
}
/**
@@ -2240,5 +2330,10 @@ public class Combine {
kvCoder.getKeyCoder(), kvCoder.getValueCoder());
return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Combine.populateDisplayData(builder, fn, fnClass);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
index a57d446..1b64bb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.TypeDescriptor;
import com.google.common.collect.ImmutableMap;
@@ -52,7 +54,7 @@ public class CombineFnBase {
* @param <AccumT> type of mutable accumulator values
* @param <OutputT> type of output values
*/
- public interface GlobalCombineFn<InputT, AccumT, OutputT> extends Serializable {
+ public interface GlobalCombineFn<InputT, AccumT, OutputT> extends Serializable, HasDisplayData {
/**
* Returns the {@code Coder} to use for accumulator {@code AccumT}
@@ -117,7 +119,8 @@ public class CombineFnBase {
* @param <AccumT> type of mutable accumulator values
* @param <OutputT> type of output values
*/
- public interface PerKeyCombineFn<K, InputT, AccumT, OutputT> extends Serializable {
+ public interface PerKeyCombineFn<K, InputT, AccumT, OutputT>
+ extends Serializable, HasDisplayData {
/**
* Returns the {@code Coder} to use for accumulator {@code AccumT}
* values, or null if it is not able to be inferred.
@@ -217,6 +220,16 @@ public class CombineFnBase {
return (TypeVariable<?>)
new TypeDescriptor<OutputT>(AbstractGlobalCombineFn.class) {}.getType();
}
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>By default, does not register any display data. Implementors may override this method
+ * to provide their own display metadata.
+ */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ }
}
/**
@@ -282,5 +295,15 @@ public class CombineFnBase {
return (TypeVariable<?>)
new TypeDescriptor<OutputT>(AbstractPerKeyCombineFn.class) {}.getType();
}
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>By default, does not register any display data. Implementors may override this method
+ * to provide their own display metadata.
+ */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index d98bd13..ed45498 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -31,14 +31,19 @@ import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -47,6 +52,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -455,6 +461,11 @@ public class CombineFns {
}
return new ComposedAccumulatorCoder(coders);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ CombineFns.populateDisplayData(builder, combineFns);
+ }
}
/**
@@ -588,6 +599,11 @@ public class CombineFns {
}
return new ComposedAccumulatorCoder(coders);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ CombineFns.populateDisplayData(builder, combineFnWithContexts);
+ }
}
/**
@@ -769,6 +785,11 @@ public class CombineFns {
}
return new ComposedAccumulatorCoder(coders);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ CombineFns.populateDisplayData(builder, keyedCombineFns);
+ }
}
/**
@@ -915,6 +936,11 @@ public class CombineFns {
}
return new ComposedAccumulatorCoder(coders);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ CombineFns.populateDisplayData(builder, keyedCombineFns);
+ }
}
/////////////////////////////////////////////////////////////////////////////
@@ -1008,4 +1034,43 @@ public class CombineFns {
"Cannot compose with tuple tag %s because it is already present in the composition.",
outputTag);
}
+
+ /**
+ * Populate display data for the {@code combineFns} that make up a composed combine transform.
+ *
+ * <p>The same combineFn class may be used multiple times, in which case we must take special care
+ * to register display data with unique namespaces.
+ */
+ private static void populateDisplayData(
+ DisplayData.Builder builder, List<? extends HasDisplayData> combineFns) {
+
+ // NB: ArrayListMultimap necessary to maintain ordering of combineFns of the same type.
+ Multimap<Class<?>, HasDisplayData> combineFnMap = ArrayListMultimap.create();
+
+ for (int i = 0; i < combineFns.size(); i++) {
+ HasDisplayData combineFn = combineFns.get(i);
+ builder.add("combineFn" + (i + 1), combineFn.getClass());
+ combineFnMap.put(combineFn.getClass(), combineFn);
+ }
+
+ for (Map.Entry<Class<?>, Collection<HasDisplayData>> combineFnEntries :
+ combineFnMap.asMap().entrySet()) {
+
+ Collection<HasDisplayData> classCombineFns = combineFnEntries.getValue();
+ if (classCombineFns.size() == 1) {
+ // Only one combineFn of this type, include it directly.
+ builder.include(Iterables.getOnlyElement(classCombineFns));
+
+ } else {
+ // Multiple combineFns of same type, add a namespace suffix so display data is
+ // unique and ordered.
+ String baseNamespace = combineFnEntries.getKey().getName();
+ for (int i = 0; i < combineFns.size(); i++) {
+ HasDisplayData combineFn = combineFns.get(i);
+ String namespace = String.format("%s#%d", baseNamespace, i + 1);
+ builder.include(combineFn, namespace);
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index 77a7e53..9bb4a01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollectionView;
/**
@@ -167,6 +168,11 @@ public class CombineWithContext {
public CombineFnWithContext<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
return CombineFnWithContext.this;
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ CombineFnWithContext.this.populateDisplayData(builder);
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 132d7f2..28749d7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.common.Counter;
import org.apache.beam.sdk.util.common.Counter.AggregationKind;
import org.apache.beam.sdk.util.common.CounterProvider;
@@ -204,6 +205,11 @@ public class Max {
public T apply(T left, T right) {
return comparator.compare(left, right) >= 0 ? left : right;
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("comparer", comparator.getClass());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index dcee91f..8f3082e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.common.Counter;
import org.apache.beam.sdk.util.common.Counter.AggregationKind;
import org.apache.beam.sdk.util.common.CounterProvider;
@@ -204,6 +205,11 @@ public class Min {
public T apply(T left, T right) {
return comparator.compare(left, right) <= 0 ? left : right;
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("comparer", comparator.getClass());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 1e621d4..6362bd4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -153,6 +154,11 @@ public class Sample {
.of(new SampleAnyDoFn<>(limit, iterableView)))
.setCoder(in.getCoder());
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("sampleSize", limit);
+ }
}
/**
@@ -188,6 +194,7 @@ public class Sample {
extends CombineFn<T,
Top.BoundedHeap<KV<Integer, T>, SerializableComparator<KV<Integer, T>>>,
Iterable<T>> {
+ private final int sampleSize;
private final Top.TopCombineFn<KV<Integer, T>, SerializableComparator<KV<Integer, T>>>
topCombineFn;
private final Random rand = new Random();
@@ -196,6 +203,8 @@ public class Sample {
if (sampleSize < 0) {
throw new IllegalArgumentException("sample size must be >= 0");
}
+
+ this.sampleSize = sampleSize;
topCombineFn = new Top.TopCombineFn<KV<Integer, T>, SerializableComparator<KV<Integer, T>>>(
sampleSize, new KV.OrderByKey<Integer, T>());
}
@@ -244,5 +253,10 @@ public class Sample {
CoderRegistry registry, Coder<T> inputCoder) {
return IterableCoder.of(inputCoder);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("sampleSize", sampleSize);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 82747c2..4b366bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
import org.apache.beam.sdk.transforms.Combine.PerKey;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -393,6 +394,13 @@ new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
}
@Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder
+ .add("count", count)
+ .add("comparer", compareFn.getClass());
+ }
+
+ @Override
public String getIncompatibleGlobalWindowErrorMessage() {
return "Default values are not supported in Top.[of, smallest, largest]() if the output "
+ "PCollection is not windowed by GlobalWindows. Instead, use "
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java
new file mode 100644
index 0000000..455d6e1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.display;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Display metadata representing a Java class.
+ *
+ * <p>Java classes can be registered as display metadata via
+ * {@link DisplayData.Builder#add(String, ClassForDisplay)}. {@link ClassForDisplay} is
+ * serializable, unlike {@link Class} which can fail to serialize for Java 8 lambda functions.
+ */
+public class ClassForDisplay implements Serializable {
+ private final String simpleName;
+ private final String name;
+
+ private ClassForDisplay(Class<?> clazz) {
+ name = clazz.getName();
+ simpleName = clazz.getSimpleName();
+ }
+
+ /**
+ * Create a {@link ClassForDisplay} instance representing the specified class.
+ */
+ public static ClassForDisplay of(Class<?> clazz) {
+ return new ClassForDisplay(checkNotNull(clazz));
+ }
+
+ /**
+ * Create a {@link ClassForDisplay} from the class of the specified object instance.
+ */
+ public static ClassForDisplay fromInstance(Object obj) {
+ checkNotNull(obj);
+ return new ClassForDisplay(obj.getClass());
+ }
+
+ /**
+ * Retrieve the fully-qualified name of the class.
+ *
+ * @see Class#getName()
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Retrieve a simple representation of the class name.
+ *
+ * @see Class#getSimpleName()
+ */
+ public String getSimpleName() {
+ return simpleName;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ClassForDisplay) {
+ ClassForDisplay that = (ClassForDisplay) obj;
+ return Objects.equals(this.name, that.name);
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 3aeed83..6065dc4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -128,7 +128,7 @@ public class DisplayData {
return builder.toString();
}
- private static String namespaceOf(Class<?> clazz) {
+ private static String namespaceOf(ClassForDisplay clazz) {
return clazz.getName();
}
@@ -138,19 +138,35 @@ public class DisplayData {
*/
public interface Builder {
/**
- * Register display metadata from the specified subcomponent. For example, a {@link ParDo}
- * transform includes display metadata from the encapsulated {@link DoFn}.
+ * Register display metadata from the specified subcomponent.
+ *
+ * @see #include(HasDisplayData, String)
*/
Builder include(HasDisplayData subComponent);
/**
* Register display metadata from the specified subcomponent, using the specified namespace.
- * For example, a {@link ParDo} transform includes display metadata from the encapsulated
- * {@link DoFn}.
+ *
+ * @see #include(HasDisplayData, String)
*/
Builder include(HasDisplayData subComponent, Class<?> namespace);
/**
+ * Register display metadata from the specified subcomponent, using the specified namespace.
+ *
+ * @see #include(HasDisplayData, String)
+ */
+ Builder include(HasDisplayData subComponent, ClassForDisplay namespace);
+
+ /**
+ * Register display metadata from the specified subcomponent, using the specified namespace.
+ *
+ * <p>For example, a {@link ParDo} transform includes display metadata from the encapsulated
+ * {@link DoFn}.
+ */
+ Builder include(HasDisplayData subComponent, String namespace);
+
+ /**
* Register the given string display metadata. The metadata item will be registered with type
* {@link DisplayData.Type#STRING}, and is identified by the specified key and namespace from
* the current transform or component.
@@ -289,6 +305,13 @@ public class DisplayData {
ItemBuilder add(String key, Class<?> value);
/**
+ * Register the given class display metadata. The metadata item will be registered with type
+ * {@link DisplayData.Type#JAVA_CLASS}, and is identified by the specified key and namespace
+ * from the current transform or component.
+ */
+ ItemBuilder add(String key, ClassForDisplay value);
+
+ /**
* Register the given class display data if the value is not null.
*
* @see DisplayData.Builder#add(String, Class)
@@ -296,6 +319,13 @@ public class DisplayData {
ItemBuilder addIfNotNull(String key, @Nullable Class<?> value);
/**
+ * Register the given class display data if the value is not null.
+ *
+ * @see DisplayData.Builder#add(String, ClassForDisplay)
+ */
+ ItemBuilder addIfNotNull(String key, @Nullable ClassForDisplay value);
+
+ /**
* Register the given class display data if the value is different than the specified default.
*
* @see DisplayData.Builder#add(String, Class)
@@ -303,6 +333,13 @@ public class DisplayData {
ItemBuilder addIfNotDefault(
String key, @Nullable Class<?> value, @Nullable Class<?> defaultValue);
+ /**
+ * Register the given class display data if the value is different than the specified default.
+ *
+ * @see DisplayData.Builder#add(String, ClassForDisplay)
+ */
+ ItemBuilder addIfNotDefault(
+ String key, @Nullable ClassForDisplay value, @Nullable ClassForDisplay defaultValue);
/**
* Register the given display metadata with the specified type.
*
@@ -345,6 +382,14 @@ public class DisplayData {
* <p>Leaving the namespace unspecified will default to the registering instance's class.
*/
ItemBuilder withNamespace(Class<?> namespace);
+
+ /**
+ * Adds an explicit namespace to the most-recently added display metadata. The namespace
+ * and key uniquely identify the display metadata.
+ *
+ * <p>Leaving the namespace unspecified will default to the registering instance's class.
+ */
+ ItemBuilder withNamespace(ClassForDisplay namespace);
}
/**
@@ -362,11 +407,10 @@ public class DisplayData {
private final String label;
private final String url;
- private static Item create(Class<?> nsClass, String key, Type type, Object value) {
+ private static Item create(String nsClass, String key, Type type, Object value) {
FormattedItemValue formatted = type.format(value);
- String namespace = namespaceOf(nsClass);
return new Item(
- namespace, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null);
+ nsClass, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null);
}
private Item(
@@ -494,7 +538,7 @@ public class DisplayData {
return new Item(this.ns, this.key, this.type, this.value, this.shortValue, url, this.label);
}
- private Item withNamespace(Class<?> nsClass) {
+ private Item withNamespace(ClassForDisplay nsClass) {
String namespace = namespaceOf(nsClass);
return new Item(
namespace, this.key, this.type, this.value, this.shortValue, this.url, this.label);
@@ -515,7 +559,7 @@ public class DisplayData {
private final String ns;
private final String key;
- public static Identifier of(Class<?> namespace, String key) {
+ public static Identifier of(ClassForDisplay namespace, String key) {
return of(namespaceOf(namespace), key);
}
@@ -608,7 +652,12 @@ public class DisplayData {
JAVA_CLASS {
@Override
FormattedItemValue format(Object value) {
- Class<?> clazz = checkType(value, Class.class, JAVA_CLASS);
+ if (value instanceof Class<?>) {
+ ClassForDisplay classForDisplay = ClassForDisplay.of((Class<?>) value);
+ return format(classForDisplay);
+ }
+
+ ClassForDisplay clazz = checkType(value, ClassForDisplay.class, JAVA_CLASS);
return new FormattedItemValue(clazz.getName(), clazz.getSimpleName());
}
};
@@ -644,7 +693,7 @@ public class DisplayData {
return TIMESTAMP;
} else if (value instanceof Duration) {
return DURATION;
- } else if (value instanceof Class<?>) {
+ } else if (value instanceof Class<?> || value instanceof ClassForDisplay) {
return JAVA_CLASS;
} else if (value instanceof String) {
return STRING;
@@ -680,7 +729,7 @@ public class DisplayData {
private final Map<Identifier, Item> entries;
private final Set<Object> visited;
- private Class<?> latestNs;
+ private String latestNs;
@Nullable
private Item latestItem;
@@ -704,13 +753,25 @@ public class DisplayData {
@Override
public Builder include(HasDisplayData subComponent, Class<?> namespace) {
+ checkNotNull(namespace);
+ return include(subComponent, ClassForDisplay.of(namespace));
+ }
+
+ @Override
+ public Builder include(HasDisplayData subComponent, ClassForDisplay namespace) {
+ checkNotNull(namespace);
+ return include(subComponent, namespaceOf(namespace));
+ }
+
+ @Override
+ public Builder include(HasDisplayData subComponent, String namespace) {
checkNotNull(subComponent);
checkNotNull(namespace);
commitLatest();
boolean newComponent = visited.add(subComponent);
if (newComponent) {
- Class prevNs = this.latestNs;
+ String prevNs = this.latestNs;
this.latestNs = namespace;
subComponent.populateDisplayData(this);
this.latestNs = prevNs;
@@ -822,17 +883,34 @@ public class DisplayData {
}
@Override
+ public ItemBuilder add(String key, ClassForDisplay value) {
+ checkNotNull(value);
+ return addItemIf(true, key, Type.JAVA_CLASS, value);
+ }
+
+ @Override
public ItemBuilder addIfNotNull(String key, @Nullable Class<?> value) {
return addItemIf(value != null, key, Type.JAVA_CLASS, value);
}
@Override
+ public ItemBuilder addIfNotNull(String key, @Nullable ClassForDisplay value) {
+ return addItemIf(value != null, key, Type.JAVA_CLASS, value);
+ }
+
+ @Override
public ItemBuilder addIfNotDefault(
String key, @Nullable Class<?> value, @Nullable Class<?> defaultValue) {
return addItemIf(!Objects.equals(value, defaultValue), key, Type.JAVA_CLASS, value);
}
@Override
+ public ItemBuilder addIfNotDefault(
+ String key, @Nullable ClassForDisplay value, @Nullable ClassForDisplay defaultValue) {
+ return addItemIf(!Objects.equals(value, defaultValue), key, Type.JAVA_CLASS, value);
+ }
+
+ @Override
public ItemBuilder add(String key, Type type, Object value) {
checkNotNull(value);
checkNotNull(type);
@@ -887,6 +965,11 @@ public class DisplayData {
@Override
public ItemBuilder withNamespace(Class<?> namespace) {
checkNotNull(namespace);
+ return withNamespace(ClassForDisplay.of(namespace));
+ }
+
+ @Override
+ public ItemBuilder withNamespace(ClassForDisplay namespace) {
if (latestItem != null) {
latestItem = latestItem.withNamespace(namespace);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
index fbb683c..34197f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.state.StateContext;
import java.io.IOException;
@@ -101,6 +102,10 @@ public class CombineFnUtil {
CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
return combineFn.getDefaultOutputCoder(registry, inputCoder);
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ combineFn.populateDisplayData(builder);
+ }
};
}
}
@@ -150,6 +155,10 @@ public class CombineFnUtil {
Coder<InputT> inputCoder) throws CannotProvideCoderException {
return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ keyedCombineFn.populateDisplayData(builder);
+ }
};
}
}
@@ -195,6 +204,10 @@ public class CombineFnUtil {
Coder<InputT> inputCoder) throws CannotProvideCoderException {
return combineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ combineFn.populateDisplayData(builder);
+ }
private void writeObject(@SuppressWarnings("unused") ObjectOutputStream out)
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index 6bc5c1e..cc81748 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import org.apache.beam.sdk.Pipeline;
@@ -28,6 +30,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -232,6 +235,16 @@ public class ApproximateQuantilesTest {
Arrays.asList("b", "aaa", "ccccc"));
}
+ @Test
+ public void testDisplayData() {
+ Top.Largest<Integer> comparer = new Top.Largest<Integer>();
+ PTransform<?, ?> approxQuanitiles = ApproximateQuantiles.globally(20, comparer);
+ DisplayData displayData = DisplayData.from(approxQuanitiles);
+
+ assertThat(displayData, hasDisplayItem("numQuantiles", 20));
+ assertThat(displayData, hasDisplayItem("comparer", comparer.getClass()));
+ }
+
private Matcher<Iterable<? extends Integer>> quantileMatcher(
int size, int numQuantiles, int absoluteError) {
List<Matcher<? super Integer>> quantiles = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 3a4b813..c94c9f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -26,6 +29,7 @@ import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -290,4 +294,17 @@ public class ApproximateUniqueTest implements Serializable {
return null;
}
}
+
+ @Test
+ public void testDisplayData() {
+ ApproximateUnique.Globally<Integer> specifiedSampleSize = ApproximateUnique.globally(1234);
+ ApproximateUnique.PerKey<String, Integer> specifiedMaxError = ApproximateUnique.perKey(0.1234);
+
+ assertThat(DisplayData.from(specifiedSampleSize), hasDisplayItem("sampleSize", 1234));
+
+ DisplayData maxErrorDisplayData = DisplayData.from(specifiedMaxError);
+ assertThat(maxErrorDisplayData, hasDisplayItem("maximumEstimationError", 0.1234));
+ assertThat("calculated sampleSize should be included", maxErrorDisplayData,
+ hasDisplayItem(hasKey("sampleSize")));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 0a02538..e66f13a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes;
+
import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.Pipeline;
@@ -35,6 +38,7 @@ import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
import org.apache.beam.sdk.transforms.Min.MinIntegerFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -61,7 +65,7 @@ import java.util.List;
* Unit tests for {@link CombineFns}.
*/
@RunWith(JUnit4.class)
-public class CombineFnsTest {
+public class CombineFnsTest {
@Rule public ExpectedException expectedException = ExpectedException.none();
@Test
@@ -278,6 +282,69 @@ public class CombineFnsTest {
p.run();
}
+ @Test
+ public void testComposedCombineDisplayData() {
+ SimpleFunction<String, String> extractFn = new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input;
+ }
+ };
+
+ DisplayDataCombineFn combineFn1 = new DisplayDataCombineFn("value1");
+ DisplayDataCombineFn combineFn2 = new DisplayDataCombineFn("value2");
+
+ CombineFns.ComposedCombineFn<String> composedCombine = CombineFns.compose()
+ .with(extractFn, combineFn1, new TupleTag<String>())
+ .with(extractFn, combineFn2, new TupleTag<String>());
+
+ DisplayData displayData = DisplayData.from(composedCombine);
+ assertThat(displayData, hasDisplayItem("combineFn1", combineFn1.getClass()));
+ assertThat(displayData, hasDisplayItem("combineFn2", combineFn2.getClass()));
+
+ String nsBase = DisplayDataCombineFn.class.getName();
+ assertThat(displayData, includes(combineFn1, nsBase + "#1"));
+ assertThat(displayData, includes(combineFn2, nsBase + "#2"));
+ }
+
+ private static class DisplayDataCombineFn extends Combine.CombineFn<String, String, String> {
+ private final String value;
+ private static int i;
+ private final int id;
+
+ DisplayDataCombineFn(String value) {
+ id = ++i;
+ this.value = value;
+ }
+
+ @Override
+ public String createAccumulator() {
+ return null;
+ }
+
+ @Override
+ public String addInput(String accumulator, String input) {
+ return null;
+ }
+
+ @Override
+ public String mergeAccumulators(Iterable<String> accumulators) {
+ return null;
+ }
+
+ @Override
+ public String extractOutput(String accumulator) {
+ return null;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder
+ .add("uniqueKey" + id, value)
+ .add("sharedKey", value);
+ }
+ }
+
private static class UserString implements Serializable {
private String strValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 049baa3..b710641 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -18,7 +18,8 @@
package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
-
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
@@ -44,6 +45,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -682,6 +684,24 @@ public class CombineTest implements Serializable {
Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName());
}
+ @Test
+ public void testDisplayData() {
+ UniqueInts combineFn = new UniqueInts() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("fnMetadata", "foobar");
+ }
+ };
+ Combine.Globally<?, ?> combine = Combine.globally(combineFn)
+ .withFanout(1234);
+ DisplayData displayData = DisplayData.from(combine);
+
+ assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass()));
+ assertThat(displayData, hasDisplayItem("emitDefaultOnEmptyInput", true));
+ assertThat(displayData, hasDisplayItem("fanout", 1234));
+ assertThat(displayData, includes(combineFn));
+ }
+
////////////////////////////////////////////////////////////////////////////
// Test classes, for different kinds of combining fns.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 5722365..226255a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -18,9 +18,12 @@
package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
-
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
import com.google.common.collect.Lists;
import org.junit.Test;
@@ -65,4 +68,12 @@ public class MaxTest {
Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
4.0);
}
+
+ @Test
+ public void testDisplayData() {
+ Top.Largest<Integer> comparer = new Top.Largest<>();
+
+ Combine.Globally<Integer, Integer> max = Max.globally(comparer);
+ assertThat(DisplayData.from(max), hasDisplayItem("comparer", comparer.getClass()));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index 8f1d301..d7ec322 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -17,10 +17,13 @@
*/
package org.apache.beam.sdk.transforms;
-import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import com.google.common.collect.Lists;
import org.junit.Test;
@@ -65,4 +68,12 @@ public class MinTest {
Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
1.0);
}
+
+ @Test
+ public void testDisplayData() {
+ Top.Smallest<Integer> comparer = new Top.Smallest<>();
+
+ Combine.Globally<Integer, Integer> min = Min.globally(comparer);
+ assertThat(DisplayData.from(min), hasDisplayItem("comparer", comparer.getClass()));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 0c2af3f..4b1d5dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.LINES;
import static org.apache.beam.sdk.TestUtils.NO_LINES;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.Joiner;
@@ -260,4 +263,15 @@ public class SampleTest {
public void testSampleGetName() {
assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
}
+
+ @Test
+ public void testDisplayData() {
+ PTransform<?, ?> sampleAny = Sample.any(1234);
+ DisplayData sampleAnyDisplayData = DisplayData.from(sampleAny);
+ assertThat(sampleAnyDisplayData, hasDisplayItem("sampleSize", 1234));
+
+ PTransform<?, ?> samplePerKey = Sample.fixedSizePerKey(2345);
+ DisplayData perKeyDisplayData = DisplayData.from(samplePerKey);
+ assertThat(perKeyDisplayData, hasDisplayItem("sampleSize", 2345));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index 1815cc9..6d580e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import org.apache.beam.sdk.Pipeline;
@@ -25,6 +27,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.Bound;
@@ -233,6 +236,16 @@ public class TopTest {
assertEquals("Largest.PerKey", Top.<String, Integer>largestPerKey(2).getName());
}
+ @Test
+ public void testDisplayData() {
+ Top.Largest<Integer> comparer = new Top.Largest<Integer>();
+ Combine.Globally<Integer, List<Integer>> top = Top.of(1234, comparer);
+ DisplayData displayData = DisplayData.from(top);
+
+ assertThat(displayData, hasDisplayItem("count", 1234));
+ assertThat(displayData, hasDisplayItem("comparer", comparer.getClass()));
+ }
+
private static class OrderByLength implements Comparator<String>, Serializable {
@Override
public int compare(String a, String b) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java
new file mode 100644
index 0000000..19f56c6
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.display;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.SerializableUtils;
+import com.google.common.testing.EqualsTester;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link ClassForDisplay}.
+ */
+@RunWith(JUnit4.class)
+public class ClassForDisplayTest {
+ @Rule
+ public final ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testProperties() {
+ ClassForDisplay thisClass = ClassForDisplay.of(ClassForDisplayTest.class);
+ assertEquals(ClassForDisplayTest.class.getName(), thisClass.getName());
+ assertEquals(ClassForDisplayTest.class.getSimpleName(), thisClass.getSimpleName());
+ }
+
+ @Test
+ public void testInputValidation() {
+ thrown.expect(NullPointerException.class);
+ ClassForDisplay.of(null);
+ }
+
+ @Test
+ public void testEquality() {
+ new EqualsTester()
+ .addEqualityGroup(
+ ClassForDisplay.of(ClassForDisplayTest.class), ClassForDisplay.fromInstance(this))
+ .addEqualityGroup(ClassForDisplay.of(ClassForDisplay.class))
+ .addEqualityGroup(ClassForDisplay.of(Class.class))
+ .testEquals();
+ }
+
+ @Test
+ public void testSerialization() {
+ SerializableUtils.ensureSerializable(ClassForDisplay.of(ClassForDisplayTest.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
index 8cfb5c2..abdc350 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
@@ -149,40 +149,42 @@ public class DisplayDataMatchers {
}
}
- /**
- * Create a matcher that matches if the examined {@link DisplayData} contains all display data
- * registered from the specified subcomponent.
- */
- public static Matcher<DisplayData> includes(final HasDisplayData subComponent) {
+ /** @see #includes(HasDisplayData, String) */
+ public static Matcher<DisplayData> includes(HasDisplayData subComponent) {
return includes(subComponent, subComponent.getClass());
}
+ /** @see #includes(HasDisplayData, String) */
+ public static Matcher<DisplayData> includes(
+ HasDisplayData subComponent, Class<? extends HasDisplayData> namespace) {
+ return includes(subComponent, namespace.getName());
+ }
+
/**
* Create a matcher that matches if the examined {@link DisplayData} contains all display data
* registered from the specified subcomponent and namespace.
*/
public static Matcher<DisplayData> includes(
- final HasDisplayData subComponent, final Class<? extends HasDisplayData> namespace) {
+ final HasDisplayData subComponent, final String namespace) {
return new CustomTypeSafeMatcher<DisplayData>("includes subcomponent") {
@Override
protected boolean matchesSafely(DisplayData displayData) {
- DisplayData subComponentData = DisplayData.from(subComponent);
+ DisplayData subComponentData = subComponentData();
if (subComponentData.items().size() == 0) {
throw new UnsupportedOperationException("subComponent contains no display data; " +
"cannot verify whether it is included");
}
- DisplayDataComparision comparison = checkSubset(displayData, subComponentData, namespace);
+ DisplayDataComparison comparison = checkSubset(displayData, subComponentData);
return comparison.missingItems.isEmpty();
}
-
@Override
protected void describeMismatchSafely(
DisplayData displayData, Description mismatchDescription) {
- DisplayData subComponentDisplayData = DisplayData.from(subComponent);
- DisplayDataComparision comparison = checkSubset(
- displayData, subComponentDisplayData, subComponent.getClass());
+ DisplayData subComponentDisplayData = subComponentData();
+ DisplayDataComparison comparison = checkSubset(
+ displayData, subComponentDisplayData);
mismatchDescription
.appendText("did not include:\n")
@@ -191,12 +193,21 @@ public class DisplayDataMatchers {
.appendValue(comparison.unmatchedItems);
}
- private DisplayDataComparision checkSubset(
- DisplayData displayData, DisplayData included, Class<?> namespace) {
- DisplayDataComparision comparison = new DisplayDataComparision(displayData.items());
+ private DisplayData subComponentData() {
+ return DisplayData.from(new HasDisplayData() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.include(subComponent, namespace);
+ }
+ });
+ }
+
+ private DisplayDataComparison checkSubset(
+ DisplayData displayData, DisplayData included) {
+ DisplayDataComparison comparison = new DisplayDataComparison(displayData.items());
for (Item item : included.items()) {
Item matchedItem = displayData.asMap().get(
- DisplayData.Identifier.of(namespace, item.getKey()));
+ DisplayData.Identifier.of(item.getNamespace(), item.getKey()));
if (matchedItem != null) {
comparison.matched(matchedItem);
@@ -208,11 +219,11 @@ public class DisplayDataMatchers {
return comparison;
}
- class DisplayDataComparision {
+ class DisplayDataComparison {
Collection<DisplayData.Item> missingItems;
Collection<DisplayData.Item> unmatchedItems;
- DisplayDataComparision(Collection<Item> superset) {
+ DisplayDataComparison(Collection<Item> superset) {
missingItems = Sets.newHashSet();
unmatchedItems = Sets.newHashSet(superset);
}
@@ -315,7 +326,9 @@ public class DisplayDataMatchers {
valueMatcher, "with value", "value") {
@Override
protected T featureValueOf(DisplayData.Item actual) {
- return (T) actual.getValue();
+ @SuppressWarnings("unchecked")
+ T value = (T) actual.getValue();
+ return value;
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 5aee8dd..106c441 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -340,7 +340,7 @@ public class DisplayDataTest {
@Override
public void populateDisplayData(Builder builder) {
builder.add("foo", "bar")
- .withNamespace(null);
+ .withNamespace((Class<?>) null);
}
});
}
@@ -349,10 +349,10 @@ public class DisplayDataTest {
public void testIdentifierEquality() {
new EqualsTester()
.addEqualityGroup(
- DisplayData.Identifier.of(DisplayDataTest.class, "1"),
- DisplayData.Identifier.of(DisplayDataTest.class, "1"))
- .addEqualityGroup(DisplayData.Identifier.of(Object.class, "1"))
- .addEqualityGroup(DisplayData.Identifier.of(DisplayDataTest.class, "2"))
+ DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "1"),
+ DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "1"))
+ .addEqualityGroup(DisplayData.Identifier.of(ClassForDisplay.of(Object.class), "1"))
+ .addEqualityGroup(DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "2"))
.testEquals();
}
@@ -568,6 +568,7 @@ public class DisplayDataTest {
.add("float", 3.14)
.add("boolean", true)
.add("java_class", DisplayDataTest.class)
+ .add("java_class2", ClassForDisplay.of(DisplayDataTest.class))
.add("timestamp", Instant.now())
.add("duration", Duration.standardHours(1));
}
@@ -585,6 +586,9 @@ public class DisplayDataTest {
hasItem(allOf(hasKey("java_class"), hasType(DisplayData.Type.JAVA_CLASS))));
assertThat(
items,
+ hasItem(allOf(hasKey("java_class2"), hasType(DisplayData.Type.JAVA_CLASS))));
+ assertThat(
+ items,
hasItem(allOf(hasKey("timestamp"), hasType(DisplayData.Type.TIMESTAMP))));
assertThat(
items, hasItem(allOf(hasKey("duration"), hasType(DisplayData.Type.DURATION))));
@@ -678,6 +682,8 @@ public class DisplayDataTest {
assertEquals(DisplayData.Type.BOOLEAN, DisplayData.inferType(true));
assertEquals(DisplayData.Type.TIMESTAMP, DisplayData.inferType(Instant.now()));
assertEquals(DisplayData.Type.DURATION, DisplayData.inferType(Duration.millis(1234)));
+ assertEquals(DisplayData.Type.JAVA_CLASS,
+ DisplayData.inferType(ClassForDisplay.of(DisplayDataTest.class)));
assertEquals(DisplayData.Type.JAVA_CLASS, DisplayData.inferType(DisplayDataTest.class));
assertEquals(DisplayData.Type.STRING, DisplayData.inferType("hello world"));
@@ -773,7 +779,7 @@ public class DisplayDataTest {
DisplayData.from(new HasDisplayData() {
@Override
public void populateDisplayData(Builder builder) {
- builder.include(subComponent, null);
+ builder.include(subComponent, (ClassForDisplay) null);
}
});
}