You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/14 05:42:30 UTC
[1/2] incubator-beam git commit: Add a basic implementation of
StaticValueProvider and DynamicValueProvider
Repository: incubator-beam
Updated Branches:
refs/heads/master 93d2e374c -> 5bfeb958d
Add a basic implementation of StaticValueProvider and DynamicValueProvider
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66686e63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66686e63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66686e63
Branch: refs/heads/master
Commit: 66686e63f1d55fab05ceb70d71b5f43c5b78e077
Parents: 93d2e37
Author: sammcveety <sa...@gmail.com>
Authored: Mon Sep 26 18:27:15 2016 -0400
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Oct 13 22:38:07 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/options/PipelineOptions.java | 23 ++
.../sdk/options/PipelineOptionsFactory.java | 15 +-
.../sdk/options/ProxyInvocationHandler.java | 94 +++++---
.../apache/beam/sdk/options/ValueProvider.java | 228 +++++++++++++++++++
.../sdk/options/ProxyInvocationHandlerTest.java | 12 +-
.../beam/sdk/options/ValueProviderTest.java | 213 +++++++++++++++++
.../apache/beam/sdk/util/ApiSurfaceTest.java | 3 +
7 files changed, 552 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index deb1cf4..3d6cad6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -26,6 +26,7 @@ import com.google.common.base.MoreObjects;
import java.lang.reflect.Proxy;
import java.util.ServiceLoader;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
@@ -327,4 +328,26 @@ public interface PipelineOptions extends HasDisplayData {
normalizedAppName, normalizedUserName, datePart, randomPart);
}
}
+
+ /**
+ * Provides a unique ID for this {@link PipelineOptions} object, assigned at graph
+ * construction time.
+ */
+ @Hidden
+ @Default.InstanceFactory(AtomicLongFactory.class)
+ Long getOptionsId();
+ void setOptionsId(Long id);
+
+ /**
+ * {@link DefaultValueFactory} which supplies an ID that is guaranteed to be unique
+ * within the given process.
+ */
+ class AtomicLongFactory implements DefaultValueFactory<Long> {
+   // One counter for the whole class, so every factory instance draws from the same
+   // sequence; the first ID handed out is 0.
+   private static final AtomicLong NEXT_ID = new AtomicLong(0);
+
+   /** Hands out the counter's current value and advances it; {@code options} is not read. */
+   @Override
+   public Long create(PipelineOptions options) {
+     final long assignedId = NEXT_ID.getAndIncrement();
+     return assignedId;
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 9fc6c2c..cd0c6b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -54,6 +54,7 @@ import java.io.PrintStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.util.ArrayList;
@@ -1440,8 +1441,12 @@ public class PipelineOptionsFactory {
}
}
} else if ((returnType.isArray() && (SIMPLE_TYPES.contains(returnType.getComponentType())
- || returnType.getComponentType().isEnum()))
- || Collection.class.isAssignableFrom(returnType)) {
+ || returnType.getComponentType().isEnum()))
+ || Collection.class.isAssignableFrom(returnType)
+ || (returnType.equals(ValueProvider.class)
+ && MAPPER.getTypeFactory().constructType(
+ ((ParameterizedType) method.getGenericReturnType())
+ .getActualTypeArguments()[0]).isCollectionLikeType())) {
// Split any strings with ","
List<String> values = FluentIterable.from(entry.getValue())
.transformAndConcat(new Function<String, Iterable<String>>() {
@@ -1452,7 +1457,8 @@ public class PipelineOptionsFactory {
}).toList();
if (returnType.isArray() && !returnType.getComponentType().equals(String.class)
- || Collection.class.isAssignableFrom(returnType)) {
+ || Collection.class.isAssignableFrom(returnType)
+ || returnType.equals(ValueProvider.class)) {
for (String value : values) {
checkArgument(!value.isEmpty(),
"Empty argument value is only allowed for String, String Array, "
@@ -1461,7 +1467,8 @@ public class PipelineOptionsFactory {
}
}
convertedOptions.put(entry.getKey(), MAPPER.convertValue(values, type));
- } else if (SIMPLE_TYPES.contains(returnType) || returnType.isEnum()) {
+ } else if (SIMPLE_TYPES.contains(returnType) || returnType.isEnum()
+ || returnType.equals(ValueProvider.class)) {
String value = Iterables.getOnlyElement(entry.getValue());
checkArgument(returnType.equals(String.class) || !value.isEmpty(),
"Empty argument value is only allowed for String, String Array, "
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index c438a43..47d7cee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -65,6 +65,8 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.options.PipelineOptionsFactory.JsonIgnorePredicate;
import org.apache.beam.sdk.options.PipelineOptionsFactory.Registration;
+import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.InstanceBuilder;
@@ -202,7 +204,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
*/
synchronized <T extends PipelineOptions> T as(Class<T> iface) {
checkNotNull(iface);
- checkArgument(iface.isInterface());
+ checkArgument(iface.isInterface(), "Not an interface: %s", iface);
if (!interfaceToProxyCache.containsKey(iface)) {
Registration<T> registration =
PipelineOptionsFactory.validateWellFormed(iface, knownInterfaces);
@@ -468,36 +470,34 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private Object getDefault(PipelineOptions proxy, Method method) {
+ if (method.getReturnType().equals(RuntimeValueProvider.class)) {
+ throw new RuntimeException(String.format(
+ "Method %s should not have return type "
+ + "RuntimeValueProvider, use ValueProvider instead.", method.getName()));
+ }
+ if (method.getReturnType().equals(StaticValueProvider.class)) {
+ throw new RuntimeException(String.format(
+ "Method %s should not have return type "
+ + "StaticValueProvider, use ValueProvider instead.", method.getName()));
+ }
+ @Nullable Object defaultObject = null;
for (Annotation annotation : method.getAnnotations()) {
- if (annotation instanceof Default.Class) {
- return ((Default.Class) annotation).value();
- } else if (annotation instanceof Default.String) {
- return ((Default.String) annotation).value();
- } else if (annotation instanceof Default.Boolean) {
- return ((Default.Boolean) annotation).value();
- } else if (annotation instanceof Default.Character) {
- return ((Default.Character) annotation).value();
- } else if (annotation instanceof Default.Byte) {
- return ((Default.Byte) annotation).value();
- } else if (annotation instanceof Default.Short) {
- return ((Default.Short) annotation).value();
- } else if (annotation instanceof Default.Integer) {
- return ((Default.Integer) annotation).value();
- } else if (annotation instanceof Default.Long) {
- return ((Default.Long) annotation).value();
- } else if (annotation instanceof Default.Float) {
- return ((Default.Float) annotation).value();
- } else if (annotation instanceof Default.Double) {
- return ((Default.Double) annotation).value();
- } else if (annotation instanceof Default.Enum) {
- return Enum.valueOf((Class<Enum>) method.getReturnType(),
- ((Default.Enum) annotation).value());
- } else if (annotation instanceof Default.InstanceFactory) {
- return InstanceBuilder.ofType(((Default.InstanceFactory) annotation).value())
- .build()
- .create(proxy);
+ defaultObject = returnDefaultHelper(annotation, proxy, method);
+ if (defaultObject != null) {
+ break;
}
}
+ if (method.getReturnType().equals(ValueProvider.class)) {
+ return defaultObject == null
+ ? new RuntimeValueProvider(
+ method.getName(), (Class<? extends PipelineOptions>) method.getDeclaringClass(),
+ proxy.getOptionsId())
+ : new RuntimeValueProvider(
+ method.getName(), (Class<? extends PipelineOptions>) method.getDeclaringClass(),
+ defaultObject, proxy.getOptionsId());
+ } else if (defaultObject != null) {
+ return defaultObject;
+ }
/*
* We need to make sure that we return something appropriate for the return type. Thus we return
@@ -507,6 +507,43 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
}
/**
+ * Helper method to return standard Default cases.
+ */
+ @Nullable
+ // The Default.Enum branch casts the getter's return type to the raw Class<Enum>, which
+ // cannot be expressed without unchecked/raw-type warnings.
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private Object returnDefaultHelper(
+     Annotation annotation, PipelineOptions proxy, Method method) {
+   // Every Default.* annotation except Enum and InstanceFactory carries its value directly.
+   if (annotation instanceof Default.Class) {
+     return ((Default.Class) annotation).value();
+   } else if (annotation instanceof Default.String) {
+     return ((Default.String) annotation).value();
+   } else if (annotation instanceof Default.Boolean) {
+     return ((Default.Boolean) annotation).value();
+   } else if (annotation instanceof Default.Character) {
+     return ((Default.Character) annotation).value();
+   } else if (annotation instanceof Default.Byte) {
+     return ((Default.Byte) annotation).value();
+   } else if (annotation instanceof Default.Short) {
+     return ((Default.Short) annotation).value();
+   } else if (annotation instanceof Default.Integer) {
+     return ((Default.Integer) annotation).value();
+   } else if (annotation instanceof Default.Long) {
+     return ((Default.Long) annotation).value();
+   } else if (annotation instanceof Default.Float) {
+     return ((Default.Float) annotation).value();
+   } else if (annotation instanceof Default.Double) {
+     return ((Default.Double) annotation).value();
+   } else if (annotation instanceof Default.Enum) {
+     // The annotation only stores the constant's name; the enum class is taken from the
+     // getter's declared return type.
+     // NOTE(review): for a getter declared as ValueProvider<SomeEnum> the return type is
+     // ValueProvider.class, which Enum.valueOf would reject - confirm whether
+     // @Default.Enum is meant to be supported on ValueProvider getters.
+     return Enum.valueOf((Class<Enum>) method.getReturnType(),
+         ((Default.Enum) annotation).value());
+   } else if (annotation instanceof Default.InstanceFactory) {
+     // Instantiate the named factory and let it compute the default from the options.
+     return InstanceBuilder.ofType(((Default.InstanceFactory) annotation).value())
+         .build()
+         .create(proxy);
+   }
+   // Not a Default.* annotation: signal "no default here" so the caller can keep looking.
+   return null;
+ }
+
+ /**
* Returns a map from the getters method name to the name of the property based upon the passed in
* {@link PropertyDescriptor}s property descriptors.
*
@@ -657,6 +694,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
PipelineOptions options =
new ProxyInvocationHandler(Maps.<String, BoundValue>newHashMap(), fields)
.as(PipelineOptions.class);
+ ValueProvider.RuntimeValueProvider.setRuntimeOptions(options);
return options;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
new file mode 100644
index 0000000..e4502fc
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.BeanProperty;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.ContextualDeserializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+
+/**
+ * {@link ValueProvider} is an interface which abstracts the notion of
+ * fetching a value that may or may not be currently available. This can be
+ * used to parameterize transforms that only read values in at runtime, for
+ * example.
+ */
+@JsonSerialize(using = ValueProvider.Serializer.class)
+@JsonDeserialize(using = ValueProvider.Deserializer.class)
+public interface ValueProvider<T> {
+ /**
+ * Return the value wrapped by this {@link ValueProvider}.
+ */
+ T get();
+
+ /**
+ * Whether the contents of this {@link ValueProvider} is available to
+ * routines that run at graph construction time.
+ */
+ boolean isAccessible();
+
+ /**
+ * {@link StaticValueProvider} is an implementation of {@link ValueProvider} that
+ * allows for a static value to be provided.
+ */
+ public static class StaticValueProvider<T> implements ValueProvider<T>, Serializable {
+   /** The wrapped value, fixed at construction; may be {@code null}. */
+   @Nullable
+   private final T value;
+
+   // Not public: code outside this package obtains instances through of().
+   StaticValueProvider(@Nullable T value) {
+     this.value = value;
+   }
+
+   /**
+    * Returns a new {@link StaticValueProvider} holding {@code value}.
+    */
+   public static <T> StaticValueProvider<T> of(T value) {
+     return new StaticValueProvider<>(value);
+   }
+
+   /** Returns the value supplied at construction. */
+   @Override
+   public T get() {
+     return value;
+   }
+
+   /** Always {@code true}: the value is held by this object from construction on. */
+   @Override
+   public boolean isAccessible() {
+     return true;
+   }
+ }
+
+ /**
+ * {@link RuntimeValueProvider} is an implementation of {@link ValueProvider} that
+ * allows for a value to be provided at execution time rather than at graph
+ * construction time.
+ *
+ * <p>To enforce this contract, if there is no default, users must only call
+ * {@link #get()} at execution time (after a call to
+ * {@link org.apache.beam.sdk.Pipeline#run}), when the runtime options registered in
+ * {@code optionsMap} under this provider's options ID can supply the value.
+ */
+ public static class RuntimeValueProvider<T> implements ValueProvider<T>, Serializable {
+   // Runtime options registered through setRuntimeOptions, keyed by their options ID.
+   // Static, so the registry is shared by every provider in the process; it is never
+   // reassigned, hence final.
+   private static final ConcurrentHashMap<Long, PipelineOptions> optionsMap =
+       new ConcurrentHashMap<>();
+
+   // Options interface declaring the getter, and the getter's name; get() uses both to
+   // look the method up reflectively.
+   private final Class<? extends PipelineOptions> klass;
+   private final String methodName;
+   // Fallback used by get() when the runtime value is null; null when there is no default.
+   @Nullable
+   private final T defaultValue;
+   // Key into optionsMap identifying which registered options this provider reads from.
+   private final Long optionsId;
+
+   /**
+    * Creates a {@link RuntimeValueProvider} that will query the provided
+    * {@code optionsId} for a value, with no default.
+    */
+   RuntimeValueProvider(String methodName, Class<? extends PipelineOptions> klass,
+       Long optionsId) {
+     this(methodName, klass, null, optionsId);
+   }
+
+   /**
+    * Creates a {@link RuntimeValueProvider} that will query the provided
+    * {@code optionsId} for a value, or use the default if no value is available.
+    */
+   RuntimeValueProvider(String methodName, Class<? extends PipelineOptions> klass,
+       @Nullable T defaultValue, Long optionsId) {
+     this.methodName = methodName;
+     this.klass = klass;
+     this.defaultValue = defaultValue;
+     this.optionsId = optionsId;
+   }
+
+   /**
+    * Registers {@code runtimeOptions} under its own options ID. From then on, every
+    * {@code RuntimeValueProvider} created with that same ID returns {@code true} from
+    * {@code isAccessible()}; providers with a different ID are unaffected. By default,
+    * this is called when deserializing {@link PipelineOptions}.
+    */
+   static void setRuntimeOptions(PipelineOptions runtimeOptions) {
+     optionsMap.put(runtimeOptions.getOptionsId(), runtimeOptions);
+   }
+
+   /**
+    * Returns the value held by the registered runtime options, or the default when that
+    * value is {@code null}.
+    *
+    * @throws RuntimeException if no options are registered under this provider's ID, or
+    *     if the reflective lookup fails for any reason (the cause is attached)
+    */
+   @Override
+   public T get() {
+     PipelineOptions options = optionsMap.get(optionsId);
+     if (options == null) {
+       throw new RuntimeException("Not called from a runtime context.");
+     }
+     try {
+       Method method = klass.getMethod(methodName);
+       PipelineOptions methodOptions = options.as(klass);
+       InvocationHandler handler = Proxy.getInvocationHandler(methodOptions);
+       // The cast is unchecked; T cannot be verified at runtime.
+       @SuppressWarnings("unchecked")
+       ValueProvider<T> runtimeProvider =
+           (ValueProvider<T>) handler.invoke(methodOptions, method, null);
+       // firstNonNull throws NullPointerException when both arguments are null; the
+       // catch below turns that into the generic "Unable to load" failure.
+       return firstNonNull(runtimeProvider.get(), defaultValue);
+     } catch (Throwable e) {
+       // InvocationHandler.invoke is declared to throw Throwable, so nothing narrower
+       // can be caught here.
+       throw new RuntimeException("Unable to load runtime value.", e);
+     }
+   }
+
+   /** Returns whether runtime options have been registered under this provider's ID. */
+   @Override
+   public boolean isAccessible() {
+     PipelineOptions options = optionsMap.get(optionsId);
+     return options != null;
+   }
+ }
+
+ /**
+ * Serializer for {@link ValueProvider}.
+ */
+ static class Serializer extends JsonSerializer<ValueProvider<?>> {
+   /**
+    * Writes the wrapped value when the provider reports it as accessible, and a JSON
+    * null otherwise.
+    */
+   @Override
+   public void serialize(ValueProvider<?> value, JsonGenerator jgen,
+       SerializerProvider provider) throws IOException {
+     if (!value.isAccessible()) {
+       jgen.writeNull();
+       return;
+     }
+     jgen.writeObject(value.get());
+   }
+ }
+
+ /**
+ * Deserializer for {@link ValueProvider}, which handles type marshalling.
+ */
+ static class Deserializer extends JsonDeserializer<ValueProvider<?>>
+     implements ContextualDeserializer {
+
+   // Type argument T of the ValueProvider<T> being read. Null on the instance built by
+   // the no-arg constructor; createContextual returns a copy with it filled in.
+   private final JavaType innerType;
+
+   // Jackson instantiates the class named in @JsonDeserialize(using = ...) through its
+   // no-arg constructor, so one must exist even though it cannot know the inner type.
+   Deserializer() {
+     this.innerType = null;
+   }
+
+   Deserializer(JavaType innerType) {
+     this.innerType = innerType;
+   }
+
+   /**
+    * Returns a deserializer bound to the single type parameter of the
+    * {@link ValueProvider} type currently being deserialized.
+    *
+    * @throws RuntimeException if the contextual type does not resolve to exactly one
+    *     {@link ValueProvider} type parameter
+    */
+   @Override
+   public JsonDeserializer<?> createContextual(DeserializationContext ctxt,
+       BeanProperty property)
+       throws JsonMappingException {
+     checkNotNull(ctxt, "Null DeserializationContext.");
+     JavaType type = checkNotNull(ctxt.getContextualType(), "Invalid type: %s", getClass());
+     JavaType[] params = type.findTypeParameters(ValueProvider.class);
+     if (params.length != 1) {
+       throw new RuntimeException(
+           "Unable to derive type for ValueProvider: " + type.toString());
+     }
+     JavaType param = params[0];
+     return new Deserializer(param);
+   }
+
+   /**
+    * Reads one value of the inner type and wraps it in a {@link StaticValueProvider}.
+    *
+    * @throws NullPointerException if called on an instance that never went through
+    *     {@link #createContextual} and so has no inner type
+    */
+   @Override
+   public ValueProvider<?> deserialize(JsonParser jp, DeserializationContext ctxt)
+       throws IOException, JsonProcessingException {
+     JavaType valueType = checkNotNull(
+         innerType, "Invalid %s: innerType is null. Serialization error?", getClass());
+     // Delegate to whatever deserializer Jackson would use for the inner type itself.
+     JsonDeserializer<Object> dser = ctxt.findRootValueDeserializer(valueType);
+     Object o = dser.deserialize(jp, ctxt);
+     return StaticValueProvider.of(o);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 5d8ef43..eecfff8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -244,12 +244,14 @@ public class ProxyInvocationHandlerTest {
public void testToStringAfterDeserializationContainsJsonEntries() throws Exception {
ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, Object>newHashMap());
Simple proxy = handler.as(Simple.class);
+ Long optionsId = proxy.getOptionsId();
proxy.setString("stringValue");
DefaultAnnotations proxy2 = proxy.as(DefaultAnnotations.class);
proxy2.setLong(57L);
- assertEquals("Current Settings:\n"
+ assertEquals(String.format("Current Settings:\n"
+ " long: 57\n"
- + " string: \"stringValue\"\n",
+ + " optionsId: %d\n"
+ + " string: \"stringValue\"\n", optionsId),
serializeDeserialize(PipelineOptions.class, proxy2).toString());
}
@@ -257,14 +259,16 @@ public class ProxyInvocationHandlerTest {
public void testToStringAfterDeserializationContainsOverriddenEntries() throws Exception {
ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, Object>newHashMap());
Simple proxy = handler.as(Simple.class);
+ Long optionsId = proxy.getOptionsId();
proxy.setString("stringValue");
DefaultAnnotations proxy2 = proxy.as(DefaultAnnotations.class);
proxy2.setLong(57L);
Simple deserializedOptions = serializeDeserialize(Simple.class, proxy2);
deserializedOptions.setString("overriddenValue");
- assertEquals("Current Settings:\n"
+ assertEquals(String.format("Current Settings:\n"
+ " long: 57\n"
- + " string: overriddenValue\n",
+ + " optionsId: %d\n"
+ + " string: overriddenValue\n", optionsId),
deserializedOptions.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
new file mode 100644
index 0000000..0cde615
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ValueProvider}. */
+@RunWith(JUnit4.class)
+public class ValueProviderTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  /** Options with a defaulted, a non-defaulted, and a list-valued {@link ValueProvider}. */
+  public static interface TestOptions extends PipelineOptions {
+    @Default.String("bar")
+    ValueProvider<String> getBar();
+    void setBar(ValueProvider<String> bar);
+
+    ValueProvider<String> getFoo();
+    void setFoo(ValueProvider<String> foo);
+
+    ValueProvider<List<Integer>> getList();
+    void setList(ValueProvider<List<Integer>> list);
+  }
+
+  @Test
+  public void testCommandLineNoDefault() {
+    TestOptions options = PipelineOptionsFactory.fromArgs(
+        new String[]{"--foo=baz"}).as(TestOptions.class);
+    ValueProvider<String> provider = options.getFoo();
+    assertEquals("baz", provider.get());
+    assertTrue(provider.isAccessible());
+  }
+
+  @Test
+  public void testListValueProvider() {
+    TestOptions options = PipelineOptionsFactory.fromArgs(
+        new String[]{"--list=1,2,3"}).as(TestOptions.class);
+    ValueProvider<List<Integer>> provider = options.getList();
+    assertEquals(ImmutableList.of(1, 2, 3), provider.get());
+    assertTrue(provider.isAccessible());
+  }
+
+  @Test
+  public void testCommandLineWithDefault() {
+    TestOptions options = PipelineOptionsFactory.fromArgs(
+        new String[]{"--bar=baz"}).as(TestOptions.class);
+    ValueProvider<String> provider = options.getBar();
+    assertEquals("baz", provider.get());
+    assertTrue(provider.isAccessible());
+  }
+
+  @Test
+  public void testStaticValueProvider() {
+    ValueProvider<String> provider = StaticValueProvider.of("foo");
+    assertEquals("foo", provider.get());
+    assertTrue(provider.isAccessible());
+  }
+
+  @Test
+  public void testNoDefaultRuntimeProvider() {
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    ValueProvider<String> provider = options.getFoo();
+    assertFalse(provider.isAccessible());
+
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage("Not called from a runtime context");
+    provider.get();
+  }
+
+  @Test
+  public void testDefaultRuntimeProvider() {
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    ValueProvider<String> provider = options.getBar();
+    assertFalse(provider.isAccessible());
+  }
+
+  @Test
+  public void testNoDefaultRuntimeProviderWithOverride() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+    TestOptions runtime = mapper.readValue(
+        "{ \"options\": { \"foo\": \"quux\" }}", PipelineOptions.class)
+        .as(TestOptions.class);
+
+    // Register the runtime options under the construction-time options' ID so that the
+    // provider obtained below resolves against them.
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    runtime.setOptionsId(options.getOptionsId());
+    RuntimeValueProvider.setRuntimeOptions(runtime);
+
+    ValueProvider<String> provider = options.getFoo();
+    assertTrue(provider.isAccessible());
+    assertEquals("quux", provider.get());
+  }
+
+  @Test
+  public void testDefaultRuntimeProviderWithOverride() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+    TestOptions runtime = mapper.readValue(
+        "{ \"options\": { \"bar\": \"quux\" }}", PipelineOptions.class)
+        .as(TestOptions.class);
+
+    // Register the runtime options under the construction-time options' ID so that the
+    // provider obtained below resolves against them.
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    runtime.setOptionsId(options.getOptionsId());
+    RuntimeValueProvider.setRuntimeOptions(runtime);
+
+    ValueProvider<String> provider = options.getBar();
+    assertTrue(provider.isAccessible());
+    assertEquals("quux", provider.get());
+  }
+
+  /** Options whose getter wrongly declares {@link RuntimeValueProvider} as its type. */
+  public static interface BadOptionsRuntime extends PipelineOptions {
+    RuntimeValueProvider<String> getBar();
+    void setBar(RuntimeValueProvider<String> bar);
+  }
+
+  @Test
+  public void testOptionReturnTypeRuntime() {
+    BadOptionsRuntime options = PipelineOptionsFactory.as(BadOptionsRuntime.class);
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage(
+        "Method getBar should not have return type "
+        + "RuntimeValueProvider, use ValueProvider instead.");
+    // Only the exception matters; the result is deliberately discarded.
+    options.getBar();
+  }
+
+  /** Options whose getter wrongly declares {@link StaticValueProvider} as its type. */
+  public static interface BadOptionsStatic extends PipelineOptions {
+    StaticValueProvider<String> getBar();
+    void setBar(StaticValueProvider<String> bar);
+  }
+
+  @Test
+  public void testOptionReturnTypeStatic() {
+    BadOptionsStatic options = PipelineOptionsFactory.as(BadOptionsStatic.class);
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage(
+        "Method getBar should not have return type "
+        + "StaticValueProvider, use ValueProvider instead.");
+    // Only the exception matters; the result is deliberately discarded.
+    options.getBar();
+  }
+
+  @Test
+  public void testSerializeDeserializeNoArg() throws Exception {
+    TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
+    assertFalse(submitOptions.getFoo().isAccessible());
+    ObjectMapper mapper = new ObjectMapper();
+    String serializedOptions = mapper.writeValueAsString(submitOptions);
+
+    // This is the expected behavior of the runner: deserialize and set
+    // the runtime options.
+    assertThat(serializedOptions, containsString("\"foo\":null"));
+    String runnerString = serializedOptions.replaceAll(
+        "\"foo\":null", "\"foo\":\"quux\"");
+    TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class)
+        .as(TestOptions.class);
+
+    ValueProvider<String> vp = runtime.getFoo();
+    assertTrue(vp.isAccessible());
+    assertEquals("quux", vp.get());
+    assertEquals(StaticValueProvider.class, vp.getClass());
+  }
+
+  @Test
+  public void testSerializeDeserializeWithArg() throws Exception {
+    TestOptions submitOptions = PipelineOptionsFactory.fromArgs(
+        new String[]{"--foo=baz"}).as(TestOptions.class);
+    assertEquals("baz", submitOptions.getFoo().get());
+    assertTrue(submitOptions.getFoo().isAccessible());
+    ObjectMapper mapper = new ObjectMapper();
+    String serializedOptions = mapper.writeValueAsString(submitOptions);
+
+    // This is the expected behavior of the runner: deserialize and set
+    // the runtime options.
+    assertThat(serializedOptions, containsString("baz"));
+    String runnerString = serializedOptions.replaceAll("baz", "quux");
+    TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class)
+        .as(TestOptions.class);
+
+    ValueProvider<String> vp = runtime.getFoo();
+    assertTrue(vp.isAccessible());
+    assertEquals("quux", vp.get());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
index ea771b4..92dcbb8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
@@ -89,6 +89,9 @@ public class ApiSurfaceTest {
inPackage("com.google.rpc"),
inPackage("com.google.type"),
inPackage("com.fasterxml.jackson.annotation"),
+ inPackage("com.fasterxml.jackson.core"),
+ inPackage("com.fasterxml.jackson.databind"),
+ inPackage("com.fasterxml.jackson.deser"),
inPackage("io.grpc"),
inPackage("org.apache.avro"),
inPackage("org.apache.commons.logging"), // via BigTable
[2/2] incubator-beam git commit: Closes #1009
Posted by dh...@apache.org.
Closes #1009
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5bfeb958
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5bfeb958
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5bfeb958
Branch: refs/heads/master
Commit: 5bfeb958de40e07a5a2af52b1b039f1dfa06cfc1
Parents: 93d2e37 66686e6
Author: Dan Halperin <dh...@google.com>
Authored: Thu Oct 13 22:38:59 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Oct 13 22:38:59 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/options/PipelineOptions.java | 23 ++
.../sdk/options/PipelineOptionsFactory.java | 15 +-
.../sdk/options/ProxyInvocationHandler.java | 94 +++++---
.../apache/beam/sdk/options/ValueProvider.java | 228 +++++++++++++++++++
.../sdk/options/ProxyInvocationHandlerTest.java | 12 +-
.../beam/sdk/options/ValueProviderTest.java | 213 +++++++++++++++++
.../apache/beam/sdk/util/ApiSurfaceTest.java | 3 +
7 files changed, 552 insertions(+), 36 deletions(-)
----------------------------------------------------------------------