You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:00 UTC
[36/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
deleted file mode 100644
index 527f712..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.JsonIgnorePredicate;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.Registration;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.common.ReflectHelpers;
-import com.google.common.base.Defaults;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ClassToInstanceMap;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.MutableClassToInstanceMap;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import java.beans.PropertyDescriptor;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Represents and {@link InvocationHandler} for a {@link Proxy}. The invocation handler uses bean
- * introspection of the proxy class to store and retrieve values based off of the property name.
- *
- * <p>Unset properties use the {@code @Default} metadata on the getter to return values. If there
- * is no {@code @Default} annotation on the getter, then a <a
- * href="https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html">default</a> as
- * per the Java Language Specification for the expected return type is returned.
- *
- * <p>In addition to the getter/setter pairs, this proxy invocation handler supports
- * {@link Object#equals(Object)}, {@link Object#hashCode()}, {@link Object#toString()} and
- * {@link PipelineOptions#as(Class)}.
- */
-@ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler {
- private static final ObjectMapper MAPPER = new ObjectMapper();
- /**
- * No two instances of this class are considered equivalent hence we generate a random hash code
- * between 0 and {@link Integer#MAX_VALUE}.
- */
- private final int hashCode = (int) (Math.random() * Integer.MAX_VALUE);
- private final Set<Class<? extends PipelineOptions>> knownInterfaces;
- private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
- private final Map<String, Object> options;
- private final Map<String, JsonNode> jsonOptions;
- private final Map<String, String> gettersToPropertyNames;
- private final Map<String, String> settersToPropertyNames;
-
- ProxyInvocationHandler(Map<String, Object> options) {
- this(options, Maps.<String, JsonNode>newHashMap());
- }
-
- private ProxyInvocationHandler(Map<String, Object> options, Map<String, JsonNode> jsonOptions) {
- this.options = options;
- this.jsonOptions = jsonOptions;
- this.knownInterfaces = new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
- gettersToPropertyNames = Maps.newHashMap();
- settersToPropertyNames = Maps.newHashMap();
- interfaceToProxyCache = MutableClassToInstanceMap.create();
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) {
- if (args == null && "toString".equals(method.getName())) {
- return toString();
- } else if (args != null && args.length == 1 && "equals".equals(method.getName())) {
- return equals(args[0]);
- } else if (args == null && "hashCode".equals(method.getName())) {
- return hashCode();
- } else if (args != null && "as".equals(method.getName()) && args[0] instanceof Class) {
- @SuppressWarnings("unchecked")
- Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
- return as(clazz);
- } else if (args != null && "cloneAs".equals(method.getName()) && args[0] instanceof Class) {
- @SuppressWarnings("unchecked")
- Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
- return cloneAs(proxy, clazz);
- }
- String methodName = method.getName();
- synchronized (this) {
- if (gettersToPropertyNames.keySet().contains(methodName)) {
- String propertyName = gettersToPropertyNames.get(methodName);
- if (!options.containsKey(propertyName)) {
- // Lazy bind the default to the method.
- Object value = jsonOptions.containsKey(propertyName)
- ? getValueFromJson(propertyName, method)
- : getDefault((PipelineOptions) proxy, method);
- options.put(propertyName, value);
- }
- return options.get(propertyName);
- } else if (settersToPropertyNames.containsKey(methodName)) {
- options.put(settersToPropertyNames.get(methodName), args[0]);
- return Void.TYPE;
- }
- }
- throw new RuntimeException("Unknown method [" + method + "] invoked with args ["
- + Arrays.toString(args) + "].");
- }
-
- /**
- * Backing implementation for {@link PipelineOptions#as(Class)}.
- *
- * @param iface The interface that the returned object needs to implement.
- * @return An object that implements the interface <T>.
- */
- synchronized <T extends PipelineOptions> T as(Class<T> iface) {
- Preconditions.checkNotNull(iface);
- Preconditions.checkArgument(iface.isInterface());
- if (!interfaceToProxyCache.containsKey(iface)) {
- Registration<T> registration =
- PipelineOptionsFactory.validateWellFormed(iface, knownInterfaces);
- List<PropertyDescriptor> propertyDescriptors = registration.getPropertyDescriptors();
- Class<T> proxyClass = registration.getProxyClass();
- gettersToPropertyNames.putAll(generateGettersToPropertyNames(propertyDescriptors));
- settersToPropertyNames.putAll(generateSettersToPropertyNames(propertyDescriptors));
- knownInterfaces.add(iface);
- interfaceToProxyCache.putInstance(iface,
- InstanceBuilder.ofType(proxyClass)
- .fromClass(proxyClass)
- .withArg(InvocationHandler.class, this)
- .build());
- }
- return interfaceToProxyCache.getInstance(iface);
- }
-
- /**
- * Backing implementation for {@link PipelineOptions#cloneAs(Class)}.
- *
- * @return A copy of the PipelineOptions.
- */
- synchronized <T extends PipelineOptions> T cloneAs(Object proxy, Class<T> iface) {
- PipelineOptions clonedOptions;
- try {
- clonedOptions = MAPPER.readValue(MAPPER.writeValueAsBytes(proxy), PipelineOptions.class);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to serialize the pipeline options to JSON.", e);
- }
- for (Class<? extends PipelineOptions> knownIface : knownInterfaces) {
- clonedOptions.as(knownIface);
- }
- return clonedOptions.as(iface);
- }
-
- /**
- * Returns true if the other object is a ProxyInvocationHandler or is a Proxy object and has the
- * same ProxyInvocationHandler as this.
- *
- * @param obj The object to compare against this.
- * @return true iff the other object is a ProxyInvocationHandler or is a Proxy object and has the
- * same ProxyInvocationHandler as this.
- */
- @Override
- public boolean equals(Object obj) {
- return obj != null && ((obj instanceof ProxyInvocationHandler && this == obj)
- || (Proxy.isProxyClass(obj.getClass()) && this == Proxy.getInvocationHandler(obj)));
- }
-
- /**
- * Each instance of this ProxyInvocationHandler is unique and has a random hash code.
- *
- * @return A hash code that was generated randomly.
- */
- @Override
- public int hashCode() {
- return hashCode;
- }
-
- /**
- * This will output all the currently set values. This is a relatively costly function
- * as it will call {@code toString()} on each object that has been set and format
- * the results in a readable format.
- *
- * @return A pretty printed string representation of this.
- */
- @Override
- public synchronized String toString() {
- SortedMap<String, Object> sortedOptions = new TreeMap<>();
- // Add the options that we received from deserialization
- sortedOptions.putAll(jsonOptions);
- // Override with any programmatically set options.
- sortedOptions.putAll(options);
-
- StringBuilder b = new StringBuilder();
- b.append("Current Settings:\n");
- for (Map.Entry<String, Object> entry : sortedOptions.entrySet()) {
- b.append(" " + entry.getKey() + ": " + entry.getValue() + "\n");
- }
- return b.toString();
- }
-
- /**
- * Uses a Jackson {@link ObjectMapper} to attempt type conversion.
- *
- * @param method The method whose return type you would like to return.
- * @param propertyName The name of the property that is being returned.
- * @return An object matching the return type of the method passed in.
- */
- private Object getValueFromJson(String propertyName, Method method) {
- try {
- JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
- JsonNode jsonNode = jsonOptions.get(propertyName);
- return MAPPER.readValue(jsonNode.toString(), type);
- } catch (IOException e) {
- throw new RuntimeException("Unable to parse representation", e);
- }
- }
-
- /**
- * Returns a default value for the method based upon {@code @Default} metadata on the getter
- * to return values. If there is no {@code @Default} annotation on the getter, then a <a
- * href="https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html">default</a> as
- * per the Java Language Specification for the expected return type is returned.
- *
- * @param proxy The proxy object for which we are attempting to get the default.
- * @param method The getter method that was invoked.
- * @return The default value from an {@link Default} annotation if present, otherwise a default
- * value as per the Java Language Specification.
- */
- @SuppressWarnings({"unchecked", "rawtypes"})
- private Object getDefault(PipelineOptions proxy, Method method) {
- for (Annotation annotation : method.getAnnotations()) {
- if (annotation instanceof Default.Class) {
- return ((Default.Class) annotation).value();
- } else if (annotation instanceof Default.String) {
- return ((Default.String) annotation).value();
- } else if (annotation instanceof Default.Boolean) {
- return ((Default.Boolean) annotation).value();
- } else if (annotation instanceof Default.Character) {
- return ((Default.Character) annotation).value();
- } else if (annotation instanceof Default.Byte) {
- return ((Default.Byte) annotation).value();
- } else if (annotation instanceof Default.Short) {
- return ((Default.Short) annotation).value();
- } else if (annotation instanceof Default.Integer) {
- return ((Default.Integer) annotation).value();
- } else if (annotation instanceof Default.Long) {
- return ((Default.Long) annotation).value();
- } else if (annotation instanceof Default.Float) {
- return ((Default.Float) annotation).value();
- } else if (annotation instanceof Default.Double) {
- return ((Default.Double) annotation).value();
- } else if (annotation instanceof Default.Enum) {
- return Enum.valueOf((Class<Enum>) method.getReturnType(),
- ((Default.Enum) annotation).value());
- } else if (annotation instanceof Default.InstanceFactory) {
- return InstanceBuilder.ofType(((Default.InstanceFactory) annotation).value())
- .build()
- .create(proxy);
- }
- }
-
- /*
- * We need to make sure that we return something appropriate for the return type. Thus we return
- * a default value as defined by the JLS.
- */
- return Defaults.defaultValue(method.getReturnType());
- }
-
- /**
- * Returns a map from the getters method name to the name of the property based upon the passed in
- * {@link PropertyDescriptor}s property descriptors.
- *
- * @param propertyDescriptors A list of {@link PropertyDescriptor}s to use when generating the
- * map.
- * @return A map of getter method name to property name.
- */
- private static Map<String, String> generateGettersToPropertyNames(
- List<PropertyDescriptor> propertyDescriptors) {
- ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
- for (PropertyDescriptor descriptor : propertyDescriptors) {
- if (descriptor.getReadMethod() != null) {
- builder.put(descriptor.getReadMethod().getName(), descriptor.getName());
- }
- }
- return builder.build();
- }
-
- /**
- * Returns a map from the setters method name to its matching getters method name based upon the
- * passed in {@link PropertyDescriptor}s property descriptors.
- *
- * @param propertyDescriptors A list of {@link PropertyDescriptor}s to use when generating the
- * map.
- * @return A map of setter method name to getter method name.
- */
- private static Map<String, String> generateSettersToPropertyNames(
- List<PropertyDescriptor> propertyDescriptors) {
- ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
- for (PropertyDescriptor descriptor : propertyDescriptors) {
- if (descriptor.getWriteMethod() != null) {
- builder.put(descriptor.getWriteMethod().getName(), descriptor.getName());
- }
- }
- return builder.build();
- }
-
- static class Serializer extends JsonSerializer<PipelineOptions> {
- @Override
- public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvider provider)
- throws IOException, JsonProcessingException {
- ProxyInvocationHandler handler = (ProxyInvocationHandler) Proxy.getInvocationHandler(value);
- synchronized (handler) {
- // We first filter out any properties that have been modified since
- // the last serialization of this PipelineOptions and then verify that
- // they are all serializable.
- Map<String, Object> filteredOptions = Maps.newHashMap(handler.options);
- removeIgnoredOptions(handler.knownInterfaces, filteredOptions);
- ensureSerializable(handler.knownInterfaces, filteredOptions);
-
- // Now we create the map of serializable options by taking the original
- // set of serialized options (if any) and updating them with any properties
- // instances that have been modified since the previous serialization.
- Map<String, Object> serializableOptions =
- Maps.<String, Object>newHashMap(handler.jsonOptions);
- serializableOptions.putAll(filteredOptions);
- jgen.writeStartObject();
- jgen.writeFieldName("options");
- jgen.writeObject(serializableOptions);
- jgen.writeEndObject();
- }
- }
-
- /**
- * We remove all properties within the passed in options where there getter is annotated with
- * {@link JsonIgnore @JsonIgnore} from the passed in options using the passed in interfaces.
- */
- private void removeIgnoredOptions(
- Set<Class<? extends PipelineOptions>> interfaces, Map<String, Object> options) {
- // Find all the method names that are annotated with JSON ignore.
- Set<String> jsonIgnoreMethodNames = FluentIterable.from(
- ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces))
- .filter(JsonIgnorePredicate.INSTANCE).transform(new Function<Method, String>() {
- @Override
- public String apply(Method input) {
- return input.getName();
- }
- }).toSet();
-
- // Remove all options that have the same method name as the descriptor.
- for (PropertyDescriptor descriptor
- : PipelineOptionsFactory.getPropertyDescriptors(interfaces)) {
- if (jsonIgnoreMethodNames.contains(descriptor.getReadMethod().getName())) {
- options.remove(descriptor.getName());
- }
- }
- }
-
- /**
- * We use an {@link ObjectMapper} to verify that the passed in options are serializable
- * and deserializable.
- */
- private void ensureSerializable(Set<Class<? extends PipelineOptions>> interfaces,
- Map<String, Object> options) throws IOException {
- // Construct a map from property name to the return type of the getter.
- Map<String, Type> propertyToReturnType = Maps.newHashMap();
- for (PropertyDescriptor descriptor
- : PipelineOptionsFactory.getPropertyDescriptors(interfaces)) {
- if (descriptor.getReadMethod() != null) {
- propertyToReturnType.put(descriptor.getName(),
- descriptor.getReadMethod().getGenericReturnType());
- }
- }
-
- // Attempt to serialize and deserialize each property.
- for (Map.Entry<String, Object> entry : options.entrySet()) {
- try {
- String serializedValue = MAPPER.writeValueAsString(entry.getValue());
- JavaType type = MAPPER.getTypeFactory()
- .constructType(propertyToReturnType.get(entry.getKey()));
- MAPPER.readValue(serializedValue, type);
- } catch (Exception e) {
- throw new IOException(String.format(
- "Failed to serialize and deserialize property '%s' with value '%s'",
- entry.getKey(), entry.getValue()), e);
- }
- }
- }
- }
-
- static class Deserializer extends JsonDeserializer<PipelineOptions> {
- @Override
- public PipelineOptions deserialize(JsonParser jp, DeserializationContext ctxt)
- throws IOException, JsonProcessingException {
- ObjectNode objectNode = (ObjectNode) jp.readValueAsTree();
- ObjectNode optionsNode = (ObjectNode) objectNode.get("options");
-
- Map<String, JsonNode> fields = Maps.newHashMap();
- for (Iterator<Map.Entry<String, JsonNode>> iterator = optionsNode.fields();
- iterator.hasNext(); ) {
- Map.Entry<String, JsonNode> field = iterator.next();
- fields.put(field.getKey(), field.getValue());
- }
- PipelineOptions options =
- new ProxyInvocationHandler(Maps.<String, Object>newHashMap(), fields)
- .as(PipelineOptions.class);
- return options;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
deleted file mode 100644
index 9563c58..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options used to configure streaming.
- */
-public interface StreamingOptions extends
- ApplicationNameOptions, GcpOptions, PipelineOptions {
- /**
- * Set to true if running a streaming pipeline.
- */
- @Description("Set to true if running a streaming pipeline.")
- boolean isStreaming();
- void setStreaming(boolean value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
deleted file mode 100644
index 20034f8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * {@link Validation} represents a set of annotations that can be used to annotate getter
- * properties on {@link PipelineOptions} with information representing the validation criteria to
- * be used when validating with the {@link PipelineOptionsValidator}.
- */
-public @interface Validation {
- /**
- * This criteria specifies that the value must be not null. Note that this annotation
- * should only be applied to methods that return nullable objects.
- */
- @Target(value = ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface Required {
- /**
- * The groups that the annotated attribute is a member of. A member can be in 0 or more groups.
- * Members not in any groups are considered to be in a group consisting exclusively of
- * themselves. At least one member of a group must be non-null if the options are to be valid.
- */
- String[] groups() default {};
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
deleted file mode 100644
index cef995f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines {@link com.google.cloud.dataflow.sdk.options.PipelineOptions} for
- * configuring pipeline execution.
- *
- * <p>{@link com.google.cloud.dataflow.sdk.options.PipelineOptions} encapsulates the various
- * parameters that describe how a pipeline should be run. {@code PipelineOptions} are created
- * using a {@link com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory}.
- */
-package com.google.cloud.dataflow.sdk.options;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
deleted file mode 100644
index 5567f03..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Provides a simple, powerful model for building both batch and
- * streaming parallel data processing
- * {@link com.google.cloud.dataflow.sdk.Pipeline}s.
- *
- * <p>To use the Google Cloud Dataflow SDK, you build a
- * {@link com.google.cloud.dataflow.sdk.Pipeline}, which manages a graph of
- * {@link com.google.cloud.dataflow.sdk.transforms.PTransform}s
- * and the {@link com.google.cloud.dataflow.sdk.values.PCollection}s that
- * the PTransforms consume and produce.
- *
- * <p>Each Pipeline has a
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} to specify
- * where and how it should run after pipeline construction is complete.
- *
- */
-package com.google.cloud.dataflow.sdk;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
deleted file mode 100644
index ab87f2e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AggregatorRetriever;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of
- * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
- */
-public class AggregatorPipelineExtractor {
- private final Pipeline pipeline;
-
- /**
- * Creates an {@code AggregatorPipelineExtractor} for the given {@link Pipeline}.
- */
- public AggregatorPipelineExtractor(Pipeline pipeline) {
- this.pipeline = pipeline;
- }
-
- /**
- * Returns a {@link Map} between each {@link Aggregator} in the {@link Pipeline} to the {@link
- * PTransform PTransforms} in which it is used.
- */
- public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
- HashMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps = HashMultimap.create();
- pipeline.traverseTopologically(new AggregatorVisitor(aggregatorSteps));
- return aggregatorSteps.asMap();
- }
-
- private static class AggregatorVisitor implements PipelineVisitor {
- private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps;
-
- public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) {
- this.aggregatorSteps = aggregatorSteps;
- }
-
- @Override
- public void enterCompositeTransform(TransformTreeNode node) {}
-
- @Override
- public void leaveCompositeTransform(TransformTreeNode node) {}
-
- @Override
- public void visitTransform(TransformTreeNode node) {
- PTransform<?, ?> transform = node.getTransform();
- addStepToAggregators(transform, getAggregators(transform));
- }
-
- private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
- if (transform != null) {
- if (transform instanceof ParDo.Bound) {
- return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
- } else if (transform instanceof ParDo.BoundMulti) {
- return AggregatorRetriever.getAggregators(((ParDo.BoundMulti<?, ?>) transform).getFn());
- }
- }
- return Collections.emptyList();
- }
-
- private void addStepToAggregators(
- PTransform<?, ?> transform, Collection<Aggregator<?, ?>> aggregators) {
- for (Aggregator<?, ?> aggregator : aggregators) {
- aggregatorSteps.put(aggregator, transform);
- }
- }
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
deleted file mode 100644
index 90162ad..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-
-/**
- * Signals that an exception has occurred while retrieving {@link Aggregator}s.
- */
-public class AggregatorRetrievalException extends Exception {
- /**
- * Constructs a new {@code AggregatorRetrievalException} with the specified detail message and
- * cause.
- */
- public AggregatorRetrievalException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
deleted file mode 100644
index 21f0282..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link DoFn} are emitted on a per-{@code DoFn}-application basis.
- *
- * @param <T> the output type of the aggregator
- */
-public abstract class AggregatorValues<T> {
- /**
- * Get the values of the {@link Aggregator} at all steps it was used.
- */
- public Collection<T> getValues() {
- return getValuesAtSteps().values();
- }
-
- /**
- * Get the values of the {@link Aggregator} by the user name at each step it was used.
- */
- public abstract Map<String, T> getValuesAtSteps();
-
- /**
- * Get the total value of this {@link Aggregator} by applying the specified {@link CombineFn}.
- */
- public T getTotalValue(CombineFn<T, ?, T> combineFn) {
- return combineFn.apply(getValues());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
deleted file mode 100644
index 95e3dfe..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner}
- * but that waits for the launched job to finish.
- *
- * <p>Prints out job status updates and console messages while it waits.
- *
- * <p>Returns the final job state, or throws an exception if the job
- * fails or cannot be monitored.
- *
- * <p><h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code BlockingDataflowPipelineRunner}, the Google cloud services account and the Google compute
- * engine service account of the GCP project running the Dataflow Job will need access to the
- * corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-public class BlockingDataflowPipelineRunner extends
- PipelineRunner<DataflowPipelineJob> {
- private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
-
- // Defaults to an infinite wait period.
- // TODO: make this configurable after removal of option map.
- private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
-
- private final DataflowPipelineRunner dataflowPipelineRunner;
- private final BlockingDataflowPipelineOptions options;
-
- protected BlockingDataflowPipelineRunner(
- DataflowPipelineRunner internalRunner,
- BlockingDataflowPipelineOptions options) {
- this.dataflowPipelineRunner = internalRunner;
- this.options = options;
- }
-
- /**
- * Constructs a runner from the provided options.
- */
- public static BlockingDataflowPipelineRunner fromOptions(
- PipelineOptions options) {
- BlockingDataflowPipelineOptions dataflowOptions =
- PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
- DataflowPipelineRunner dataflowPipelineRunner =
- DataflowPipelineRunner.fromOptions(dataflowOptions);
-
- return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions);
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws DataflowJobExecutionException if there is an exception during job execution.
- * @throws DataflowServiceException if there is an exception retrieving information about the job.
- */
- @Override
- public DataflowPipelineJob run(Pipeline p) {
- final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
-
- // We ignore the potential race condition here (Ctrl-C after job submission but before the
- // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
- // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
- // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
- // etc. If the user wants to verify the job was cancelled they should look at the job status.
- Thread shutdownHook = new Thread() {
- @Override
- public void run() {
- LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
- + "To cancel the job in the cloud, run:\n> {}",
- MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
- }
- };
-
- try {
- Runtime.getRuntime().addShutdownHook(shutdownHook);
-
- @Nullable
- State result;
- try {
- result = job.waitToFinish(
- BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
- new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
- } catch (IOException | InterruptedException ex) {
- LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
- throw new DataflowServiceException(
- job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
- }
-
- if (result == null) {
- throw new DataflowServiceException(
- job, "Timed out while retrieving status for job " + job.getJobId());
- }
-
- LOG.info("Job finished with status {}", result);
- if (!result.isTerminal()) {
- throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
- + ", got " + result);
- }
-
- if (result == State.DONE) {
- return job;
- } else if (result == State.UPDATED) {
- DataflowPipelineJob newJob = job.getReplacedByJob();
- LOG.info("Job {} has been updated and is running as the new job with id {}."
- + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
- job.getJobId(),
- newJob.getJobId(),
- MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
- throw new DataflowJobUpdatedException(
- job,
- String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
- newJob);
- } else if (result == State.CANCELLED) {
- String message = String.format("Job %s cancelled by user", job.getJobId());
- LOG.info(message);
- throw new DataflowJobCancelledException(job, message);
- } else {
- throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
- + " failed with status " + result);
- }
- } finally {
- Runtime.getRuntime().removeShutdownHook(shutdownHook);
- }
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- return dataflowPipelineRunner.apply(transform, input);
- }
-
- /**
- * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}.
- */
- @Experimental
- public void setHooks(DataflowPipelineRunnerHooks hooks) {
- this.dataflowPipelineRunner.setHooks(hooks);
- }
-
- @Override
- public String toString() {
- return "BlockingDataflowPipelineRunner#" + options.getJobName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
deleted file mode 100644
index 1547f73..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the unique job name constraint of the Dataflow
- * service is broken because an existing job with the same job name is currently active.
- * The {@link DataflowPipelineJob} contained within this exception contains information
- * about the pre-existing job.
- */
-public class DataflowJobAlreadyExistsException extends DataflowJobException {
- /**
- * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
- * DataflowPipelineJob} and message.
- */
- public DataflowJobAlreadyExistsException(
- DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
deleted file mode 100644
index d4ae4f5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the existing job has already been updated within the Dataflow
- * service and is no longer able to be updated. The {@link DataflowPipelineJob} contained within
- * this exception contains information about the pre-existing updated job.
- */
-public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
- /**
- * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
- * DataflowPipelineJob} and message.
- */
- public DataflowJobAlreadyUpdatedException(
- DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
deleted file mode 100644
index 0d31726..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
- */
-public class DataflowJobCancelledException extends DataflowJobException {
- /**
- * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
- * DataflowPipelineJob} and message.
- */
- public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-
- /**
- * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
- * DataflowPipelineJob}, message, and cause.
- */
- public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
- super(job, message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
deleted file mode 100644
index 9e305d5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
- */
-public abstract class DataflowJobException extends RuntimeException {
- private final DataflowPipelineJob job;
-
- DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
- super(message, cause);
- this.job = Objects.requireNonNull(job);
- }
-
- /**
- * Returns the failed job.
- */
- public DataflowPipelineJob getJob() {
- return job;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
deleted file mode 100644
index ae6df0f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import javax.annotation.Nullable;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} fails during execution, and
- * provides access to the failed job.
- */
-public class DataflowJobExecutionException extends DataflowJobException {
- DataflowJobExecutionException(DataflowPipelineJob job, String message) {
- this(job, message, null);
- }
-
- DataflowJobExecutionException(
- DataflowPipelineJob job, String message, @Nullable Throwable cause) {
- super(job, message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
deleted file mode 100644
index 1becdd7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
- */
-public class DataflowJobUpdatedException extends DataflowJobException {
- private DataflowPipelineJob replacedByJob;
-
- /**
- * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
- * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
- */
- public DataflowJobUpdatedException(
- DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
- this(job, message, replacedByJob, null);
- }
-
- /**
- * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
- * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
- */
- public DataflowJobUpdatedException(
- DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
- super(job, message, cause);
- this.replacedByJob = replacedByJob;
- }
-
- /**
- * The new job that replaces the job terminated with this exception.
- */
- public DataflowPipelineJob getReplacedByJob() {
- return replacedByJob;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
deleted file mode 100644
index 5a78624..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * A {@link DataflowPipeline} is a {@link Pipeline} that returns a
- * {@link DataflowPipelineJob} when it is
- * {@link com.google.cloud.dataflow.sdk.Pipeline#run()}.
- *
- * <p>This is not intended for use by users of Cloud Dataflow.
- * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a
- * {@link Pipeline}.
- */
-public class DataflowPipeline extends Pipeline {
-
- /**
- * Creates and returns a new {@link DataflowPipeline} instance for tests.
- */
- public static DataflowPipeline create(DataflowPipelineOptions options) {
- return new DataflowPipeline(options);
- }
-
- private DataflowPipeline(DataflowPipelineOptions options) {
- super(DataflowPipelineRunner.fromOptions(options), options);
- }
-
- @Override
- public DataflowPipelineJob run() {
- return (DataflowPipelineJob) super.run();
- }
-
- @Override
- public DataflowPipelineRunner getRunner() {
- return (DataflowPipelineRunner) super.getRunner();
- }
-
- @Override
- public String toString() {
- return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
deleted file mode 100644
index e9f134c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
-import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
-import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
-import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A DataflowPipelineJob represents a job submitted to Dataflow using
- * {@link DataflowPipelineRunner}.
- */
-public class DataflowPipelineJob implements PipelineResult {
- private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
-
- /**
- * The id for the job.
- */
- private String jobId;
-
- /**
- * Google cloud project to associate this pipeline with.
- */
- private String projectId;
-
- /**
- * Client for the Dataflow service. This can be used to query the service
- * for information about the job.
- */
- private Dataflow dataflowClient;
-
- /**
- * The state the job terminated in or {@code null} if the job has not terminated.
- */
- @Nullable
- private State terminalState = null;
-
- /**
- * The job that replaced this one or {@code null} if the job has not been replaced.
- */
- @Nullable
- private DataflowPipelineJob replacedByJob = null;
-
- private DataflowAggregatorTransforms aggregatorTransforms;
-
- /**
- * The Metric Updates retrieved after the job was in a terminal state.
- */
- private List<MetricUpdate> terminalMetricUpdates;
-
- /**
- * The polling interval for job status and messages information.
- */
- static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
- static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
-
- /**
- * The amount of polling attempts for job status and messages information.
- */
- static final int MESSAGES_POLLING_ATTEMPTS = 10;
- static final int STATUS_POLLING_ATTEMPTS = 5;
-
- /**
- * Constructs the job.
- *
- * @param projectId the project id
- * @param jobId the job id
- * @param dataflowClient the client for the Dataflow Service
- */
- public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient,
- DataflowAggregatorTransforms aggregatorTransforms) {
- this.projectId = projectId;
- this.jobId = jobId;
- this.dataflowClient = dataflowClient;
- this.aggregatorTransforms = aggregatorTransforms;
- }
-
- /**
- * Get the id of this job.
- */
- public String getJobId() {
- return jobId;
- }
-
- /**
- * Get the project this job exists in.
- */
- public String getProjectId() {
- return projectId;
- }
-
- /**
- * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
- *
- * @throws IllegalStateException if called before the job has terminated or if the job terminated
- * but was not updated
- */
- public DataflowPipelineJob getReplacedByJob() {
- if (terminalState == null) {
- throw new IllegalStateException("getReplacedByJob() called before job terminated");
- }
- if (replacedByJob == null) {
- throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
- }
- return replacedByJob;
- }
-
- /**
- * Get the Cloud Dataflow API Client used by this job.
- */
- public Dataflow getDataflowClient() {
- return dataflowClient;
- }
-
- /**
- * Waits for the job to finish and return the final status.
- *
- * @param timeToWait The time to wait in units timeUnit for the job to finish.
- * Provide a value less than 1 ms for an infinite wait.
- * @param timeUnit The unit of time for timeToWait.
- * @param messageHandler If non null this handler will be invoked for each
- * batch of messages received.
- * @return The final state of the job or null on timeout or if the
- * thread is interrupted.
- * @throws IOException If there is a persistent problem getting job
- * information.
- * @throws InterruptedException
- */
- @Nullable
- public State waitToFinish(
- long timeToWait,
- TimeUnit timeUnit,
- MonitoringUtil.JobMessagesHandler messageHandler)
- throws IOException, InterruptedException {
- return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
- }
-
- /**
- * Wait for the job to finish and return the final status.
- *
- * @param timeToWait The time to wait in units timeUnit for the job to finish.
- * Provide a value less than 1 ms for an infinite wait.
- * @param timeUnit The unit of time for timeToWait.
- * @param messageHandler If non null this handler will be invoked for each
- * batch of messages received.
- * @param sleeper A sleeper to use to sleep between attempts.
- * @param nanoClock A nanoClock used to time the total time taken.
- * @return The final state of the job or null on timeout or if the
- * thread is interrupted.
- * @throws IOException If there is a persistent problem getting job
- * information.
- * @throws InterruptedException
- */
- @Nullable
- @VisibleForTesting
- State waitToFinish(
- long timeToWait,
- TimeUnit timeUnit,
- MonitoringUtil.JobMessagesHandler messageHandler,
- Sleeper sleeper,
- NanoClock nanoClock)
- throws IOException, InterruptedException {
- MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
-
- long lastTimestamp = 0;
- BackOff backoff =
- timeUnit.toMillis(timeToWait) > 0
- ? new AttemptAndTimeBoundedExponentialBackOff(
- MESSAGES_POLLING_ATTEMPTS,
- MESSAGES_POLLING_INTERVAL,
- timeUnit.toMillis(timeToWait),
- AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
- nanoClock)
- : new AttemptBoundedExponentialBackOff(
- MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
- State state;
- do {
- // Get the state of the job before listing messages. This ensures we always fetch job
- // messages after the job finishes to ensure we have all them.
- state = getStateWithRetries(1, sleeper);
- boolean hasError = state == State.UNKNOWN;
-
- if (messageHandler != null && !hasError) {
- // Process all the job messages that have accumulated so far.
- try {
- List<JobMessage> allMessages = monitor.getJobMessages(
- jobId, lastTimestamp);
-
- if (!allMessages.isEmpty()) {
- lastTimestamp =
- fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
- messageHandler.process(allMessages);
- }
- } catch (GoogleJsonResponseException | SocketTimeoutException e) {
- hasError = true;
- LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
- LOG.debug("Exception information:", e);
- }
- }
-
- if (!hasError) {
- backoff.reset();
- // Check if the job is done.
- if (state.isTerminal()) {
- return state;
- }
- }
- } while(BackOffUtils.next(sleeper, backoff));
- LOG.warn("No terminal state was returned. State value {}", state);
- return null; // Timed out.
- }
-
- /**
- * Cancels the job.
- * @throws IOException if there is a problem executing the cancel request.
- */
- public void cancel() throws IOException {
- Job content = new Job();
- content.setProjectId(projectId);
- content.setId(jobId);
- content.setRequestedState("JOB_STATE_CANCELLED");
- dataflowClient.projects().jobs()
- .update(projectId, jobId, content)
- .execute();
- }
-
- @Override
- public State getState() {
- if (terminalState != null) {
- return terminalState;
- }
-
- return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
- }
-
- /**
- * Attempts to get the state. Uses exponential backoff on failure up to the maximum number
- * of passed in attempts.
- *
- * @param attempts The amount of attempts to make.
- * @param sleeper Object used to do the sleeps between attempts.
- * @return The state of the job or State.UNKNOWN in case of failure.
- */
- @VisibleForTesting
- State getStateWithRetries(int attempts, Sleeper sleeper) {
- if (terminalState != null) {
- return terminalState;
- }
- try {
- Job job = getJobWithRetries(attempts, sleeper);
- return MonitoringUtil.toState(job.getCurrentState());
- } catch (IOException exn) {
- // The only IOException that getJobWithRetries is permitted to throw is the final IOException
- // that caused the failure of retry. Other exceptions are wrapped in an unchecked exceptions
- // and will propagate.
- return State.UNKNOWN;
- }
- }
-
- /**
- * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
- * maximum number of passed in attempts.
- *
- * @param attempts The amount of attempts to make.
- * @param sleeper Object used to do the sleeps between attempts.
- * @return The underlying {@link Job} object.
- * @throws IOException When the maximum number of retries is exhausted, the last exception is
- * thrown.
- */
- @VisibleForTesting
- Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
- AttemptBoundedExponentialBackOff backoff =
- new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);
-
- // Retry loop ends in return or throw
- while (true) {
- try {
- Job job = dataflowClient
- .projects()
- .jobs()
- .get(projectId, jobId)
- .execute();
- State currentState = MonitoringUtil.toState(job.getCurrentState());
- if (currentState.isTerminal()) {
- terminalState = currentState;
- replacedByJob = new DataflowPipelineJob(
- getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms);
- }
- return job;
- } catch (IOException exn) {
- LOG.warn("There were problems getting current job status: {}.", exn.getMessage());
- LOG.debug("Exception information:", exn);
-
- if (!nextBackOff(sleeper, backoff)) {
- throw exn;
- }
- }
- }
- }
-
- /**
- * Identical to {@link BackOffUtils#next} but without checked exceptions.
- */
- private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
- try {
- return BackOffUtils.next(sleeper, backoff);
- } catch (InterruptedException | IOException e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
- throws AggregatorRetrievalException {
- try {
- return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
- } catch (IOException e) {
- throw new AggregatorRetrievalException(
- "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
- }
- }
-
- private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
- throws IOException {
- if (aggregatorTransforms.contains(aggregator)) {
- List<MetricUpdate> metricUpdates;
- if (terminalMetricUpdates != null) {
- metricUpdates = terminalMetricUpdates;
- } else {
- boolean terminal = getState().isTerminal();
- JobMetrics jobMetrics =
- dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute();
- metricUpdates = jobMetrics.getMetrics();
- if (terminal && jobMetrics.getMetrics() != null) {
- terminalMetricUpdates = metricUpdates;
- }
- }
-
- return DataflowMetricUpdateExtractor.fromMetricUpdates(
- aggregator, aggregatorTransforms, metricUpdates);
- } else {
- throw new IllegalArgumentException(
- "Aggregator " + aggregator + " is not used in this pipeline");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
deleted file mode 100644
index 0e4d4e9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.auto.service.AutoService;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DataflowPipeline}.
- */
-public class DataflowPipelineRegistrar {
- private DataflowPipelineRegistrar() { }
-
- /**
- * Register the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}.
- */
- @AutoService(PipelineOptionsRegistrar.class)
- public static class Options implements PipelineOptionsRegistrar {
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(
- DataflowPipelineOptions.class,
- BlockingDataflowPipelineOptions.class);
- }
- }
-
- /**
- * Register the {@link DataflowPipelineRunner} and {@link BlockingDataflowPipelineRunner}.
- */
- @AutoService(PipelineRunnerRegistrar.class)
- public static class Runner implements PipelineRunnerRegistrar {
- @Override
- public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
- DataflowPipelineRunner.class,
- BlockingDataflowPipelineRunner.class);
- }
- }
-}