You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/10/20 18:19:01 UTC
[2/3] incubator-beam git commit: Add Display Data 'path' metadata
Add Display Data 'path' metadata
Display Data supports the notion of "sub components", components within
a transform class which can contribute their own display data. We add a
namespace to display data items based on the originating component,
which keeps the display data items unique within the step.
There are instances where a component is included multiple times within
a step. We handle the case of the same instance being shared by simply
ignoring it the second time. However, we don't handle the case of a
separate instance of the same class being added. Currently the separate
instances will add display data with the same namespace and key, causing
a failure.
This can come up, for example, when infrastructure at different levels
wraps and re-wraps a component. We saw this with a bounded source being
adapted multiple times, Bounded -> Unbounded -> Bounded -> Unbounded.
The BoundedToUnboundedSourceAdapter was included multiple times with
separate instances and caused a failure while populating display data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad03d07a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad03d07a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad03d07a
Branch: refs/heads/master
Commit: ad03d07ae783f054a31e8b2e14100afff8cdf747
Parents: ff6301b
Author: Scott Wegner <sw...@google.com>
Authored: Wed Oct 12 14:49:41 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 20 11:10:13 2016 -0700
----------------------------------------------------------------------
.../core/UnboundedReadFromBoundedSource.java | 2 +-
runners/direct-java/pom.xml | 5 -
.../runners/direct/ForwardingPTransform.java | 2 +-
.../beam/runners/direct/DirectRunnerTest.java | 32 --
.../direct/ForwardingPTransformTest.java | 7 +-
.../beam/runners/dataflow/DataflowRunner.java | 5 +-
.../DataflowUnboundedReadFromBoundedSource.java | 4 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 5 +-
.../apache/beam/sdk/io/CompressedSource.java | 2 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 5 +-
.../main/java/org/apache/beam/sdk/io/Read.java | 4 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 6 +-
.../sdk/options/ProxyInvocationHandler.java | 149 +++---
.../org/apache/beam/sdk/transforms/Combine.java | 60 +--
.../apache/beam/sdk/transforms/CombineFns.java | 33 +-
.../beam/sdk/transforms/CombineWithContext.java | 3 +-
.../beam/sdk/transforms/DoFnAdapters.java | 2 +-
.../beam/sdk/transforms/FlatMapElements.java | 6 +-
.../apache/beam/sdk/transforms/MapElements.java | 8 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 60 ++-
.../apache/beam/sdk/transforms/Partition.java | 2 +-
.../sdk/transforms/display/DisplayData.java | 518 +++++++++++++------
.../beam/sdk/transforms/windowing/Window.java | 2 +-
.../io/BoundedReadFromUnboundedSourceTest.java | 4 +-
.../beam/sdk/io/CompressedSourceTest.java | 4 +-
.../java/org/apache/beam/sdk/io/ReadTest.java | 10 +-
.../java/org/apache/beam/sdk/io/WriteTest.java | 6 +-
.../sdk/options/ProxyInvocationHandlerTest.java | 40 ++
.../beam/sdk/transforms/CombineFnsTest.java | 7 +-
.../apache/beam/sdk/transforms/CombineTest.java | 4 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 8 +-
.../transforms/display/DisplayDataMatchers.java | 141 +++--
.../display/DisplayDataMatchersTest.java | 67 ++-
.../sdk/transforms/display/DisplayDataTest.java | 367 +++++++++----
.../sdk/transforms/windowing/WindowTest.java | 4 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 -
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +-
38 files changed, 988 insertions(+), 605 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 91a1715..2afdcf2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -108,7 +108,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
.add(DisplayData.item("source", source.getClass()))
- .include(source);
+ .include("source", source);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 354c8c7..6cb1838 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -286,11 +286,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
index 3160b58..77311c2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
@@ -57,6 +57,6 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- delegate().populateDisplayData(builder);
+ builder.delegate(delegate());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 37af90c..4027d25 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
@@ -242,37 +241,6 @@ public class DirectRunnerTest implements Serializable {
p.run();
}
- @Test
- public void pipelineOptionsDisplayDataExceptionShouldFail() {
- Object brokenValueType = new Object() {
- @JsonValue
- public int getValue () {
- return 42;
- }
-
- @Override
- public String toString() {
- throw new RuntimeException("oh noes!!");
- }
- };
-
- Pipeline p = getPipeline();
- p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
-
- p.apply(Create.of(1, 2, 3));
-
- thrown.expectMessage(PipelineOptions.class.getName());
- thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
- p.run();
- }
-
- /** {@link PipelineOptions} to inject bad object implementations. */
- public interface ObjectPipelineOptions extends PipelineOptions {
- Object getValue();
- void setValue(Object value);
- }
-
-
/**
* Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
* {@link DirectRunner}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
index 6abaf92..c75adaa 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -102,10 +103,10 @@ public class ForwardingPTransformTest {
@Test
public void populateDisplayDataDelegates() {
- DisplayData.Builder builder = mock(DisplayData.Builder.class);
- doThrow(RuntimeException.class).when(delegate).populateDisplayData(builder);
+ doThrow(RuntimeException.class)
+ .when(delegate).populateDisplayData(any(DisplayData.Builder.class));
thrown.expect(RuntimeException.class);
- forwarding.populateDisplayData(builder);
+ DisplayData.from(forwarding);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5f83788..7bf270d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2249,8 +2249,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("source", source.getClass()));
- builder.include(source);
+ builder
+ .add(DisplayData.item("source", source.getClass()))
+ .include("source", source);
}
public UnboundedSource<T, ?> getSource() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index e4257d1..96a35bc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -120,7 +120,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
.add(DisplayData.item("source", source.getClass()))
- .include(source);
+ .include("source", source);
}
/**
@@ -195,7 +195,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("source", boundedSource.getClass()));
- builder.include(boundedSource);
+ builder.include("source", boundedSource);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 630a8a3..40c52a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -119,7 +119,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
.withLabel("Maximum Read Records"), Long.MAX_VALUE)
.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
.withLabel("Maximum Read Time"))
- .include(source);
+ .include("source", source);
}
private static class UnboundedToBoundedSourceAdapter<T>
@@ -204,8 +204,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.add(DisplayData.item("source", source.getClass()));
- builder.include(source);
+ builder.delegate(source);
}
private class Reader extends BoundedReader<ValueWithRecordId<T>> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 680dc2c..bf871b7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -390,7 +390,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
public void populateDisplayData(DisplayData.Builder builder) {
// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
- .include(sourceDelegate)
+ .include("source", sourceDelegate)
.add(DisplayData.item("source", sourceDelegate.getClass())
.withLabel("Read Source"));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 6091156..72a6399 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -792,8 +792,7 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Bound.this.populateDisplayData(builder);
+ builder.delegate(Bound.this);
}
}
}
@@ -1043,7 +1042,7 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- Bound.this.populateDisplayData(builder);
+ builder.delegate(Bound.this);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 29c4e47..f04fbaf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -127,7 +127,7 @@ public class Read {
builder
.add(DisplayData.item("source", source.getClass())
.withLabel("Read Source"))
- .include(source);
+ .include("source", source);
}
}
@@ -194,7 +194,7 @@ public class Read {
builder
.add(DisplayData.item("source", source.getClass())
.withLabel("Read Source"))
- .include(source);
+ .include("source", source);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index e8b19d9..7559fca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -118,7 +118,7 @@ public class Write {
super.populateDisplayData(builder);
builder
.add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
- .include(sink)
+ .include("sink", sink)
.addIfNotDefault(
DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"),
0);
@@ -209,7 +209,7 @@ public class Write {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- Write.Bound.this.populateDisplayData(builder);
+ builder.delegate(Write.Bound.this);
}
}
@@ -261,7 +261,7 @@ public class Write {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- Write.Bound.this.populateDisplayData(builder);
+ builder.delegate(Write.Bound.this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index a77dcc6..3e74916 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -86,7 +86,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
* {@link PipelineOptions#as(Class)}.
*/
@ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
+class ProxyInvocationHandler implements InvocationHandler {
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
* No two instances of this class are considered equivalent hence we generate a random hash code.
@@ -138,8 +138,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
&& args[0] instanceof DisplayData.Builder) {
@SuppressWarnings("unchecked")
DisplayData.Builder builder = (DisplayData.Builder) args[0];
- // Explicitly set display data namespace so thrown exceptions will have sensible type.
- builder.include(this, PipelineOptions.class);
+ builder.delegate(new PipelineOptionsDisplayData());
return Void.TYPE;
}
String methodName = method.getName();
@@ -243,88 +242,116 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
}
/**
- * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
- * pipeline options will be added as display data.
+ * Nested class to handle display data in order to set the display data namespace to something
+ * sensible.
*/
- public void populateDisplayData(DisplayData.Builder builder) {
- Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
- Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
-
- for (Map.Entry<String, BoundValue> option : options.entrySet()) {
- BoundValue boundValue = option.getValue();
- if (boundValue.isDefault()) {
- continue;
- }
+ class PipelineOptionsDisplayData implements HasDisplayData {
+ /**
+ * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
+ * pipeline options will be added as display data.
+ */
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Set<PipelineOptionSpec> optionSpecs =
+ PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
- Object value = boundValue.getValue() == null ? "" : boundValue.getValue();
- DisplayData.Type type = DisplayData.inferType(value);
- HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey()));
+ Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
- for (PipelineOptionSpec optionSpec : specs) {
- if (!optionSpec.shouldSerialize()) {
- // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also
- // excluded from display data. These options are generally not useful for display.
+ for (Map.Entry<String, BoundValue> option : options.entrySet()) {
+ BoundValue boundValue = option.getValue();
+ if (boundValue.isDefault()) {
continue;
}
- Class<?> pipelineInterface = optionSpec.getDefiningInterface();
- if (type != null) {
- builder.add(DisplayData.item(option.getKey(), type, value)
- .withNamespace(pipelineInterface));
- } else {
- builder.add(DisplayData.item(option.getKey(), displayDataString(value))
- .withNamespace(pipelineInterface));
+ DisplayDataValue resolved = DisplayDataValue.resolve(boundValue.getValue());
+ HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey()));
+
+ for (PipelineOptionSpec optionSpec : specs) {
+ if (!optionSpec.shouldSerialize()) {
+ // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also
+ // excluded from display data. These options are generally not useful for display.
+ continue;
+ }
+
+ builder.add(DisplayData.item(option.getKey(), resolved.getType(), resolved.getValue())
+ .withNamespace(optionSpec.getDefiningInterface()));
}
}
- }
- for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
- if (options.containsKey(jsonOption.getKey())) {
- // Option overwritten since deserialization; don't re-write
- continue;
- }
+ for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
+ if (options.containsKey(jsonOption.getKey())) {
+ // Option overwritten since deserialization; don't re-write
+ continue;
+ }
+
+ HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey()));
+ if (specs.isEmpty()) {
+ // No PipelineOptions interface for this key not currently loaded
+ builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString())
+ .withNamespace(UnknownPipelineOptions.class));
+ continue;
+ }
- HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey()));
- if (specs.isEmpty()) {
- builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString())
- .withNamespace(UnknownPipelineOptions.class));
- } else {
for (PipelineOptionSpec spec : specs) {
if (!spec.shouldSerialize()) {
continue;
}
Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod());
- value = value == null ? "" : value;
- DisplayData.Type type = DisplayData.inferType(value);
- if (type != null) {
- builder.add(DisplayData.item(jsonOption.getKey(), type, value)
- .withNamespace(spec.getDefiningInterface()));
- } else {
- builder.add(DisplayData.item(jsonOption.getKey(), displayDataString(value))
- .withNamespace(spec.getDefiningInterface()));
- }
+ DisplayDataValue resolved = DisplayDataValue.resolve(value);
+ builder.add(DisplayData.item(jsonOption.getKey(), resolved.getType(), resolved.getValue())
+ .withNamespace(spec.getDefiningInterface()));
}
}
}
}
/**
- * {@link Object#toString()} wrapper to extract display data values for various types.
+ * Helper class to resolve a {@link DisplayData} type and value from {@link PipelineOptions}.
*/
- private String displayDataString(Object value) {
- checkNotNull(value, "value cannot be null");
- if (!value.getClass().isArray()) {
- return value.toString();
- }
- if (!value.getClass().getComponentType().isPrimitive()) {
- return Arrays.deepToString((Object[]) value);
+ @AutoValue
+ abstract static class DisplayDataValue {
+ /**
+ * The resolved display data value. May differ from the input to {@link #resolve(Object)}
+ */
+ abstract Object getValue();
+
+ /** The resolved display data type. */
+ abstract DisplayData.Type getType();
+
+ /**
+ * Infer the value and {@link DisplayData.Type type} for the given
+ * {@link PipelineOptions} value.
+ */
+ static DisplayDataValue resolve(@Nullable Object value) {
+ DisplayData.Type type = DisplayData.inferType(value);
+
+ if (type == null) {
+ value = displayDataString(value);
+ type = DisplayData.Type.STRING;
+ }
+
+ return new AutoValue_ProxyInvocationHandler_DisplayDataValue(value, type);
}
- // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an
- // Object array, but will unwrap nested primitive arrays.
- String wrapped = Arrays.deepToString(new Object[] {value});
- return wrapped.substring(1, wrapped.length() - 1);
+ /**
+ * Safe {@link Object#toString()} wrapper to extract display data values for various types.
+ */
+ private static String displayDataString(@Nullable Object value) {
+ if (value == null) {
+ return "";
+ }
+ if (!value.getClass().isArray()) {
+ return value.toString();
+ }
+ if (!value.getClass().getComponentType().isPrimitive()) {
+ return Arrays.deepToString((Object[]) value);
+ }
+
+ // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an
+ // Object array, but will unwrap nested primitive arrays.
+ String wrapped = Arrays.deepToString(new Object[]{value});
+ return wrapped.substring(1, wrapped.length() - 1);
+ }
}
/**
@@ -587,7 +614,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
DisplayData displayData = DisplayData.from(value);
- for (DisplayData.Item<?> item : displayData.items()) {
+ for (DisplayData.Item item : displayData.items()) {
@SuppressWarnings("unchecked")
Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class);
serializedDisplayData.add(serializedItem);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index df9a306..7719c73 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -124,14 +124,14 @@ public class Combine {
return globally(fn, displayDataForFn(fn));
}
- private static <T> DisplayData.Item<? extends Class<?>> displayDataForFn(T fn) {
+ private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
return DisplayData.item("combineFn", fn.getClass())
.withLabel("Combiner");
}
private static <InputT, OutputT> Globally<InputT, OutputT> globally(
GlobalCombineFn<? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new Globally<>(fn, fnDisplayData, true, 0);
}
@@ -200,7 +200,7 @@ public class Combine {
private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/);
}
@@ -210,7 +210,7 @@ public class Combine {
*/
private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/);
}
@@ -294,7 +294,7 @@ public class Combine {
private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new GroupedValues<>(fn, fnDisplayData);
}
@@ -521,7 +521,7 @@ public class Combine {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(CombineFn.this);
+ builder.delegate(CombineFn.this);
}
};
}
@@ -1258,7 +1258,7 @@ public class Combine {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(KeyedCombineFn.this);
+ builder.delegate(KeyedCombineFn.this);
}
};
}
@@ -1325,13 +1325,13 @@ public class Combine {
extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
- private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+ private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final boolean insertDefault;
private final int fanout;
private final List<PCollectionView<?>> sideInputs;
private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
this.fn = fn;
this.fnDisplayData = fnDisplayData;
this.insertDefault = insertDefault;
@@ -1340,7 +1340,7 @@ public class Combine {
}
private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
super(name);
this.fn = fn;
this.fnDisplayData = fnDisplayData;
@@ -1350,7 +1350,7 @@ public class Combine {
}
private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
List<PCollectionView<?>> sideInputs) {
super(name);
this.fn = fn;
@@ -1498,9 +1498,9 @@ public class Combine {
private static void populateDisplayData(
DisplayData.Builder builder, HasDisplayData fn,
- DisplayData.Item<? extends Class<?>> fnDisplayItem) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayItem) {
builder
- .include(fn)
+ .include("combineFn", fn)
.add(fnDisplayItem);
}
@@ -1556,13 +1556,13 @@ public class Combine {
extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
- private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+ private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final boolean insertDefault;
private final int fanout;
private GloballyAsSingletonView(
GlobalCombineFn<? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
this.fn = fn;
this.fnDisplayData = fnDisplayData;
this.insertDefault = insertDefault;
@@ -1762,13 +1762,13 @@ public class Combine {
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
- private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+ private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final boolean fewKeys;
private final List<PCollectionView<?>> sideInputs;
private PerKey(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData, boolean fewKeys) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
this.fn = fn;
this.fnDisplayData = fnDisplayData;
this.fewKeys = fewKeys;
@@ -1777,7 +1777,7 @@ public class Combine {
private PerKey(String name,
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData,
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
boolean fewKeys, List<PCollectionView<?>> sideInputs) {
super(name);
this.fn = fn;
@@ -1788,7 +1788,7 @@ public class Combine {
private PerKey(
String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData, boolean fewKeys) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
super(name);
this.fn = fn;
this.fnDisplayData = fnDisplayData;
@@ -1888,12 +1888,12 @@ public class Combine {
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
- private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+ private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final SerializableFunction<? super K, Integer> hotKeyFanout;
private PerKeyWithHotKeyFanout(String name,
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData,
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
SerializableFunction<? super K, Integer> hotKeyFanout) {
super(name);
this.fn = fn;
@@ -1976,7 +1976,7 @@ public class Combine {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(PerKeyWithHotKeyFanout.this);
+ builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
postCombine =
@@ -2024,7 +2024,7 @@ public class Combine {
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(PerKeyWithHotKeyFanout.this);
+ builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
} else {
@@ -2068,7 +2068,7 @@ public class Combine {
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(PerKeyWithHotKeyFanout.this);
+ builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
postCombine =
@@ -2117,7 +2117,7 @@ public class Combine {
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(PerKeyWithHotKeyFanout.this);
+ builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
}
@@ -2202,7 +2202,7 @@ public class Combine {
Combine.populateDisplayData(builder, fn, fnDisplayData);
if (hotKeyFanout instanceof HasDisplayData) {
- builder.include((HasDisplayData) hotKeyFanout);
+ builder.include("hotKeyFanout", (HasDisplayData) hotKeyFanout);
}
builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
.withLabel("Fanout Function"));
@@ -2349,12 +2349,12 @@ public class Combine {
PCollection<KV<K, OutputT>>> {
private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
- private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+ private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final List<PCollectionView<?>> sideInputs;
private GroupedValues(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
this.fn = SerializableUtils.clone(fn);
this.fnDisplayData = fnDisplayData;
this.sideInputs = ImmutableList.of();
@@ -2362,7 +2362,7 @@ public class Combine {
private GroupedValues(
PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
- DisplayData.Item<? extends Class<?>> fnDisplayData,
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
List<PCollectionView<?>> sideInputs) {
this.fn = SerializableUtils.clone(fn);
this.fnDisplayData = fnDisplayData;
@@ -2402,7 +2402,7 @@ public class Combine {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- Combine.GroupedValues.this.populateDisplayData(builder);
+ builder.delegate(Combine.GroupedValues.this);
}
}).withSideInputs(sideInputs));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index 229b1d2..1b3e525 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -21,18 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1044,35 +1040,12 @@ public class CombineFns {
*/
private static void populateDisplayData(
DisplayData.Builder builder, List<? extends HasDisplayData> combineFns) {
-
- // NB: ArrayListMultimap necessary to maintain ordering of combineFns of the same type.
- Multimap<Class<?>, HasDisplayData> combineFnMap = ArrayListMultimap.create();
-
for (int i = 0; i < combineFns.size(); i++) {
HasDisplayData combineFn = combineFns.get(i);
- builder.add(DisplayData.item("combineFn" + (i + 1), combineFn.getClass())
+ String token = "combineFn" + (i + 1);
+ builder.add(DisplayData.item(token, combineFn.getClass())
.withLabel("Combine Function"));
- combineFnMap.put(combineFn.getClass(), combineFn);
- }
-
- for (Map.Entry<Class<?>, Collection<HasDisplayData>> combineFnEntries :
- combineFnMap.asMap().entrySet()) {
-
- Collection<HasDisplayData> classCombineFns = combineFnEntries.getValue();
- if (classCombineFns.size() == 1) {
- // Only one combineFn of this type, include it directly.
- builder.include(Iterables.getOnlyElement(classCombineFns));
-
- } else {
- // Multiple combineFns of same type, add a namespace suffix so display data is
- // unique and ordered.
- String baseNamespace = combineFnEntries.getKey().getName();
- for (int i = 0; i < combineFns.size(); i++) {
- HasDisplayData combineFn = combineFns.get(i);
- String namespace = String.format("%s#%d", baseNamespace, i + 1);
- builder.include(combineFn, namespace);
- }
- }
+ builder.include(token, combineFn);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index 3dd4fe2..7ac952c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -171,8 +171,7 @@ public class CombineWithContext {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- CombineFnWithContext.this.populateDisplayData(builder);
+ builder.delegate(CombineFnWithContext.this);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 12d4824..18d9333 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -246,7 +246,7 @@ public class DoFnAdapters {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.include(fn);
+ builder.delegate(fn);
}
private void readObject(java.io.ObjectInputStream in)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index b590d45..4ef809f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -119,7 +119,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
//////////////////////////////////////////////////////////////////////////////////////////////////
private final SimpleFunction<InputT, ? extends Iterable<OutputT>> fn;
- private final DisplayData.Item<?> fnClassDisplayData;
+ private final DisplayData.ItemSpec<?> fnClassDisplayData;
private FlatMapElements(
SimpleFunction<InputT, ? extends Iterable<OutputT>> fn,
@@ -166,7 +166,9 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.include(fn).add(fnClassDisplayData);
+ builder
+ .include("flatMapFn", fn)
+ .add(fnClassDisplayData);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 73e4359..c109034 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -103,7 +103,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
///////////////////////////////////////////////////////////////////
private final SimpleFunction<InputT, OutputT> fn;
- private final DisplayData.Item<?> fnClassDisplayData;
+ private final DisplayData.ItemSpec<?> fnClassDisplayData;
private MapElements(SimpleFunction<InputT, OutputT> fn, Class<?> fnClass) {
this.fn = fn;
@@ -123,7 +123,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- MapElements.this.populateDisplayData(builder);
+ builder.delegate(MapElements.this);
}
@Override
@@ -141,6 +141,8 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.include(fn).add(fnClassDisplayData);
+ builder
+ .include("mapFn", fn)
+ .add(fnClassDisplayData);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 8aa87e4..93eb1ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -521,7 +521,7 @@ public class ParDo {
*/
public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
validate(fn);
- return of(adapt(fn), fn.getClass());
+ return of(adapt(fn), displayDataForFn(fn));
}
/**
@@ -538,12 +538,17 @@ public class ParDo {
*/
@Deprecated
public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
- return of(fn, fn.getClass());
+ return of(fn, displayDataForFn(fn));
}
private static <InputT, OutputT> Bound<InputT, OutputT> of(
- OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
- return new Unbound().of(fn, fnClass);
+ OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+ return new Unbound().of(fn, fnDisplayData);
+ }
+
+ private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
+ return DisplayData.item("fn", fn.getClass())
+ .withLabel("Transform Function");
}
/**
@@ -666,7 +671,7 @@ public class ParDo {
*/
public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
validate(fn);
- return of(adapt(fn), fn.getClass());
+ return of(adapt(fn), displayDataForFn(fn));
}
/**
@@ -681,12 +686,12 @@ public class ParDo {
*/
@Deprecated
public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
- return of(fn, fn.getClass());
+ return of(fn, displayDataForFn(fn));
}
private <InputT, OutputT> Bound<InputT, OutputT> of(
- OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
- return new Bound<>(name, sideInputs, fn, fnClass);
+ OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+ return new Bound<>(name, sideInputs, fn, fnDisplayData);
}
}
@@ -707,16 +712,16 @@ public class ParDo {
// Inherits name.
private final List<PCollectionView<?>> sideInputs;
private final OldDoFn<InputT, OutputT> fn;
- private final Class<?> fnClass;
+ private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
Bound(String name,
List<PCollectionView<?>> sideInputs,
OldDoFn<InputT, OutputT> fn,
- Class<?> fnClass) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
super(name);
this.sideInputs = sideInputs;
this.fn = SerializableUtils.clone(fn);
- this.fnClass = fnClass;
+ this.fnDisplayData = fnDisplayData;
}
/**
@@ -744,7 +749,7 @@ public class ParDo {
ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
builder.addAll(this.sideInputs);
builder.addAll(sideInputs);
- return new Bound<>(name, builder.build(), fn, fnClass);
+ return new Bound<>(name, builder.build(), fn, fnDisplayData);
}
/**
@@ -758,7 +763,7 @@ public class ParDo {
public BoundMulti<InputT, OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag,
TupleTagList sideOutputTags) {
return new BoundMulti<>(
- name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
+ name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData);
}
@Override
@@ -802,7 +807,7 @@ public class ParDo {
@Override
public void populateDisplayData(Builder builder) {
super.populateDisplayData(builder);
- ParDo.populateDisplayData(builder, fn, fnClass);
+ ParDo.populateDisplayData(builder, fn, fnDisplayData);
}
public OldDoFn<InputT, OutputT> getFn() {
@@ -883,7 +888,7 @@ public class ParDo {
*/
public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
validate(fn);
- return of(adapt(fn), fn.getClass());
+ return of(adapt(fn), displayDataForFn(fn));
}
/**
@@ -898,12 +903,13 @@ public class ParDo {
*/
@Deprecated
public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
- return of(fn, fn.getClass());
+ return of(fn, displayDataForFn(fn));
}
- private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
+ private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn,
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new BoundMulti<>(
- name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
+ name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData);
}
}
@@ -925,20 +931,20 @@ public class ParDo {
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList sideOutputTags;
private final OldDoFn<InputT, OutputT> fn;
- private final Class<?> fnClass;
+ private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
BoundMulti(String name,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
TupleTagList sideOutputTags,
OldDoFn<InputT, OutputT> fn,
- Class<?> fnClass) {
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
super(name);
this.sideInputs = sideInputs;
this.mainOutputTag = mainOutputTag;
this.sideOutputTags = sideOutputTags;
this.fn = SerializableUtils.clone(fn);
- this.fnClass = fnClass;
+ this.fnDisplayData = fnDisplayData;
}
/**
@@ -969,7 +975,7 @@ public class ParDo {
builder.addAll(sideInputs);
return new BoundMulti<>(
name, builder.build(),
- mainOutputTag, sideOutputTags, fn, fnClass);
+ mainOutputTag, sideOutputTags, fn, fnDisplayData);
}
@@ -1023,7 +1029,7 @@ public class ParDo {
@Override
public void populateDisplayData(Builder builder) {
super.populateDisplayData(builder);
- ParDo.populateDisplayData(builder, fn, fnClass);
+ ParDo.populateDisplayData(builder, fn, fnDisplayData);
}
public OldDoFn<InputT, OutputT> getFn() {
@@ -1044,11 +1050,11 @@ public class ParDo {
}
private static void populateDisplayData(
- DisplayData.Builder builder, OldDoFn<?, ?> fn, Class<?> fnClass) {
+ DisplayData.Builder builder, OldDoFn<?, ?> fn,
+ DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
builder
- .include(fn)
- .add(DisplayData.item("fn", fnClass)
- .withLabel("Transform Function"));
+ .include("fn", fn)
+ .add(fnDisplayData);
}
private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 9247942..5b4eead 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -124,7 +124,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.include(partitionDoFn);
+ builder.include("partitionFn", partitionDoFn);
}
private final transient PartitionDoFn<T> partitionDoFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 394666b..5ab6342 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -20,15 +20,19 @@ package org.apache.beam.sdk.transforms.display;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import autovalue.shaded.com.google.common.common.base.Joiner;
import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -48,12 +52,12 @@ import org.joda.time.format.ISODateTimeFormat;
* interface.
*/
public class DisplayData implements Serializable {
- private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item<?>>newHashMap());
+ private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item>newHashMap());
private static final DateTimeFormatter TIMESTAMP_FORMATTER = ISODateTimeFormat.dateTime();
- private final ImmutableMap<Identifier, Item<?>> entries;
+ private final ImmutableMap<Identifier, Item> entries;
- private DisplayData(Map<Identifier, Item<?>> entries) {
+ private DisplayData(Map<Identifier, Item> entries) {
this.entries = ImmutableMap.copyOf(entries);
}
@@ -71,7 +75,11 @@ public class DisplayData implements Serializable {
*/
public static DisplayData from(HasDisplayData component) {
checkNotNull(component, "component argument cannot be null");
- return InternalBuilder.forRoot(component).build();
+
+ InternalBuilder builder = new InternalBuilder();
+ builder.include(Path.root(), component);
+
+ return builder.build();
}
/**
@@ -99,11 +107,11 @@ public class DisplayData implements Serializable {
}
@JsonValue
- public Collection<Item<?>> items() {
+ public Collection<Item> items() {
return entries.values();
}
- public Map<Identifier, Item<?>> asMap() {
+ public Map<Identifier, Item> asMap() {
return entries;
}
@@ -126,7 +134,7 @@ public class DisplayData implements Serializable {
public String toString() {
StringBuilder builder = new StringBuilder();
boolean isFirstLine = true;
- for (Item<?> entry : entries.values()) {
+ for (Item entry : entries.values()) {
if (isFirstLine) {
isFirstLine = false;
} else {
@@ -149,70 +157,81 @@ public class DisplayData implements Serializable {
*/
public interface Builder {
/**
- * Register display data from the specified subcomponent. For example, a {@link PTransform}
- * which delegates to a user-provided function can implement {@link HasDisplayData} on the
- * function and include it from the {@link PTransform}:
+ * Register display data from the specified subcomponent at the given path. For example, a
+ * {@link PTransform} which delegates to a user-provided function can implement
+ * {@link HasDisplayData} on the function and include it from the {@link PTransform}:
*
* <pre><code>{@literal @Override}
* public void populateDisplayData(DisplayData.Builder builder) {
* super.populateDisplayData(builder);
*
* builder
- * .add(DisplayData.item("userFn", userFn)) // To register the class name of the userFn
- * .include(userFn); // To allow the userFn to register additional display data
+ * // To register the class name of the userFn
+ * .add(DisplayData.item("userFn", userFn.getClass()))
+ * // To allow the userFn to register additional display data
+ * .include("userFn", userFn);
* }
* </code></pre>
*
- * <p>Using {@code include(subcomponent)} will associate each of the registered items with the
- * namespace of the {@code subcomponent} being registered. To register display data in the
- * current namespace, such as from a base class implementation, use
+ * <p>Using {@code include(path, subcomponent)} will associate each of the registered items with
+ * the namespace of the {@code subcomponent} being registered, with the specified path element
+ * relative to the current path. To register display data in the current path and namespace,
+ * such as from a base class implementation, use
* {@code subcomponent.populateDisplayData(builder)} instead.
*
* @see HasDisplayData#populateDisplayData(DisplayData.Builder)
*/
- Builder include(HasDisplayData subComponent);
+ Builder include(String path, HasDisplayData subComponent);
/**
- * Register display data from the specified subcomponent, overriding the namespace of
- * subcomponent display items with the specified namespace.
+ * Register display data from the specified component on behalf of the current component.
+ * Display data items will be added with the subcomponent namespace but the current component
+ * path.
*
- * @see #include(HasDisplayData)
- */
- Builder include(HasDisplayData subComponent, Class<?> namespace);
-
- /**
- * Register display data from the specified subcomponent, overriding the namespace of
- * subcomponent display items with the specified namespace.
+ * <p>This is useful for components which simply wrap other components and wish to retain the
+ * display data from the wrapped component. Such components should implement
+ * {@code populateDisplayData} as:
*
- * @see #include(HasDisplayData)
+ * <pre><code>{@literal @Override}
+ * public void populateDisplayData(DisplayData.Builder builder) {
+ * builder.delegate(wrapped);
+ * }
+ * </code></pre>
*/
- Builder include(HasDisplayData subComponent, String namespace);
+ Builder delegate(HasDisplayData component);
/**
* Register the given display item.
*/
- Builder add(Item<?> item);
+ Builder add(ItemSpec<?> item);
/**
* Register the given display item if the value is not null.
*/
- Builder addIfNotNull(Item<?> item);
+ Builder addIfNotNull(ItemSpec<?> item);
/**
* Register the given display item if the value is different than the specified default.
*/
- <T> Builder addIfNotDefault(Item<T> item, @Nullable T defaultValue);
+ <T> Builder addIfNotDefault(ItemSpec<T> item, @Nullable T defaultValue);
}
/**
- * {@link Item Items} are the unit of display data. Each item is identified by a given key
+ * {@link Item Items} are the unit of display data. Each item is identified by a given path, key,
* and namespace from the component the display item belongs to.
*
* <p>{@link Item Items} are registered via {@link DisplayData.Builder#add}
* within {@link HasDisplayData#populateDisplayData} implementations.
*/
@AutoValue
- public abstract static class Item<T> implements Serializable {
+ public abstract static class Item {
+
+ /**
+ * The path for the display item within a component hierarchy.
+ */
+ @Nullable
+ @JsonIgnore
+ public abstract Path getPath();
/**
* The namespace for the display item. The namespace defaults to the component which
@@ -220,7 +239,7 @@ public class DisplayData implements Serializable {
*/
@Nullable
@JsonGetter("namespace")
- public abstract String getNamespace();
+ public abstract Class<?> getNamespace();
/**
* The key for the display item. Each display item is created with a key and value
@@ -240,11 +259,8 @@ public class DisplayData implements Serializable {
* Retrieve the value of the display item. The value is translated from the input to
* {@link DisplayData#item} into a format suitable for display. Translation is based on the
* item's {@link #getType() type}.
- *
- * <p>The value will only be {@literal null} if the input value during creation was null.
*/
@JsonGetter("value")
- @Nullable
public abstract Object getValue();
/**
@@ -285,27 +301,104 @@ public class DisplayData implements Serializable {
@Nullable
public abstract String getLinkUrl();
- private static <T> Item<T> create(String key, Type type, @Nullable T value) {
- FormattedItemValue formatted = type.safeFormat(value);
- return of(null, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null);
+ private static Item create(ItemSpec<?> spec, Path path) {
+ checkNotNull(spec, "spec cannot be null");
+ checkNotNull(path, "path cannot be null");
+ Class<?> ns = checkNotNull(spec.getNamespace(), "namespace must be set");
+
+ return new AutoValue_DisplayData_Item(path, ns, spec.getKey(), spec.getType(),
+ spec.getValue(), spec.getShortValue(), spec.getLabel(), spec.getLinkUrl());
}
+ @Override
+ public String toString() {
+ return String.format("%s%s:%s=%s", getPath(), getNamespace().getName(), getKey(), getValue());
+ }
+ }
+
+ /**
+ * Specifies an {@link Item} to register as display data. Each item is identified by a given
+ * path, key, and namespace from the component the display item belongs to.
+ *
+ * <p>{@link Item Items} are registered via {@link DisplayData.Builder#add}
+ * within {@link HasDisplayData#populateDisplayData} implementations.
+ */
+ @AutoValue
+ public abstract static class ItemSpec<T> implements Serializable {
+ /**
+ * The namespace for the display item. If unset, defaults to the component which
+ * the display item is registered to.
+ */
+ @Nullable
+ public abstract Class<?> getNamespace();
+
+ /**
+ * The key for the display item. Each display item is created with a key and value
+ * via {@link DisplayData#item}.
+ */
+ public abstract String getKey();
+
+ /**
+ * The {@link DisplayData.Type} of display data. All display data conforms to a predefined set
+ * of allowed types.
+ */
+ public abstract Type getType();
+
+ /**
+ * The value of the display item. The value is translated from the input to
+ * {@link DisplayData#item} into a format suitable for display. Translation is based on the
+ * item's {@link #getType() type}.
+ */
+ @Nullable
+ public abstract Object getValue();
+
/**
- * Set the item {@link Item#getNamespace() namespace} from the given {@link Class}.
+ * The optional short value for an item, or {@code null} if none is provided.
*
- * <p>This method does not alter the current instance, but instead returns a new {@link Item}
- * with the namespace set.
+ * <p>The short value is an alternative display representation for items having a long display
+ * value. For example, the {@link #getValue() value} for {@link Type#JAVA_CLASS} items contains
+ * the full class name with package, while the short value contains just the class name.
+ *
+ * <p>A {@link #getValue() value} will be provided for each display item, and some types may
+ * also provide a short-value. If a short value is provided, display data consumers may
+ * choose to display it instead of or in addition to the {@link #getValue() value}.
*/
- public Item<T> withNamespace(Class<?> namespace) {
- checkNotNull(namespace, "namespace argument cannot be null");
- return withNamespace(namespaceOf(namespace));
+ @Nullable
+ public abstract Object getShortValue();
+
+ /**
+ * The optional label for an item. The label is a human-readable description of what
+ * the metadata represents. UIs may choose to display the label instead of the item key.
+ */
+ @Nullable
+ public abstract String getLabel();
+
+ /**
+ * The optional link URL for an item. The URL points to an address where the reader
+ * can find additional context for the display data.
+ */
+ @Nullable
+ public abstract String getLinkUrl();
+
+ private static <T> ItemSpec<T> create(String key, Type type, @Nullable T value) {
+ return ItemSpec.<T>builder()
+ .setKey(key)
+ .setType(type)
+ .setRawValue(value)
+ .build();
}
- /** @see #withNamespace(Class) */
- public Item<T> withNamespace(String namespace) {
+ /**
+ * Set the item {@link ItemSpec#getNamespace() namespace} from the given {@link Class}.
+ *
+ * <p>This method does not alter the current instance, but instead returns a new
+ * {@link ItemSpec} with the namespace set.
+ */
+ public ItemSpec<T> withNamespace(Class<?> namespace) {
checkNotNull(namespace, "namespace argument cannot be null");
- return of(
- namespace, getKey(), getType(), getValue(), getShortValue(), getLabel(), getLinkUrl());
+ return toBuilder()
+ .setNamespace(namespace)
+ .build();
}
/**
@@ -313,12 +406,13 @@ public class DisplayData implements Serializable {
*
* <p>Specifying a null value will clear the label if it was previously defined.
*
- * <p>This method does not alter the current instance, but instead returns a new {@link Item}
- * with the label set.
+ * <p>This method does not alter the current instance, but instead returns a new
+ * {@link ItemSpec} with the label set.
*/
- public Item<T> withLabel(String label) {
- return of(
- getNamespace(), getKey(), getType(), getValue(), getShortValue(), label, getLinkUrl());
+ public ItemSpec<T> withLabel(@Nullable String label) {
+ return toBuilder()
+ .setLabel(label)
+ .build();
}
/**
@@ -326,11 +420,13 @@ public class DisplayData implements Serializable {
*
* <p>Specifying a null value will clear the link url if it was previously defined.
*
- * <p>This method does not alter the current instance, but instead returns a new {@link Item}
- * with the link url set.
+ * <p>This method does not alter the current instance, but instead returns a new
+ * {@link ItemSpec} with the link url set.
*/
- public Item<T> withLinkUrl(String url) {
- return of(getNamespace(), getKey(), getType(), getValue(), getShortValue(), getLabel(), url);
+ public ItemSpec<T> withLinkUrl(@Nullable String url) {
+ return toBuilder()
+ .setLinkUrl(url)
+ .build();
}
/**
@@ -339,84 +435,166 @@ public class DisplayData implements Serializable {
* <p>This should only be used internally. It is useful to compare the value of a
* {@link DisplayData.Item} to the value derived from a specified input.
*/
- private Item<T> withValue(Object value) {
- FormattedItemValue formatted = getType().safeFormat(value);
- return of(getNamespace(), getKey(), getType(), formatted.getLongValue(),
- formatted.getShortValue(), getLabel(), getLinkUrl());
- }
-
- private static <T> Item<T> of(
- @Nullable String namespace,
- String key,
- Type type,
- @Nullable Object value,
- @Nullable Object shortValue,
- @Nullable String label,
- @Nullable String linkUrl) {
- return new AutoValue_DisplayData_Item<>(
- namespace, key, type, value, shortValue, label, linkUrl);
+ private ItemSpec<T> withValue(T value) {
+ return toBuilder()
+ .setRawValue(value)
+ .build();
}
@Override
public String toString() {
return String.format("%s:%s=%s", getNamespace(), getKey(), getValue());
}
+
+ static <T> ItemSpec.Builder<T> builder() {
+ return new AutoValue_DisplayData_ItemSpec.Builder<>();
+ }
+
+ abstract ItemSpec.Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ public abstract ItemSpec.Builder<T> setKey(String key);
+ public abstract ItemSpec.Builder<T> setNamespace(@Nullable Class<?> namespace);
+ public abstract ItemSpec.Builder<T> setType(Type type);
+ public abstract ItemSpec.Builder<T> setValue(@Nullable Object longValue);
+ public abstract ItemSpec.Builder<T> setShortValue(@Nullable Object shortValue);
+ public abstract ItemSpec.Builder<T> setLabel(@Nullable String label);
+ public abstract ItemSpec.Builder<T> setLinkUrl(@Nullable String url);
+ public abstract ItemSpec<T> build();
+
+
+ abstract Type getType();
+
+ ItemSpec.Builder<T> setRawValue(@Nullable T value) {
+ FormattedItemValue formatted = getType().safeFormat(value);
+ return this
+ .setValue(formatted.getLongValue())
+ .setShortValue(formatted.getShortValue());
+ }
+ }
}
/**
* Unique identifier for a display data item within a component.
- * Identifiers are composed of the key they are registered with and a namespace generated from
- * the class of the component which registered the item.
+ *
+ * <p>Identifiers are composed of:
+ *
+ * <ul>
+ * <li>A {@link #getPath() path} based on the component hierarchy</li>
+ * <li>The {@link #getKey() key} it is registered with</li>
+ * <li>A {@link #getNamespace() namespace} generated from the class of the component which
+ * registered the item.</li>
+ * </ul>
*
* <p>Display data registered with the same key from different components is distinguished by
* path and namespace, so both items will be represented in the composed {@link DisplayData}.
* Registering a second item with the same path, namespace, and key is an error: the builder
* rejects it rather than replacing the earlier item.
*/
- public static class Identifier {
- private final String ns;
- private final String key;
+ @AutoValue
+ public abstract static class Identifier {
+ public abstract Path getPath();
+ public abstract Class<?> getNamespace();
+ public abstract String getKey();
- public static Identifier of(Class<?> namespace, String key) {
- return of(namespaceOf(namespace), key);
+ public static Identifier of(Path path, Class<?> namespace, String key) {
+ return new AutoValue_DisplayData_Identifier(path, namespace, key);
}
- public static Identifier of(String namespace, String key) {
- return new Identifier(namespace, key);
+ @Override
+ public String toString() {
+ return String.format("%s%s:%s", getPath(), getNamespace(), getKey());
}
+ }
- private Identifier(String ns, String key) {
- this.ns = ns;
- this.key = key;
+ /**
+ * Structured path of registered display data within a component hierarchy.
+ *
+ * <p>Display data items registered directly by a component will have the {@link Path#root() root}
+ * path. If the component {@link Builder#include includes} a sub-component, its display data will
+ * be registered at the path specified. Each sub-component path is created by appending a child
+ * element to the path of its parent component, forming a hierarchy.
+ */
+ public static class Path {
+ private final ImmutableList<String> components;
+ private Path(ImmutableList<String> components) {
+ this.components = components;
}
- public String getNamespace() {
- return ns;
+ /**
+ * Path for display data registered by a top-level component.
+ */
+ public static Path root() {
+ return new Path(ImmutableList.<String>of());
}
- public String getKey() {
- return key;
+ /**
+ * Construct a path from an absolute component path hierarchy.
+ *
+ * <p>For the root path, use {@link Path#root()}.
+ *
+ * @param firstPath Path of the first sub-component.
+ * @param paths Additional path components.
+ */
+ public static Path absolute(String firstPath, String... paths) {
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+
+ validatePathElement(firstPath);
+ builder.add(firstPath);
+ for (String path : paths) {
+ validatePathElement(path);
+ builder.add(path);
+ }
+
+ return new Path(builder.build());
}
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof Identifier) {
- Identifier that = (Identifier) obj;
- return Objects.equals(this.ns, that.ns)
- && Objects.equals(this.key, that.key);
- }
+ /**
+ * Hierarchy list of component paths making up the full path, starting with the top-level child
+ * component path. For the {@link #root root} path, returns the empty list.
+ */
+ public List<String> getComponents() {
+ return components;
+ }
- return false;
+ /**
+ * Extend the path by appending a sub-component path. The new path element is added to the end
+ * of the path hierarchy.
+ *
+ * <p>Returns a new {@link Path} instance; the originating {@link Path} is not modified.
+ */
+ public Path extend(String path) {
+ validatePathElement(path);
+ return new Path(ImmutableList.<String>builder()
+ .addAll(components.iterator())
+ .add(path)
+ .build());
}
- @Override
- public int hashCode() {
- return Objects.hash(ns, key);
+ private static void validatePathElement(String path) {
+ checkNotNull(path);
+ checkArgument(!"".equals(path), "path cannot be empty");
}
@Override
public String toString() {
- return String.format("%s:%s", ns, key);
+ StringBuilder b = new StringBuilder().append("[");
+ Joiner.on("/").appendTo(b, components);
+ b.append("]");
+ return b.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof Path
+ && Objects.equals(components, ((Path) obj).components);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return components.hashCode();
}
}
@@ -551,65 +729,79 @@ public class DisplayData implements Serializable {
Object getLongValue() {
return this.longValue;
}
-
Object getShortValue() {
return this.shortValue;
}
}
private static class InternalBuilder implements Builder {
- private final Map<Identifier, Item<?>> entries;
- private final Set<Object> visited;
+ private final Map<Identifier, Item> entries;
+ private final Set<HasDisplayData> visitedComponents;
+ private final Map<Path, HasDisplayData> visitedPathMap;
- private String latestNs;
+ private Path latestPath;
+ private Class<?> latestNs;
private InternalBuilder() {
this.entries = Maps.newHashMap();
- this.visited = Sets.newIdentityHashSet();
- }
-
- private static InternalBuilder forRoot(HasDisplayData instance) {
- InternalBuilder builder = new InternalBuilder();
- builder.include(instance);
- return builder;
+ this.visitedComponents = Sets.newIdentityHashSet();
+ this.visitedPathMap = Maps.newHashMap();
}
@Override
- public Builder include(HasDisplayData subComponent) {
+ public Builder include(String path, HasDisplayData subComponent) {
checkNotNull(subComponent, "subComponent argument cannot be null");
- return include(subComponent, subComponent.getClass());
+ checkNotNull(path, "path argument cannot be null");
+
+ Path absolutePath = latestPath.extend(path);
+
+ HasDisplayData existingComponent = visitedPathMap.get(absolutePath);
+ if (existingComponent != null) {
+ throw new IllegalArgumentException(String.format("Specified path '%s' already used for "
+ + "subcomponent %s. Subcomponents must be included using unique paths.",
+ path, existingComponent));
+ }
+
+ return include(absolutePath, subComponent);
}
@Override
- public Builder include(HasDisplayData subComponent, Class<?> namespace) {
- checkNotNull(namespace, "Input namespace override cannot be null");
- return include(subComponent, namespaceOf(namespace));
+ public Builder delegate(HasDisplayData component) {
+ checkNotNull(component);
+
+ return include(latestPath, component);
}
- @Override
- public Builder include(HasDisplayData subComponent, String namespace) {
- checkNotNull(subComponent, "subComponent argument cannot be null");
- checkNotNull(namespace, "Input namespace override cannot be null");
-
- boolean newComponent = visited.add(subComponent);
- if (newComponent) {
- String prevNs = this.latestNs;
- this.latestNs = namespace;
-
- try {
- subComponent.populateDisplayData(this);
- } catch (PopulateDisplayDataException e) {
- // Don't re-wrap exceptions recursively.
- throw e;
- } catch (Throwable e) {
- String msg = String.format("Error while populating display data for component: %s",
- namespace);
- throw new PopulateDisplayDataException(msg, e);
- }
+ private Builder include(Path path, HasDisplayData subComponent) {
+ if (visitedComponents.contains(subComponent)) {
+ // Component previously registered; ignore in order to break cyclic dependencies
+ return this;
+ }
- this.latestNs = prevNs;
+ // New component; add it.
+ visitedComponents.add(subComponent);
+ visitedPathMap.put(path, subComponent);
+ Class<?> namespace = subComponent.getClass();
+
+ Path prevPath = latestPath;
+ Class<?> prevNs = latestNs;
+ latestPath = path;
+ latestNs = namespace;
+
+ try {
+ subComponent.populateDisplayData(this);
+ } catch (PopulateDisplayDataException e) {
+ // Don't re-wrap exceptions recursively.
+ throw e;
+ } catch (Throwable e) {
+ String msg = String.format("Error while populating display data for component: %s",
+ namespace.getName());
+ throw new PopulateDisplayDataException(msg, e);
}
+ latestPath = prevPath;
+ latestNs = prevNs;
+
return this;
}
@@ -623,39 +815,41 @@ public class DisplayData implements Serializable {
}
@Override
- public Builder add(Item<?> item) {
+ public Builder add(ItemSpec<?> item) {
checkNotNull(item, "Input display item cannot be null");
return addItemIf(true, item);
}
@Override
- public Builder addIfNotNull(Item<?> item) {
+ public Builder addIfNotNull(ItemSpec<?> item) {
checkNotNull(item, "Input display item cannot be null");
return addItemIf(item.getValue() != null, item);
}
@Override
- public <T> Builder addIfNotDefault(Item<T> item, @Nullable T defaultValue) {
+ public <T> Builder addIfNotDefault(ItemSpec<T> item, @Nullable T defaultValue) {
checkNotNull(item, "Input display item cannot be null");
- Item<T> defaultItem = item.withValue(defaultValue);
+ ItemSpec<T> defaultItem = item.withValue(defaultValue);
return addItemIf(!Objects.equals(item, defaultItem), item);
}
- private Builder addItemIf(boolean condition, Item<?> item) {
+ private Builder addItemIf(boolean condition, ItemSpec<?> spec) {
if (!condition) {
return this;
}
- checkNotNull(item, "Input display item cannot be null");
- checkNotNull(item.getValue(), "Input display value cannot be null");
- if (item.getNamespace() == null) {
- item = item.withNamespace(latestNs);
+ checkNotNull(spec, "Input display item cannot be null");
+ checkNotNull(spec.getValue(), "Input display value cannot be null");
+
+ if (spec.getNamespace() == null) {
+ spec = spec.withNamespace(latestNs);
}
+ Item item = Item.create(spec, latestPath);
- Identifier id = Identifier.of(item.getNamespace(), item.getKey());
+ Identifier id = Identifier.of(item.getPath(), item.getNamespace(), item.getKey());
checkArgument(!entries.containsKey(id),
- "Display data key (%s) is not unique within the specified namespace (%s).",
- item.getKey(), item.getNamespace());
+ "Display data key (%s) is not unique within the specified path and namespace: %s%s.",
+ item.getKey(), item.getPath(), item.getNamespace());
entries.put(id, item);
return this;
@@ -669,63 +863,63 @@ public class DisplayData implements Serializable {
/**
* Create a display item for the specified key and string value.
*/
- public static Item<String> item(String key, @Nullable String value) {
+ public static ItemSpec<String> item(String key, @Nullable String value) {
return item(key, Type.STRING, value);
}
/**
* Create a display item for the specified key and integer value.
*/
- public static Item<Integer> item(String key, @Nullable Integer value) {
+ public static ItemSpec<Integer> item(String key, @Nullable Integer value) {
return item(key, Type.INTEGER, value);
}
/**
* Create a display item for the specified key and integer value.
*/
- public static Item<Long> item(String key, @Nullable Long value) {
+ public static ItemSpec<Long> item(String key, @Nullable Long value) {
return item(key, Type.INTEGER, value);
}
/**
* Create a display item for the specified key and floating point value.
*/
- public static Item<Float> item(String key, @Nullable Float value) {
+ public static ItemSpec<Float> item(String key, @Nullable Float value) {
return item(key, Type.FLOAT, value);
}
/**
* Create a display item for the specified key and floating point value.
*/
- public static Item<Double> item(String key, @Nullable Double value) {
+ public static ItemSpec<Double> item(String key, @Nullable Double value) {
return item(key, Type.FLOAT, value);
}
/**
* Create a display item for the specified key and boolean value.
*/
- public static Item<Boolean> item(String key, @Nullable Boolean value) {
+ public static ItemSpec<Boolean> item(String key, @Nullable Boolean value) {
return item(key, Type.BOOLEAN, value);
}
/**
* Create a display item for the specified key and timestamp value.
*/
- public static Item<Instant> item(String key, @Nullable Instant value) {
+ public static ItemSpec<Instant> item(String key, @Nullable Instant value) {
return item(key, Type.TIMESTAMP, value);
}
/**
* Create a display item for the specified key and duration value.
*/
- public static Item<Duration> item(String key, @Nullable Duration value) {
+ public static ItemSpec<Duration> item(String key, @Nullable Duration value) {
return item(key, Type.DURATION, value);
}
/**
* Create a display item for the specified key and class value.
*/
- public static <T> Item<Class<T>> item(String key, @Nullable Class<T> value) {
+ public static <T> ItemSpec<Class<T>> item(String key, @Nullable Class<T> value) {
return item(key, Type.JAVA_CLASS, value);
}
@@ -739,10 +933,10 @@ public class DisplayData implements Serializable {
*
* @see Type#inferType(Object)
*/
- public static <T> Item<T> item(String key, Type type, @Nullable T value) {
+ public static <T> ItemSpec<T> item(String key, Type type, @Nullable T value) {
checkNotNull(key, "key argument cannot be null");
checkNotNull(type, "type argument cannot be null");
- return Item.create(key, type, value);
+ return ItemSpec.create(key, type, value);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 57f7716..684a776 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -578,7 +578,7 @@ public class Window {
builder
.add(DisplayData.item("windowFn", windowFn.getClass())
.withLabel("Windowing Function"))
- .include(windowFn);
+ .include("windowFn", windowFn);
}
if (allowedLateness != null) {