You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/23 00:42:19 UTC
[1/2] incubator-beam git commit: Publish DisplayData for
PipelineOptions.
Repository: incubator-beam
Updated Branches:
refs/heads/master 692f3a136 -> f3e6c5351
Publish DisplayData for PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c16bdd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c16bdd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c16bdd6
Branch: refs/heads/master
Commit: 2c16bdd68c1196c4a394c240f232150476f70b0e
Parents: 692f3a1
Author: Scott Wegner <sw...@google.com>
Authored: Tue Apr 5 11:49:26 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 22 15:41:34 2016 -0700
----------------------------------------------------------------------
.../runners/DataflowPipelineTranslatorTest.java | 5 +-
.../beam/sdk/options/PipelineOptionSpec.java | 89 ++++++
.../beam/sdk/options/PipelineOptions.java | 4 +-
.../sdk/options/PipelineOptionsFactory.java | 128 +++++----
.../sdk/options/PipelineOptionsReflector.java | 112 ++++++++
.../sdk/options/ProxyInvocationHandler.java | 224 ++++++++++++++--
.../sdk/transforms/display/DisplayData.java | 26 +-
.../options/PipelineOptionsReflectorTest.java | 196 ++++++++++++++
.../sdk/options/ProxyInvocationHandlerTest.java | 268 ++++++++++++++++++-
.../sdk/transforms/display/DisplayDataTest.java | 25 ++
10 files changed, 979 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
index a62f550..27c0acc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -204,8 +204,9 @@ public class DataflowPipelineTranslatorTest implements Serializable {
settings.put("numberOfWorkerHarnessThreads", 0);
settings.put("experiments", null);
- assertEquals(ImmutableMap.of("options", settings),
- job.getEnvironment().getSdkPipelineOptions());
+ Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
+ assertThat(sdkPipelineOptions, hasKey("options"));
+ assertEquals(settings, sdkPipelineOptions.get("options"));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java
new file mode 100644
index 0000000..71f9d46
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+
+import java.lang.reflect.Method;
+
+/**
+ * For internal use. Specification for an option defined in a {@link PipelineOptions} interface.
+ */
+class PipelineOptionSpec {
+ private final Class<? extends PipelineOptions> clazz;
+ private final String name;
+ private final Method getter;
+
+ static PipelineOptionSpec of(Class<? extends PipelineOptions> clazz, String name, Method getter) {
+ return new PipelineOptionSpec(clazz, name, getter);
+ }
+
+ private PipelineOptionSpec(Class<? extends PipelineOptions> clazz, String name, Method getter) {
+ this.clazz = clazz;
+ this.name = name;
+ this.getter = getter;
+ }
+
+ /**
+ * The {@link PipelineOptions} interface which defines this {@link PipelineOptionSpec}.
+ */
+ Class<? extends PipelineOptions> getDefiningInterface() {
+ return clazz;
+ }
+
+ /**
+ * Name of the property.
+ */
+ String getName() {
+ return name;
+ }
+
+ /**
+ * The getter method for this property.
+ */
+ Method getGetterMethod() {
+ return getter;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("definingInterface", getDefiningInterface())
+ .add("name", getName())
+ .add("getterMethod", getGetterMethod())
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(getDefiningInterface(), getName(), getGetterMethod());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof PipelineOptionSpec)) {
+ return false;
+ }
+
+ PipelineOptionSpec that = (PipelineOptionSpec) obj;
+ return Objects.equal(this.getDefiningInterface(), that.getDefiningInterface())
+ && Objects.equal(this.getName(), that.getName())
+ && Objects.equal(this.getGetterMethod(), that.getGetterMethod());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 17cf5b3..a2f38ed 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
-
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import com.google.auto.service.AutoService;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -194,7 +194,7 @@ import javax.annotation.concurrent.ThreadSafe;
@JsonSerialize(using = Serializer.class)
@JsonDeserialize(using = Deserializer.class)
@ThreadSafe
-public interface PipelineOptions {
+public interface PipelineOptions extends HasDisplayData {
/**
* Transforms this object into an object of type {@code <T>} saving each property
* that has been manipulated. {@code <T>} must extend {@link PipelineOptions}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 87ac05e..5fc7312 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -22,9 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.StringUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -32,7 +33,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
-import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableListMultimap;
@@ -43,8 +43,11 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.RowSortedTable;
import com.google.common.collect.Sets;
import com.google.common.collect.SortedSetMultimap;
+import com.google.common.collect.TreeBasedTable;
import com.google.common.collect.TreeMultimap;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -77,6 +80,7 @@ import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import javax.annotation.Nullable;
@@ -444,6 +448,7 @@ public class PipelineOptionsFactory {
@SuppressWarnings("rawtypes")
private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class[0];
private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ClassLoader CLASS_LOADER;
private static final Map<String, Class<? extends PipelineRunner<?>>> SUPPORTED_PIPELINE_RUNNERS;
/** Classes that are used as the boundary in the stack trace to find the callers class name. */
@@ -510,7 +515,7 @@ public class PipelineOptionsFactory {
throw new ExceptionInInitializerError(e);
}
- ClassLoader classLoader = findClassLoader();
+ CLASS_LOADER = findClassLoader();
// Store the list of all available pipeline runners.
ImmutableMap.Builder<String, Class<? extends PipelineRunner<?>>> builder =
@@ -518,25 +523,14 @@ public class PipelineOptionsFactory {
Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars =
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
pipelineRunnerRegistrars.addAll(
- Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, classLoader)));
+ Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER)));
for (PipelineRunnerRegistrar registrar : pipelineRunnerRegistrars) {
for (Class<? extends PipelineRunner<?>> klass : registrar.getPipelineRunners()) {
builder.put(klass.getSimpleName(), klass);
}
}
SUPPORTED_PIPELINE_RUNNERS = builder.build();
-
- // Load and register the list of all classes that extend PipelineOptions.
- register(PipelineOptions.class);
- Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
- Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
- pipelineOptionsRegistrars.addAll(
- Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, classLoader)));
- for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
- for (Class<? extends PipelineOptions> klass : registrar.getPipelineOptions()) {
- register(klass);
- }
- }
+ initializeRegistry();
}
/**
@@ -566,6 +560,33 @@ public class PipelineOptionsFactory {
}
/**
+ * Resets the set of interfaces registered with this factory to the default state.
+ *
+ * @see PipelineOptionsFactory#register(Class)
+ */
+ @VisibleForTesting
+ static synchronized void resetRegistry() {
+ REGISTERED_OPTIONS.clear();
+ initializeRegistry();
+ }
+
+ /**
+ * Load and register the list of all classes that extend PipelineOptions.
+ */
+ private static void initializeRegistry() {
+ register(PipelineOptions.class);
+ Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
+ Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+ pipelineOptionsRegistrars.addAll(
+ Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, CLASS_LOADER)));
+ for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
+ for (Class<? extends PipelineOptions> klass : registrar.getPipelineOptions()) {
+ register(klass);
+ }
+ }
+ }
+
+ /**
* Validates that the interface conforms to the following:
* <ul>
* <li>Any property with the same name must have the same return type for all derived
@@ -674,32 +695,20 @@ public class PipelineOptionsFactory {
Preconditions.checkNotNull(iface);
validateWellFormed(iface, REGISTERED_OPTIONS);
- Iterable<Method> methods =
- Iterables.filter(
- ReflectHelpers.getClosureOfMethodsOnInterface(iface), NOT_SYNTHETIC_PREDICATE);
- ListMultimap<Class<?>, Method> ifaceToMethods = ArrayListMultimap.create();
- for (Method method : methods) {
- // Process only methods that are not marked as hidden.
- if (method.getAnnotation(Hidden.class) == null) {
- ifaceToMethods.put(method.getDeclaringClass(), method);
- }
+ Set<PipelineOptionSpec> properties =
+ PipelineOptionsReflector.getOptionSpecs(iface);
+
+ RowSortedTable<Class<?>, String, Method> ifacePropGetterTable = TreeBasedTable.create(
+ ClassNameComparator.INSTANCE, Ordering.natural());
+ for (PipelineOptionSpec prop : properties) {
+ ifacePropGetterTable.put(prop.getDefiningInterface(), prop.getName(), prop.getGetterMethod());
}
- SortedSet<Class<?>> ifaces = new TreeSet<>(ClassNameComparator.INSTANCE);
- // Keep interfaces that are not marked as hidden.
- ifaces.addAll(Collections2.filter(ifaceToMethods.keySet(), new Predicate<Class<?>>() {
- @Override
- public boolean apply(Class<?> input) {
- return input.getAnnotation(Hidden.class) == null;
- }
- }));
- for (Class<?> currentIface : ifaces) {
- Map<String, Method> propertyNamesToGetters =
- getPropertyNamesToGetters(ifaceToMethods.get(currentIface));
- // Don't output anything if there are no defined options
- if (propertyNamesToGetters.isEmpty()) {
- continue;
- }
+ for (Map.Entry<Class<?>, Map<String, Method>> ifaceToPropertyMap :
+ ifacePropGetterTable.rowMap().entrySet()) {
+ Class<?> currentIface = ifaceToPropertyMap.getKey();
+ Map<String, Method> propertyNamesToGetters = ifaceToPropertyMap.getValue();
+
SortedSetMultimap<String, String> requiredGroupNameToProperties =
getRequiredGroupNamesToProperties(propertyNamesToGetters);
@@ -838,15 +847,21 @@ public class PipelineOptionsFactory {
* <p>TODO: Swap back to using Introspector once the proxy class issue with AppEngine is
* resolved.
*/
- private static List<PropertyDescriptor> getPropertyDescriptors(Class<?> beanClass)
+ private static List<PropertyDescriptor> getPropertyDescriptors(
+ Class<? extends PipelineOptions> beanClass)
throws IntrospectionException {
// The sorting is important to make this method stable.
SortedSet<Method> methods = Sets.newTreeSet(MethodComparator.INSTANCE);
methods.addAll(
Collections2.filter(Arrays.asList(beanClass.getMethods()), NOT_SYNTHETIC_PREDICATE));
- SortedMap<String, Method> propertyNamesToGetters = getPropertyNamesToGetters(methods);
- List<PropertyDescriptor> descriptors = Lists.newArrayList();
+ SortedMap<String, Method> propertyNamesToGetters = new TreeMap<>();
+ for (Map.Entry<String, Method> entry :
+ PipelineOptionsReflector.getPropertyNamesToGetters(methods).entries()) {
+ propertyNamesToGetters.put(entry.getKey(), entry.getValue());
+ }
+
+ List<PropertyDescriptor> descriptors = Lists.newArrayList();
List<TypeMismatch> mismatches = new ArrayList<>();
/*
* Add all the getter/setter pairs to the list of descriptors removing the getter once
@@ -919,28 +934,6 @@ public class PipelineOptionsFactory {
}
/**
- * Returns a map of the property name to the getter method it represents.
- * If there are duplicate methods with the same bean name, then it is indeterminate
- * as to which method will be returned.
- */
- private static SortedMap<String, Method> getPropertyNamesToGetters(Iterable<Method> methods) {
- SortedMap<String, Method> propertyNamesToGetters = Maps.newTreeMap();
- for (Method method : methods) {
- String methodName = method.getName();
- if ((!methodName.startsWith("get")
- && !methodName.startsWith("is"))
- || method.getParameterTypes().length != 0
- || method.getReturnType() == void.class) {
- continue;
- }
- String propertyName = Introspector.decapitalize(
- methodName.startsWith("is") ? methodName.substring(2) : methodName.substring(3));
- propertyNamesToGetters.put(propertyName, method);
- }
- return propertyNamesToGetters;
- }
-
- /**
* Returns a map of required groups of arguments to the properties that satisfy the requirement.
*/
private static SortedSetMultimap<String, String> getRequiredGroupNamesToProperties(
@@ -981,21 +974,22 @@ public class PipelineOptionsFactory {
*/
private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOptions> iface,
Set<Class<? extends PipelineOptions>> validatedPipelineOptionsInterfaces,
- Class<?> klass) throws IntrospectionException {
+ Class<? extends PipelineOptions> klass) throws IntrospectionException {
Set<Method> methods = Sets.newHashSet(IGNORED_METHODS);
- // Ignore static methods, "equals", "hashCode", "toString" and "as" on the generated class.
// Ignore synthetic methods
for (Method method : klass.getMethods()) {
if (Modifier.isStatic(method.getModifiers()) || method.isSynthetic()) {
methods.add(method);
}
}
+ // Ignore standard infrastructure methods on the generated class.
try {
methods.add(klass.getMethod("equals", Object.class));
methods.add(klass.getMethod("hashCode"));
methods.add(klass.getMethod("toString"));
methods.add(klass.getMethod("as", Class.class));
methods.add(klass.getMethod("cloneAs", Class.class));
+ methods.add(klass.getMethod("populateDisplayData", DisplayData.Builder.class));
} catch (NoSuchMethodException | SecurityException e) {
throw Throwables.propagate(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
new file mode 100644
index 0000000..815de82
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+import java.beans.Introspector;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utilities to reflect over {@link PipelineOptions}.
+ */
+class PipelineOptionsReflector {
+ private PipelineOptionsReflector() {}
+
+ /**
+ * Retrieve metadata for the full set of pipeline options visible within the type hierarchy
+ * of a single {@link PipelineOptions} interface.
+ *
+ * @see PipelineOptionsReflector#getOptionSpecs(Iterable)
+ */
+ static Set<PipelineOptionSpec> getOptionSpecs(Class<? extends PipelineOptions> optionsInterface) {
+ Iterable<Method> methods = ReflectHelpers.getClosureOfMethodsOnInterface(optionsInterface);
+ Multimap<String, Method> propsToGetters = getPropertyNamesToGetters(methods);
+
+ ImmutableSet.Builder<PipelineOptionSpec> setBuilder = ImmutableSet.builder();
+ for (Map.Entry<String, Method> propAndGetter : propsToGetters.entries()) {
+ String prop = propAndGetter.getKey();
+ Method getter = propAndGetter.getValue();
+
+ @SuppressWarnings("unchecked")
+ Class<? extends PipelineOptions> declaringClass =
+ (Class<? extends PipelineOptions>) getter.getDeclaringClass();
+
+ if (!PipelineOptions.class.isAssignableFrom(declaringClass)) {
+ continue;
+ }
+
+ if (declaringClass.isAnnotationPresent(Hidden.class)) {
+ continue;
+ }
+
+ setBuilder.add(PipelineOptionSpec.of(declaringClass, prop, getter));
+ }
+
+ return setBuilder.build();
+ }
+
+ /**
+ * Retrieve metadata for the full set of pipeline options visible within the type hierarchy
+ * closure of the set of input interfaces. An option is "visible" if:
+ *
+ * <ul>
+ * <li>The option is defined within the interface hierarchy closure of the input
+ * {@link PipelineOptions}.</li>
+ * <li>The defining interface is not marked {@link Hidden}.</li>
+ * </ul>
+ */
+ static Set<PipelineOptionSpec> getOptionSpecs(
+ Iterable<Class<? extends PipelineOptions>> optionsInterfaces) {
+ ImmutableSet.Builder<PipelineOptionSpec> setBuilder = ImmutableSet.builder();
+ for (Class<? extends PipelineOptions> optionsInterface : optionsInterfaces) {
+ setBuilder.addAll(getOptionSpecs(optionsInterface));
+ }
+
+ return setBuilder.build();
+ }
+
+ /**
+ * Extract pipeline options and their respective getter methods from a series of
+ * {@link Method methods}. A single pipeline option may appear in many methods.
+ *
+ * @return A mapping of option name to the input methods which declare it.
+ */
+ static Multimap<String, Method> getPropertyNamesToGetters(Iterable<Method> methods) {
+ Multimap<String, Method> propertyNamesToGetters = HashMultimap.create();
+ for (Method method : methods) {
+ String methodName = method.getName();
+ if ((!methodName.startsWith("get")
+ && !methodName.startsWith("is"))
+ || method.getParameterTypes().length != 0
+ || method.getReturnType() == void.class) {
+ continue;
+ }
+ String propertyName = Introspector.decapitalize(
+ methodName.startsWith("is") ? methodName.substring(2) : methodName.substring(3));
+ propertyNamesToGetters.put(propertyName, method);
+ }
+ return propertyNamesToGetters;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index e281625..a269f4e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -19,16 +19,22 @@ package org.apache.beam.sdk.options;
import org.apache.beam.sdk.options.PipelineOptionsFactory.JsonIgnorePredicate;
import org.apache.beam.sdk.options.PipelineOptionsFactory.Registration;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.common.ReflectHelpers;
+import com.google.auto.value.AutoValue;
import com.google.common.base.Defaults;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.google.common.collect.MutableClassToInstanceMap;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -52,6 +58,8 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -59,7 +67,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
@@ -85,16 +93,26 @@ class ProxyInvocationHandler implements InvocationHandler {
private final int hashCode = (int) (Math.random() * Integer.MAX_VALUE);
private final Set<Class<? extends PipelineOptions>> knownInterfaces;
private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
- private final Map<String, Object> options;
+ private final Map<String, BoundValue> options;
private final Map<String, JsonNode> jsonOptions;
private final Map<String, String> gettersToPropertyNames;
private final Map<String, String> settersToPropertyNames;
ProxyInvocationHandler(Map<String, Object> options) {
- this(options, Maps.<String, JsonNode>newHashMap());
+ this(bindOptions(options), Maps.<String, JsonNode>newHashMap());
+ }
+
+ private static Map<String, BoundValue> bindOptions(Map<String, Object> inputOptions) {
+ HashMap<String, BoundValue> options = Maps.newHashMap();
+ for (Map.Entry<String, Object> entry : inputOptions.entrySet()) {
+ options.put(entry.getKey(), BoundValue.fromExplicitOption(entry.getValue()));
+ }
+
+ return options;
}
- private ProxyInvocationHandler(Map<String, Object> options, Map<String, JsonNode> jsonOptions) {
+ private ProxyInvocationHandler(
+ Map<String, BoundValue> options, Map<String, JsonNode> jsonOptions) {
this.options = options;
this.jsonOptions = jsonOptions;
this.knownInterfaces = new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
@@ -119,21 +137,27 @@ class ProxyInvocationHandler implements InvocationHandler {
@SuppressWarnings("unchecked")
Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
return cloneAs(proxy, clazz);
+ } else if (args != null && "populateDisplayData".equals(method.getName())
+ && args[0] instanceof DisplayData.Builder) {
+ @SuppressWarnings("unchecked")
+ DisplayData.Builder builder = (DisplayData.Builder) args[0];
+ populateDisplayData(builder);
+ return Void.TYPE;
}
String methodName = method.getName();
synchronized (this) {
- if (gettersToPropertyNames.keySet().contains(methodName)) {
+ if (gettersToPropertyNames.containsKey(methodName)) {
String propertyName = gettersToPropertyNames.get(methodName);
if (!options.containsKey(propertyName)) {
// Lazy bind the default to the method.
Object value = jsonOptions.containsKey(propertyName)
? getValueFromJson(propertyName, method)
: getDefault((PipelineOptions) proxy, method);
- options.put(propertyName, value);
+ options.put(propertyName, BoundValue.fromDefault(value));
}
- return options.get(propertyName);
+ return options.get(propertyName).getValue();
} else if (settersToPropertyNames.containsKey(methodName)) {
- options.put(settersToPropertyNames.get(methodName), args[0]);
+ options.put(settersToPropertyNames.get(methodName), BoundValue.fromExplicitOption(args[0]));
return Void.TYPE;
}
}
@@ -142,6 +166,35 @@ class ProxyInvocationHandler implements InvocationHandler {
}
/**
+ * Track whether options values are explicitly set, or retrieved from defaults.
+ */
+ @AutoValue
+ abstract static class BoundValue {
+ @Nullable
+ abstract Object getValue();
+
+ abstract boolean isDefault();
+
+ private static BoundValue of(@Nullable Object value, boolean isDefault) {
+ return new AutoValue_ProxyInvocationHandler_BoundValue(value, isDefault);
+ }
+
+ /**
+ * Create a {@link BoundValue} representing an explicitly set option.
+ */
+ static BoundValue fromExplicitOption(@Nullable Object value) {
+ return BoundValue.of(value, false);
+ }
+
+ /**
+ * Create a {@link BoundValue} representing a default option value.
+ */
+ static BoundValue fromDefault(@Nullable Object value) {
+ return BoundValue.of(value, true);
+ }
+ }
+
+ /**
* Backing implementation for {@link PipelineOptions#as(Class)}.
*
* @param iface The interface that the returned object needs to implement.
@@ -210,6 +263,128 @@ class ProxyInvocationHandler implements InvocationHandler {
}
/**
+ * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
+ * pipeline options will be added as display data.
+ */
+ private void populateDisplayData(DisplayData.Builder builder) {
+ Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
+ Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
+
+ for (Map.Entry<String, BoundValue> option : options.entrySet()) {
+ BoundValue boundValue = option.getValue();
+ if (boundValue.isDefault()) {
+ continue;
+ }
+
+ Object value = boundValue.getValue() == null ? "" : boundValue.getValue();
+ DisplayData.Type type = DisplayData.inferType(value);
+ HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey()));
+
+ for (PipelineOptionSpec optionSpec : specs) {
+ Class<?> pipelineInterface = optionSpec.getDefiningInterface();
+ if (type != null) {
+ builder.add(option.getKey(), type, value)
+ .withNamespace(pipelineInterface);
+ } else {
+ builder.add(option.getKey(), value.toString())
+ .withNamespace(pipelineInterface);
+ }
+ }
+ }
+
+ for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
+ if (options.containsKey(jsonOption.getKey())) {
+ // Option overwritten since deserialization; don't re-write
+ continue;
+ }
+
+ HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey()));
+ if (specs.isEmpty()) {
+ builder.add(jsonOption.getKey(), jsonOption.getValue().toString())
+ .withNamespace(UnknownPipelineOptions.class);
+ } else {
+ for (PipelineOptionSpec spec : specs) {
+ Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod());
+ DisplayData.Type type = DisplayData.inferType(value);
+ if (type != null) {
+ builder.add(jsonOption.getKey(), type, value)
+ .withNamespace(spec.getDefiningInterface());
+ } else {
+ builder.add(jsonOption.getKey(), value.toString())
+ .withNamespace(spec.getDefiningInterface());
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Marker interface used when the original {@link PipelineOptions} interface is not known at
+ * runtime. This can occur if {@link PipelineOptions} are deserialized from JSON.
+ *
+ * <p>Pipeline authors can ensure {@link PipelineOptions} type information is available at
+ * runtime by registering their {@link PipelineOptions options} interfaces. See the "Registration"
+ * section of {@link PipelineOptions} documentation.
+ */
+ interface UnknownPipelineOptions extends PipelineOptions {}
+
+ /**
+ * Construct a mapping from an option name to its {@link PipelineOptions} interface(s)
+ * declarations. An option may be declared in multiple interfaces. If it is overridden in a
+ * type hierarchy, only the overriding interface will be included.
+ */
+ private Multimap<String, PipelineOptionSpec> buildOptionNameToSpecMap(
+ Set<PipelineOptionSpec> props) {
+
+ Multimap<String, PipelineOptionSpec> optionsMap = HashMultimap.create();
+ for (PipelineOptionSpec prop : props) {
+ optionsMap.put(prop.getName(), prop);
+ }
+
+ // Filter out overridden options
+ for (Map.Entry<String, Collection<PipelineOptionSpec>> entry : optionsMap.asMap().entrySet()) {
+
+ /* Compare all interfaces for an option pairwise (iface1, iface2) to look for type
+ hierarchies. If one is the base-class of the other, remove it from the output and continue
+ iterating.
+
+ This is an N^2 operation per-option, but the number of interfaces defining an option
+ should always be small (usually 1). */
+ List<PipelineOptionSpec> specs = Lists.newArrayList(entry.getValue());
+ if (specs.size() < 2) {
+ // Only one known implementing interface, no need to check for inheritance
+ continue;
+ }
+
+ for (int i = 0; i < specs.size() - 1; i++) {
+ Class<?> iface1 = specs.get(i).getDefiningInterface();
+ for (int j = i + 1; j < specs.size(); j++) {
+ Class<?> iface2 = specs.get(j).getDefiningInterface();
+
+ if (iface1.isAssignableFrom(iface2)) {
+ optionsMap.remove(entry.getKey(), specs.get(i));
+ specs.remove(i);
+
+ // Removed element at current "i" index. Set iterators to re-evaluate
+ // new "i" element in outer loop.
+ i--;
+ j = specs.size();
+ } else if (iface2.isAssignableFrom(iface1)) {
+ optionsMap.remove(entry.getKey(), specs.get(j));
+ specs.remove(j);
+
+ // Removed element at current "j" index. Set iterator to re-evaluate
+ // new "j" element in inner-loop.
+ j--;
+ }
+ }
+ }
+ }
+
+ return optionsMap;
+ }
+
+ /**
* This will output all the currently set values. This is a relatively costly function
* as it will call {@code toString()} on each object that has been set and format
* the results in a readable format.
@@ -222,7 +397,9 @@ class ProxyInvocationHandler implements InvocationHandler {
// Add the options that we received from deserialization
sortedOptions.putAll(jsonOptions);
// Override with any programmatically set options.
- sortedOptions.putAll(options);
+ for (Map.Entry<String, BoundValue> entry : options.entrySet()) {
+ sortedOptions.put(entry.getKey(), entry.getValue().getValue());
+ }
StringBuilder b = new StringBuilder();
b.append("Current Settings:\n");
@@ -347,7 +524,7 @@ class ProxyInvocationHandler implements InvocationHandler {
// We first filter out any properties that have been modified since
// the last serialization of this PipelineOptions and then verify that
// they are all serializable.
- Map<String, Object> filteredOptions = Maps.newHashMap(handler.options);
+ Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
removeIgnoredOptions(handler.knownInterfaces, filteredOptions);
ensureSerializable(handler.knownInterfaces, filteredOptions);
@@ -356,10 +533,23 @@ class ProxyInvocationHandler implements InvocationHandler {
// instances that have been modified since the previous serialization.
Map<String, Object> serializableOptions =
Maps.<String, Object>newHashMap(handler.jsonOptions);
- serializableOptions.putAll(filteredOptions);
+ for (Map.Entry<String, BoundValue> entry : filteredOptions.entrySet()) {
+ serializableOptions.put(entry.getKey(), entry.getValue().getValue());
+ }
+
jgen.writeStartObject();
jgen.writeFieldName("options");
jgen.writeObject(serializableOptions);
+
+ List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
+ for (DisplayData.Item item : DisplayData.from(value).items()) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class);
+ serializedDisplayData.add(serializedItem);
+ }
+
+ jgen.writeFieldName("display_data");
+ jgen.writeObject(serializedDisplayData);
jgen.writeEndObject();
}
}
@@ -369,7 +559,7 @@ class ProxyInvocationHandler implements InvocationHandler {
* {@link JsonIgnore @JsonIgnore} from the passed in options using the passed in interfaces.
*/
private void removeIgnoredOptions(
- Set<Class<? extends PipelineOptions>> interfaces, Map<String, Object> options) {
+ Set<Class<? extends PipelineOptions>> interfaces, Map<String, ?> options) {
// Find all the method names that are annotated with JSON ignore.
Set<String> jsonIgnoreMethodNames = FluentIterable.from(
ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces))
@@ -394,7 +584,7 @@ class ProxyInvocationHandler implements InvocationHandler {
* and deserializable.
*/
private void ensureSerializable(Set<Class<? extends PipelineOptions>> interfaces,
- Map<String, Object> options) throws IOException {
+ Map<String, BoundValue> options) throws IOException {
// Construct a map from property name to the return type of the getter.
Map<String, Type> propertyToReturnType = Maps.newHashMap();
for (PropertyDescriptor descriptor
@@ -406,16 +596,16 @@ class ProxyInvocationHandler implements InvocationHandler {
}
// Attempt to serialize and deserialize each property.
- for (Map.Entry<String, Object> entry : options.entrySet()) {
+ for (Map.Entry<String, BoundValue> entry : options.entrySet()) {
try {
- String serializedValue = MAPPER.writeValueAsString(entry.getValue());
+ String serializedValue = MAPPER.writeValueAsString(entry.getValue().getValue());
JavaType type = MAPPER.getTypeFactory()
.constructType(propertyToReturnType.get(entry.getKey()));
MAPPER.readValue(serializedValue, type);
} catch (Exception e) {
throw new IOException(String.format(
"Failed to serialize and deserialize property '%s' with value '%s'",
- entry.getKey(), entry.getValue()), e);
+ entry.getKey(), entry.getValue().getValue()), e);
}
}
}
@@ -435,7 +625,7 @@ class ProxyInvocationHandler implements InvocationHandler {
fields.put(field.getKey(), field.getValue());
}
PipelineOptions options =
- new ProxyInvocationHandler(Maps.<String, Object>newHashMap(), fields)
+ new ProxyInvocationHandler(Maps.<String, BoundValue>newHashMap(), fields)
.as(PipelineOptions.class);
return options;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 6065dc4..a1037a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -112,6 +112,21 @@ public class DisplayData {
}
@Override
+ public int hashCode() {
+ return entries.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof DisplayData) {
+ DisplayData that = (DisplayData) obj;
+ return Objects.equals(this.entries, that.entries);
+ }
+
+ return false;
+ }
+
+ @Override
public String toString() {
StringBuilder builder = new StringBuilder();
boolean isFirstLine = true;
@@ -844,7 +859,6 @@ public class DisplayData {
@Override
public ItemBuilder add(String key, Instant value) {
- checkNotNull(value);
return addItemIf(true, key, Type.TIMESTAMP, value);
}
@@ -861,7 +875,6 @@ public class DisplayData {
@Override
public ItemBuilder add(String key, Duration value) {
- checkNotNull(value);
return addItemIf(true, key, Type.DURATION, value);
}
@@ -878,7 +891,6 @@ public class DisplayData {
@Override
public ItemBuilder add(String key, Class<?> value) {
- checkNotNull(value);
return addItemIf(true, key, Type.JAVA_CLASS, value);
}
@@ -912,17 +924,17 @@ public class DisplayData {
@Override
public ItemBuilder add(String key, Type type, Object value) {
- checkNotNull(value);
checkNotNull(type);
return addItemIf(true, key, type, value);
}
private ItemBuilder addItemIf(boolean condition, String key, Type type, Object value) {
- checkNotNull(key);
- checkArgument(!key.isEmpty());
-
+ checkNotNull(key, "Display data keys cannot be null or empty.");
+ checkArgument(!key.isEmpty(), "Display data keys cannot be null or empty.");
commitLatest();
+
if (condition) {
+ checkNotNull(value, "Display data values cannot be null. Key: [%s]", key);
latestItem = Item.create(latestNs, key, type, value);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java
new file mode 100644
index 0000000..82f0329
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.options;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
+import static org.hamcrest.Matchers.not;
+
+import com.google.common.collect.ImmutableSet;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Set;
+
+/**
+ * Unit tests for {@link PipelineOptionsReflector}.
+ */
+@RunWith(JUnit4.class)
+public class PipelineOptionsReflectorTest {
+ @Test
+ public void testGetOptionSpecs() throws NoSuchMethodException {
+ Set<PipelineOptionSpec> properties =
+ PipelineOptionsReflector.getOptionSpecs(SimpleOptions.class);
+
+ assertThat(properties, Matchers.hasItems(PipelineOptionSpec.of(
+ SimpleOptions.class, "foo", SimpleOptions.class.getDeclaredMethod("getFoo"))));
+ }
+
+ interface SimpleOptions extends PipelineOptions {
+ String getFoo();
+ void setFoo(String value);
+ }
+
+ @Test
+ public void testFiltersNonGetterMethods() {
+ Set<PipelineOptionSpec> properties =
+ PipelineOptionsReflector.getOptionSpecs(OnlyTwoValidGetters.class);
+
+ assertThat(properties, not(hasItem(hasName(isOneOf("misspelled", "hasParameter", "prefix")))));
+ }
+
+ interface OnlyTwoValidGetters extends PipelineOptions {
+ String getFoo();
+ void setFoo(String value);
+
+ boolean isBar();
+ void setBar(boolean value);
+
+ String gtMisspelled();
+ void setMisspelled(String value);
+
+ String getHasParameter(String value);
+ void setHasParameter(String value);
+
+ String noPrefix();
+ void setNoPrefix(String value);
+ }
+
+ @Test
+ public void testBaseClassOptions() {
+ Set<PipelineOptionSpec> props =
+ PipelineOptionsReflector.getOptionSpecs(ExtendsSimpleOptions.class);
+
+ assertThat(props, Matchers.hasItem(
+ allOf(hasName("foo"), hasClass(SimpleOptions.class))));
+ assertThat(props, Matchers.hasItem(
+ allOf(hasName("foo"), hasClass(ExtendsSimpleOptions.class))));
+ assertThat(props, Matchers.hasItem(
+ allOf(hasName("bar"), hasClass(ExtendsSimpleOptions.class))));
+ }
+
+ interface ExtendsSimpleOptions extends SimpleOptions {
+ @Override String getFoo();
+ @Override void setFoo(String value);
+
+ String getBar();
+ void setBar(String value);
+ }
+
+ @Test
+ public void testExcludesNonPipelineOptionsMethods() {
+ Set<PipelineOptionSpec> properties =
+ PipelineOptionsReflector.getOptionSpecs(ExtendsNonPipelineOptions.class);
+
+ assertThat(properties, not(hasItem(hasName("foo"))));
+ }
+
+ interface NoExtendsClause {
+ String getFoo();
+ void setFoo(String value);
+ }
+
+ interface ExtendsNonPipelineOptions extends NoExtendsClause, PipelineOptions {}
+
+ @Test
+ public void testExcludesHiddenInterfaces() {
+ Set<PipelineOptionSpec> properties =
+ PipelineOptionsReflector.getOptionSpecs(HiddenOptions.class);
+
+ assertThat(properties, not(hasItem(hasName("foo"))));
+ }
+
+ @Hidden
+ interface HiddenOptions extends PipelineOptions {
+ String getFoo();
+ void setFoo(String value);
+ }
+
+ @Test
+ public void testMultipleInputInterfaces() {
+ Set<Class<? extends PipelineOptions>> interfaces =
+ ImmutableSet.<Class<? extends PipelineOptions>>of(
+ BaseOptions.class,
+ ExtendOptions1.class,
+ ExtendOptions2.class);
+
+ Set<PipelineOptionSpec> props = PipelineOptionsReflector.getOptionSpecs(interfaces);
+
+ assertThat(props, Matchers.hasItem(allOf(hasName("baseOption"), hasClass(BaseOptions.class))));
+ assertThat(props, Matchers.hasItem(
+ allOf(hasName("extendOption1"), hasClass(ExtendOptions1.class))));
+ assertThat(props, Matchers.hasItem(
+ allOf(hasName("extendOption2"), hasClass(ExtendOptions2.class))));
+ }
+
+ interface BaseOptions extends PipelineOptions {
+ String getBaseOption();
+ void setBaseOption(String value);
+ }
+
+ interface ExtendOptions1 extends BaseOptions {
+ String getExtendOption1();
+ void setExtendOption1(String value);
+ }
+
+ interface ExtendOptions2 extends BaseOptions {
+ String getExtendOption2();
+ void setExtendOption2(String value);
+ }
+
+ private static Matcher<PipelineOptionSpec> hasName(String name) {
+ return hasName(is(name));
+ }
+
+ private static Matcher<PipelineOptionSpec> hasName(Matcher<String> matcher) {
+ return new FeatureMatcher<PipelineOptionSpec, String>(matcher, "name", "name") {
+ @Override
+ protected String featureValueOf(PipelineOptionSpec actual) {
+ return actual.getName();
+ }
+ };
+ }
+
+ private static Matcher<PipelineOptionSpec> hasClass(Class<?> clazz) {
+ return new FeatureMatcher<PipelineOptionSpec, Class<?>>(
+ Matchers.<Class<?>>is(clazz), "defining class", "class") {
+ @Override
+ protected Class<?> featureValueOf(PipelineOptionSpec actual) {
+ return actual.getDefiningInterface();
+ }
+ };
+ }
+
+ private static Matcher<PipelineOptionSpec> hasGetter(String methodName) {
+ return new FeatureMatcher<PipelineOptionSpec, String>(
+ is(methodName), "getter method", "name") {
+ @Override
+ protected String featureValueOf(PipelineOptionSpec actual) {
+ return actual.getGetterMethod().getName();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 7f0fa14..b53000d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -17,12 +17,23 @@
*/
package org.apache.beam.sdk.options;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -33,12 +44,19 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -48,6 +66,14 @@ import java.util.Set;
@RunWith(JUnit4.class)
public class ProxyInvocationHandlerTest {
@Rule public ExpectedException expectedException = ExpectedException.none();
+ @Rule public TestRule resetPipelineOptionsRegistry = new ExternalResource() {
+ @Override
+ protected void before() {
+ PipelineOptionsFactory.resetRegistry();
+ }
+ };
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
/** A test interface with some primitives and objects. */
public static interface Simple extends PipelineOptions {
@@ -433,6 +459,19 @@ public class ProxyInvocationHandlerTest {
}
@Test
+ public void testResetRegistry() {
+ Set<Class<? extends PipelineOptions>> defaultRegistry =
+ new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
+ assertThat(defaultRegistry, not(hasItem(FooOptions.class)));
+
+ PipelineOptionsFactory.register(FooOptions.class);
+ assertThat(PipelineOptionsFactory.getRegisteredOptions(), hasItem(FooOptions.class));
+
+ PipelineOptionsFactory.resetRegistry();
+ assertEquals(defaultRegistry, PipelineOptionsFactory.getRegisteredOptions());
+ }
+
+ @Test
public void testJsonConversionForDefault() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
assertNotNull(serializeDeserialize(PipelineOptions.class, options));
@@ -683,10 +722,233 @@ public class ProxyInvocationHandlerTest {
assertEquals("TestString", options2.getValue().getValue());
}
+ @Test
+ public void testDisplayDataItemProperties() {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setTempLocation("myTemp");
+ DisplayData displayData = DisplayData.from(options);
+
+ assertThat(displayData, hasDisplayItem(allOf(
+ hasKey("tempLocation"),
+ hasType(DisplayData.Type.STRING),
+ hasValue("myTemp"),
+ hasNamespace(PipelineOptions.class)
+ )));
+ }
+
+ @Test
+ public void testDisplayDataTypes() {
+ Instant now = Instant.now();
+
+ TypedOptions options = PipelineOptionsFactory.as(TypedOptions.class);
+ options.setInteger(1234);
+ options.setTimestamp(now);
+ options.setJavaClass(ProxyInvocationHandlerTest.class);
+ options.setObject(new Serializable() {
+ @Override
+ public String toString() {
+ return "foobar";
+ }
+ });
+
+ DisplayData displayData = DisplayData.from(options);
+
+ assertThat(displayData, hasDisplayItem("integer", 1234));
+ assertThat(displayData, hasDisplayItem("timestamp", now));
+ assertThat(displayData, hasDisplayItem("javaClass", ProxyInvocationHandlerTest.class));
+ assertThat(displayData, hasDisplayItem("object", "foobar"));
+ }
+
+ interface TypedOptions extends PipelineOptions {
+ int getInteger();
+ void setInteger(int value);
+
+ Instant getTimestamp();
+ void setTimestamp(Instant value);
+
+ Class<?> getJavaClass();
+ void setJavaClass(Class<?> value);
+
+ Object getObject();
+ void setObject(Object value);
+ }
+
+ @Test
+ public void testDisplayDataInheritanceNamespace() {
+ ExtendsBaseOptions options = PipelineOptionsFactory.as(ExtendsBaseOptions.class);
+ options.setFoo("bar");
+
+ DisplayData displayData = DisplayData.from(options);
+
+ assertThat(displayData, hasDisplayItem(allOf(
+ hasKey("foo"),
+ hasValue("bar"),
+ hasNamespace(ExtendsBaseOptions.class)
+ )));
+ }
+
+ interface BaseOptions extends PipelineOptions {
+ String getFoo();
+ void setFoo(String value);
+ }
+
+ interface ExtendsBaseOptions extends BaseOptions {
+ @Override String getFoo();
+ @Override void setFoo(String value);
+ }
+
+ @Test
+ public void testDisplayDataExcludedFromOverriddenBaseClass() {
+ ExtendsBaseOptions options = PipelineOptionsFactory.as(ExtendsBaseOptions.class);
+ options.setFoo("bar");
+
+ DisplayData displayData = DisplayData.from(options);
+ assertThat(displayData, not(hasDisplayItem(hasNamespace(BaseOptions.class))));
+ }
+
+ @Test
+ public void testDisplayDataIncludedForDisjointInterfaceHierarchies() {
+ FooOptions fooOptions = PipelineOptionsFactory.as(FooOptions.class);
+ fooOptions.setFoo("foo");
+
+ BarOptions barOptions = fooOptions.as(BarOptions.class);
+ barOptions.setBar("bar");
+
+ DisplayData data = DisplayData.from(barOptions);
+ assertThat(data, hasDisplayItem(allOf(hasKey("foo"), hasNamespace(FooOptions.class))));
+ assertThat(data, hasDisplayItem(allOf(hasKey("bar"), hasNamespace(BarOptions.class))));
+ }
+
+ interface FooOptions extends PipelineOptions {
+ String getFoo();
+ void setFoo(String value);
+ }
+
+ interface BarOptions extends PipelineOptions {
+ String getBar();
+ void setBar(String value);
+ }
+
+ @Test
+ public void testDisplayDataExcludesDefaultValues() {
+ PipelineOptions options = PipelineOptionsFactory.as(HasDefaults.class);
+ DisplayData data = DisplayData.from(options);
+
+ assertThat(data, not(hasDisplayItem(hasKey("foo"))));
+ }
+
+ interface HasDefaults extends PipelineOptions {
+ @Default.String("bar")
+ String getFoo();
+ void setFoo(String value);
+ }
+
+ @Test
+ public void testDisplayDataExcludesValuesAccessedButNeverSet() {
+ HasDefaults options = PipelineOptionsFactory.as(HasDefaults.class);
+ assertEquals("bar", options.getFoo());
+
+ DisplayData data = DisplayData.from(options);
+ assertThat(data, not(hasDisplayItem(hasKey("foo"))));
+ }
+
+ @Test
+ public void testDisplayDataIncludesExplicitlySetDefaults() {
+ HasDefaults options = PipelineOptionsFactory.as(HasDefaults.class);
+ String defaultValue = options.getFoo();
+ options.setFoo(defaultValue);
+
+ DisplayData data = DisplayData.from(options);
+ assertThat(data, hasDisplayItem("foo", defaultValue));
+ }
+
+ @Test
+ public void testDisplayDataNullValuesConvertedToEmptyString() {
+ FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
+ options.setFoo(null);
+
+ DisplayData data = DisplayData.from(options);
+ assertThat(data, hasDisplayItem("foo", ""));
+ }
+
+ @Test
+ public void testDisplayDataJsonSerialization() throws IOException {
+ FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
+ options.setFoo("bar");
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class);
+
+ assertThat("main pipeline options data keyed as 'options'", map, Matchers.hasKey("options"));
+ assertThat("display data keyed as 'display_data'", map, Matchers.hasKey("display_data"));
+
+ Map<?, ?> expectedDisplayItem = ImmutableMap.<String, String>builder()
+ .put("namespace", FooOptions.class.getName())
+ .put("key", "foo")
+ .put("value", "bar")
+ .put("type", "STRING")
+ .build();
+
+ @SuppressWarnings("unchecked")
+ List<Map<?, ?>> deserializedDisplayData = (List<Map<?, ?>>) map.get("display_data");
+ assertThat(deserializedDisplayData, hasItem(expectedDisplayItem));
+ }
+
+ @Test
+ public void testDisplayDataFromDeserializedJson() throws Exception {
+ FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
+ options.setFoo("bar");
+ DisplayData data = DisplayData.from(options);
+ assertThat(data, hasDisplayItem("foo", "bar"));
+
+ FooOptions deserializedOptions = serializeDeserialize(FooOptions.class, options);
+ DisplayData dataAfterDeserialization = DisplayData.from(deserializedOptions);
+ assertEquals(data, dataAfterDeserialization);
+ }
+
+ @Test
+ public void testDisplayDataDeserializationWithRegistration() throws Exception {
+ PipelineOptionsFactory.register(HasClassOptions.class);
+ HasClassOptions options = PipelineOptionsFactory.as(HasClassOptions.class);
+ options.setClassOption(ProxyInvocationHandlerTest.class);
+
+ PipelineOptions deserializedOptions = serializeDeserialize(PipelineOptions.class, options);
+ DisplayData displayData = DisplayData.from(deserializedOptions);
+ assertThat(displayData, hasDisplayItem("classOption", ProxyInvocationHandlerTest.class));
+ }
+
+ @Test
+ public void testDisplayDataMissingPipelineOptionsRegistration() throws Exception {
+ HasClassOptions options = PipelineOptionsFactory.as(HasClassOptions.class);
+ options.setClassOption(ProxyInvocationHandlerTest.class);
+
+ PipelineOptions deserializedOptions = serializeDeserialize(PipelineOptions.class, options);
+ DisplayData displayData = DisplayData.from(deserializedOptions);
+ String expectedJsonValue = MAPPER.writeValueAsString(ProxyInvocationHandlerTest.class);
+ assertThat(displayData, hasDisplayItem("classOption", expectedJsonValue));
+ }
+
+ interface HasClassOptions extends PipelineOptions {
+ Class<?> getClassOption();
+ void setClassOption(Class<?> value);
+ }
+
+ @Test
+ public void testDisplayDataJsonValueSetAfterDeserialization() throws Exception {
+ FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
+ options.setFoo("bar");
+ DisplayData data = DisplayData.from(options);
+ assertThat(data, hasDisplayItem("foo", "bar"));
+
+ FooOptions deserializedOptions = serializeDeserialize(FooOptions.class, options);
+ deserializedOptions.setFoo("baz");
+ DisplayData dataAfterDeserialization = DisplayData.from(deserializedOptions);
+ assertThat(dataAfterDeserialization, hasDisplayItem("foo", "baz"));
+ }
+
private <T extends PipelineOptions> T serializeDeserialize(Class<T> kls, PipelineOptions options)
throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- String value = mapper.writeValueAsString(options);
- return mapper.readValue(value, PipelineOptions.class).as(kls);
+ String value = MAPPER.writeValueAsString(options);
+ return MAPPER.readValue(value, PipelineOptions.class).as(kls);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 106c441..05d0f6f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -384,6 +384,31 @@ public class DisplayDataTest {
}
@Test
+ public void testDisplayDataEquality() {
+ HasDisplayData component1 = new HasDisplayData() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+ HasDisplayData component2 = new HasDisplayData() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+
+ DisplayData component1DisplayData1 = DisplayData.from(component1);
+ DisplayData component1DisplayData2 = DisplayData.from(component1);
+ DisplayData component2DisplayData = DisplayData.from(component2);
+
+ new EqualsTester()
+ .addEqualityGroup(component1DisplayData1, component1DisplayData2)
+ .addEqualityGroup(component2DisplayData)
+ .testEquals();
+ }
+
+ @Test
public void testAnonymousClassNamespace() {
DisplayData data =
DisplayData.from(
[2/2] incubator-beam git commit: [BEAM-173] This closes #127
Posted by lc...@apache.org.
[BEAM-173] This closes #127
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f3e6c535
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f3e6c535
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f3e6c535
Branch: refs/heads/master
Commit: f3e6c535138b39623c9ea18c1edb8bb0d3aaeab5
Parents: 692f3a1 2c16bdd
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 22 15:42:07 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 22 15:42:07 2016 -0700
----------------------------------------------------------------------
.../runners/DataflowPipelineTranslatorTest.java | 5 +-
.../beam/sdk/options/PipelineOptionSpec.java | 89 ++++++
.../beam/sdk/options/PipelineOptions.java | 4 +-
.../sdk/options/PipelineOptionsFactory.java | 128 +++++----
.../sdk/options/PipelineOptionsReflector.java | 112 ++++++++
.../sdk/options/ProxyInvocationHandler.java | 224 ++++++++++++++--
.../sdk/transforms/display/DisplayData.java | 26 +-
.../options/PipelineOptionsReflectorTest.java | 196 ++++++++++++++
.../sdk/options/ProxyInvocationHandlerTest.java | 268 ++++++++++++++++++-
.../sdk/transforms/display/DisplayDataTest.java | 25 ++
10 files changed, 979 insertions(+), 98 deletions(-)
----------------------------------------------------------------------