Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:00 UTC

[36/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
deleted file mode 100644
index 527f712..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.JsonIgnorePredicate;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.Registration;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.common.ReflectHelpers;
-import com.google.common.base.Defaults;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ClassToInstanceMap;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.MutableClassToInstanceMap;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import java.beans.PropertyDescriptor;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Represents an {@link InvocationHandler} for a {@link Proxy}. The invocation handler uses bean
- * introspection of the proxy class to store and retrieve values based on the property name.
- *
- * <p>Unset properties use the {@code @Default} metadata on the getter to return values. If there
- * is no {@code @Default} annotation on the getter, then a <a
- * href="https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html">default</a> as
- * per the Java Language Specification for the expected return type is returned.
- *
- * <p>In addition to the getter/setter pairs, this proxy invocation handler supports
- * {@link Object#equals(Object)}, {@link Object#hashCode()}, {@link Object#toString()} and
- * {@link PipelineOptions#as(Class)}.
- */
-@ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler {
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-  /**
-   * No two instances of this class are considered equivalent, hence we generate a random hash code
-   * between 0 and {@link Integer#MAX_VALUE}.
-   */
-  private final int hashCode = (int) (Math.random() * Integer.MAX_VALUE);
-  private final Set<Class<? extends PipelineOptions>> knownInterfaces;
-  private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
-  private final Map<String, Object> options;
-  private final Map<String, JsonNode> jsonOptions;
-  private final Map<String, String> gettersToPropertyNames;
-  private final Map<String, String> settersToPropertyNames;
-
-  ProxyInvocationHandler(Map<String, Object> options) {
-    this(options, Maps.<String, JsonNode>newHashMap());
-  }
-
-  private ProxyInvocationHandler(Map<String, Object> options, Map<String, JsonNode> jsonOptions) {
-    this.options = options;
-    this.jsonOptions = jsonOptions;
-    this.knownInterfaces = new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
-    gettersToPropertyNames = Maps.newHashMap();
-    settersToPropertyNames = Maps.newHashMap();
-    interfaceToProxyCache = MutableClassToInstanceMap.create();
-  }
-
-  @Override
-  public Object invoke(Object proxy, Method method, Object[] args) {
-    if (args == null && "toString".equals(method.getName())) {
-      return toString();
-    } else if (args != null && args.length == 1 && "equals".equals(method.getName())) {
-      return equals(args[0]);
-    } else if (args == null && "hashCode".equals(method.getName())) {
-      return hashCode();
-    } else if (args != null && "as".equals(method.getName()) && args[0] instanceof Class) {
-      @SuppressWarnings("unchecked")
-      Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
-      return as(clazz);
-    } else if (args != null && "cloneAs".equals(method.getName()) && args[0] instanceof Class) {
-      @SuppressWarnings("unchecked")
-      Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
-      return cloneAs(proxy, clazz);
-    }
-    String methodName = method.getName();
-    synchronized (this) {
-      if (gettersToPropertyNames.containsKey(methodName)) {
-        String propertyName = gettersToPropertyNames.get(methodName);
-        if (!options.containsKey(propertyName)) {
-          // Lazy bind the default to the method.
-          Object value = jsonOptions.containsKey(propertyName)
-              ? getValueFromJson(propertyName, method)
-              : getDefault((PipelineOptions) proxy, method);
-          options.put(propertyName, value);
-        }
-        return options.get(propertyName);
-      } else if (settersToPropertyNames.containsKey(methodName)) {
-        options.put(settersToPropertyNames.get(methodName), args[0]);
-        return Void.TYPE;
-      }
-    }
-    throw new RuntimeException("Unknown method [" + method + "] invoked with args ["
-        + Arrays.toString(args) + "].");
-  }
-
-  /**
-   * Backing implementation for {@link PipelineOptions#as(Class)}.
-   *
-   * @param iface The interface that the returned object needs to implement.
-   * @return An object that implements the interface {@code <T>}.
-   */
-  synchronized <T extends PipelineOptions> T as(Class<T> iface) {
-    Preconditions.checkNotNull(iface);
-    Preconditions.checkArgument(iface.isInterface());
-    if (!interfaceToProxyCache.containsKey(iface)) {
-      Registration<T> registration =
-          PipelineOptionsFactory.validateWellFormed(iface, knownInterfaces);
-      List<PropertyDescriptor> propertyDescriptors = registration.getPropertyDescriptors();
-      Class<T> proxyClass = registration.getProxyClass();
-      gettersToPropertyNames.putAll(generateGettersToPropertyNames(propertyDescriptors));
-      settersToPropertyNames.putAll(generateSettersToPropertyNames(propertyDescriptors));
-      knownInterfaces.add(iface);
-      interfaceToProxyCache.putInstance(iface,
-          InstanceBuilder.ofType(proxyClass)
-              .fromClass(proxyClass)
-              .withArg(InvocationHandler.class, this)
-              .build());
-    }
-    return interfaceToProxyCache.getInstance(iface);
-  }
-
-  /**
-   * Backing implementation for {@link PipelineOptions#cloneAs(Class)}.
-   *
-   * @return A copy of the PipelineOptions.
-   */
-  synchronized <T extends PipelineOptions> T cloneAs(Object proxy, Class<T> iface) {
-    PipelineOptions clonedOptions;
-    try {
-      clonedOptions = MAPPER.readValue(MAPPER.writeValueAsBytes(proxy), PipelineOptions.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to serialize the pipeline options to JSON.", e);
-    }
-    for (Class<? extends PipelineOptions> knownIface : knownInterfaces) {
-      clonedOptions.as(knownIface);
-    }
-    return clonedOptions.as(iface);
-  }
-
-  /**
-   * Returns true if the other object is this ProxyInvocationHandler, or is a Proxy object
-   * whose invocation handler is this.
-   *
-   * @param obj The object to compare against this.
-   * @return true iff the other object is this ProxyInvocationHandler, or is a Proxy object
-   *         whose invocation handler is this.
-   */
-  @Override
-  public boolean equals(Object obj) {
-    return obj != null && ((obj instanceof ProxyInvocationHandler && this == obj)
-        || (Proxy.isProxyClass(obj.getClass()) && this == Proxy.getInvocationHandler(obj)));
-  }
-
-  /**
-   * Each instance of this ProxyInvocationHandler is unique and has a random hash code.
-   *
-   * @return A hash code that was generated randomly.
-   */
-  @Override
-  public int hashCode() {
-    return hashCode;
-  }
-
-  /**
-   * This will output all the currently set values. This is a relatively costly function
-   * as it will call {@code toString()} on each object that has been set and format
-   * the results in a readable format.
-   *
-   * @return A pretty printed string representation of this.
-   */
-  @Override
-  public synchronized String toString() {
-    SortedMap<String, Object> sortedOptions = new TreeMap<>();
-    // Add the options that we received from deserialization
-    sortedOptions.putAll(jsonOptions);
-    // Override with any programmatically set options.
-    sortedOptions.putAll(options);
-
-    StringBuilder b = new StringBuilder();
-    b.append("Current Settings:\n");
-    for (Map.Entry<String, Object> entry : sortedOptions.entrySet()) {
-      b.append("  " + entry.getKey() + ": " + entry.getValue() + "\n");
-    }
-    return b.toString();
-  }
-
-  /**
-   * Uses a Jackson {@link ObjectMapper} to attempt type conversion.
-   *
-   * @param propertyName The name of the property that is being returned.
-   * @param method The getter method whose return type determines the target of the conversion.
-   * @return An object matching the return type of the method passed in.
-   */
-  private Object getValueFromJson(String propertyName, Method method) {
-    try {
-      JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
-      JsonNode jsonNode = jsonOptions.get(propertyName);
-      return MAPPER.readValue(jsonNode.toString(), type);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to parse representation", e);
-    }
-  }
-
-  /**
-   * Returns a default value for the method based upon {@code @Default} metadata on the getter
-   * to return values. If there is no {@code @Default} annotation on the getter, then a <a
-   * href="https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html">default</a> as
-   * per the Java Language Specification for the expected return type is returned.
-   *
-   * @param proxy The proxy object for which we are attempting to get the default.
-   * @param method The getter method that was invoked.
-   * @return The default value from an {@link Default} annotation if present, otherwise a default
-   *         value as per the Java Language Specification.
-   */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  private Object getDefault(PipelineOptions proxy, Method method) {
-    for (Annotation annotation : method.getAnnotations()) {
-      if (annotation instanceof Default.Class) {
-        return ((Default.Class) annotation).value();
-      } else if (annotation instanceof Default.String) {
-        return ((Default.String) annotation).value();
-      } else if (annotation instanceof Default.Boolean) {
-        return ((Default.Boolean) annotation).value();
-      } else if (annotation instanceof Default.Character) {
-        return ((Default.Character) annotation).value();
-      } else if (annotation instanceof Default.Byte) {
-        return ((Default.Byte) annotation).value();
-      } else if (annotation instanceof Default.Short) {
-        return ((Default.Short) annotation).value();
-      } else if (annotation instanceof Default.Integer) {
-        return ((Default.Integer) annotation).value();
-      } else if (annotation instanceof Default.Long) {
-        return ((Default.Long) annotation).value();
-      } else if (annotation instanceof Default.Float) {
-        return ((Default.Float) annotation).value();
-      } else if (annotation instanceof Default.Double) {
-        return ((Default.Double) annotation).value();
-      } else if (annotation instanceof Default.Enum) {
-        return Enum.valueOf((Class<Enum>) method.getReturnType(),
-            ((Default.Enum) annotation).value());
-      } else if (annotation instanceof Default.InstanceFactory) {
-        return InstanceBuilder.ofType(((Default.InstanceFactory) annotation).value())
-            .build()
-            .create(proxy);
-      }
-    }
-
-    /*
-     * We need to make sure that we return something appropriate for the return type. Thus we return
-     * a default value as defined by the JLS.
-     */
-    return Defaults.defaultValue(method.getReturnType());
-  }
-
-  /**
-   * Returns a map from each getter's method name to the name of the property, based upon the
-   * passed in {@link PropertyDescriptor}s.
-   *
-   * @param propertyDescriptors A list of {@link PropertyDescriptor}s to use when generating the
-   *        map.
-   * @return A map of getter method name to property name.
-   */
-  private static Map<String, String> generateGettersToPropertyNames(
-      List<PropertyDescriptor> propertyDescriptors) {
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    for (PropertyDescriptor descriptor : propertyDescriptors) {
-      if (descriptor.getReadMethod() != null) {
-        builder.put(descriptor.getReadMethod().getName(), descriptor.getName());
-      }
-    }
-    return builder.build();
-  }
-
-  /**
-   * Returns a map from each setter's method name to the name of the property, based upon the
-   * passed in {@link PropertyDescriptor}s.
-   *
-   * @param propertyDescriptors A list of {@link PropertyDescriptor}s to use when generating the
-   *        map.
-   * @return A map of setter method name to property name.
-   */
-  private static Map<String, String> generateSettersToPropertyNames(
-      List<PropertyDescriptor> propertyDescriptors) {
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    for (PropertyDescriptor descriptor : propertyDescriptors) {
-      if (descriptor.getWriteMethod() != null) {
-        builder.put(descriptor.getWriteMethod().getName(), descriptor.getName());
-      }
-    }
-    return builder.build();
-  }
-
-  static class Serializer extends JsonSerializer<PipelineOptions> {
-    @Override
-    public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvider provider)
-        throws IOException, JsonProcessingException {
-      ProxyInvocationHandler handler = (ProxyInvocationHandler) Proxy.getInvocationHandler(value);
-      synchronized (handler) {
-        // We first copy the properties that have been modified since the last
-        // serialization of this PipelineOptions, drop any @JsonIgnore'd ones,
-        // and then verify that the remainder are all serializable.
-        Map<String, Object> filteredOptions = Maps.newHashMap(handler.options);
-        removeIgnoredOptions(handler.knownInterfaces, filteredOptions);
-        ensureSerializable(handler.knownInterfaces, filteredOptions);
-
-        // Now we create the map of serializable options by taking the original
-        // set of serialized options (if any) and updating them with any property
-        // instances that have been modified since the previous serialization.
-        Map<String, Object> serializableOptions =
-            Maps.<String, Object>newHashMap(handler.jsonOptions);
-        serializableOptions.putAll(filteredOptions);
-        jgen.writeStartObject();
-        jgen.writeFieldName("options");
-        jgen.writeObject(serializableOptions);
-        jgen.writeEndObject();
-      }
-    }
-
-    /**
-     * Removes from the passed in options all properties whose getter, on any of the passed in
-     * interfaces, is annotated with {@link JsonIgnore @JsonIgnore}.
-     */
-    private void removeIgnoredOptions(
-        Set<Class<? extends PipelineOptions>> interfaces, Map<String, Object> options) {
-      // Find all the method names that are annotated with JSON ignore.
-      Set<String> jsonIgnoreMethodNames = FluentIterable.from(
-          ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces))
-          .filter(JsonIgnorePredicate.INSTANCE).transform(new Function<Method, String>() {
-            @Override
-            public String apply(Method input) {
-              return input.getName();
-            }
-          }).toSet();
-
-      // Remove all options whose getter method is annotated with @JsonIgnore.
-      for (PropertyDescriptor descriptor
-          : PipelineOptionsFactory.getPropertyDescriptors(interfaces)) {
-        if (jsonIgnoreMethodNames.contains(descriptor.getReadMethod().getName())) {
-          options.remove(descriptor.getName());
-        }
-      }
-    }
-
-    /**
-     * We use an {@link ObjectMapper} to verify that the passed in options are serializable
-     * and deserializable.
-     */
-    private void ensureSerializable(Set<Class<? extends PipelineOptions>> interfaces,
-        Map<String, Object> options) throws IOException {
-      // Construct a map from property name to the return type of the getter.
-      Map<String, Type> propertyToReturnType = Maps.newHashMap();
-      for (PropertyDescriptor descriptor
-          : PipelineOptionsFactory.getPropertyDescriptors(interfaces)) {
-        if (descriptor.getReadMethod() != null) {
-          propertyToReturnType.put(descriptor.getName(),
-              descriptor.getReadMethod().getGenericReturnType());
-        }
-      }
-
-      // Attempt to serialize and deserialize each property.
-      for (Map.Entry<String, Object> entry : options.entrySet()) {
-        try {
-          String serializedValue = MAPPER.writeValueAsString(entry.getValue());
-          JavaType type = MAPPER.getTypeFactory()
-              .constructType(propertyToReturnType.get(entry.getKey()));
-          MAPPER.readValue(serializedValue, type);
-        } catch (Exception e) {
-          throw new IOException(String.format(
-              "Failed to serialize and deserialize property '%s' with value '%s'",
-              entry.getKey(), entry.getValue()), e);
-        }
-      }
-    }
-  }
-
-  static class Deserializer extends JsonDeserializer<PipelineOptions> {
-    @Override
-    public PipelineOptions deserialize(JsonParser jp, DeserializationContext ctxt)
-        throws IOException, JsonProcessingException {
-      ObjectNode objectNode = (ObjectNode) jp.readValueAsTree();
-      ObjectNode optionsNode = (ObjectNode) objectNode.get("options");
-
-      Map<String, JsonNode> fields = Maps.newHashMap();
-      for (Iterator<Map.Entry<String, JsonNode>> iterator = optionsNode.fields();
-          iterator.hasNext(); ) {
-        Map.Entry<String, JsonNode> field = iterator.next();
-        fields.put(field.getKey(), field.getValue());
-      }
-      PipelineOptions options =
-          new ProxyInvocationHandler(Maps.<String, Object>newHashMap(), fields)
-              .as(PipelineOptions.class);
-      return options;
-    }
-  }
-}
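
A minimal sketch of the default-binding behavior implemented by the deleted
ProxyInvocationHandler above: unset properties resolve to their @Default
annotation, or to the JLS default for the return type. The MyOptions interface
and its property names below are hypothetical, not part of the SDK.

import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

public class ProxyDefaultsExample {
  // Hypothetical options interface; the proxy stores values by property name.
  public interface MyOptions extends PipelineOptions {
    @Default.Integer(42)
    Integer getAnswer();
    void setAnswer(Integer value);

    // No @Default: the JLS default for Integer (null) is returned.
    Integer getUnset();
    void setUnset(Integer value);
  }

  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
    System.out.println(options.getAnswer()); // 42, lazily bound from @Default.Integer
    System.out.println(options.getUnset());  // null, the JLS default for Integer
    options.setAnswer(7);
    System.out.println(options.getAnswer()); // 7, the explicitly set value
  }
}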

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
deleted file mode 100644
index 9563c58..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options used to configure streaming.
- */
-public interface StreamingOptions extends
-    ApplicationNameOptions, GcpOptions, PipelineOptions {
-  /**
-   * Set to true if running a streaming pipeline.
-   */
-  @Description("Set to true if running a streaming pipeline.")
-  boolean isStreaming();
-  void setStreaming(boolean value);
-}
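
A brief usage sketch for the deleted StreamingOptions above: any PipelineOptions
instance can be re-viewed as StreamingOptions through as(Class), since all views
share one underlying ProxyInvocationHandler.

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.options.StreamingOptions;

public class StreamingOptionsExample {
  public static void main(String[] args) {
    PipelineOptions base = PipelineOptionsFactory.create();
    // Re-view the same underlying options as StreamingOptions and flip the flag.
    StreamingOptions streaming = base.as(StreamingOptions.class);
    streaming.setStreaming(true);
    System.out.println(streaming.isStreaming()); // true
  }
}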

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
deleted file mode 100644
index 20034f8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.options;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * {@link Validation} represents a set of annotations that can be used to annotate getter
- * properties on {@link PipelineOptions} with information representing the validation criteria to
- * be used when validating with the {@link PipelineOptionsValidator}.
- */
-public @interface Validation {
-  /**
-   * This criterion specifies that the value must not be null. Note that this annotation
-   * should only be applied to methods that return nullable objects.
-   */
-  @Target(value = ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Required {
-    /**
-     * The groups that the annotated attribute is a member of. A member can be in 0 or more groups.
-     * Members not in any groups are considered to be in a group consisting exclusively of
-     * themselves. At least one member of a group must be non-null if the options are to be valid.
-     */
-    String[] groups() default {};
-  }
-}
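
A minimal sketch of the group semantics documented above: getters sharing a
group name form an "at least one must be non-null" constraint when checked with
PipelineOptionsValidator. The InputOptions interface and the "input" group are
hypothetical, and this assumes the validator at this revision honors groups().

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
import com.google.cloud.dataflow.sdk.options.Validation;

public class ValidationGroupsExample {
  // Hypothetical options: at least one of the two inputs must be supplied,
  // so both getters are placed in the same "input" group.
  public interface InputOptions extends PipelineOptions {
    @Validation.Required(groups = "input")
    String getInputFile();
    void setInputFile(String value);

    @Validation.Required(groups = "input")
    String getInputTable();
    void setInputTable(String value);
  }

  public static void main(String[] args) {
    InputOptions options = PipelineOptionsFactory.as(InputOptions.class);
    options.setInputFile("gs://bucket/file.txt");
    // Passes: one member of the "input" group is non-null.
    PipelineOptionsValidator.validate(InputOptions.class, options);
  }
}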

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
deleted file mode 100644
index cef995f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines {@link com.google.cloud.dataflow.sdk.options.PipelineOptions} for
- * configuring pipeline execution.
- *
- * <p>{@link com.google.cloud.dataflow.sdk.options.PipelineOptions} encapsulates the various
- * parameters that describe how a pipeline should be run. {@code PipelineOptions} are created
- * using a {@link com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory}.
- */
-package com.google.cloud.dataflow.sdk.options;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
deleted file mode 100644
index 5567f03..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Provides a simple, powerful model for building both batch and
- * streaming parallel data processing
- * {@link com.google.cloud.dataflow.sdk.Pipeline}s.
- *
- * <p>To use the Google Cloud Dataflow SDK, you build a
- * {@link com.google.cloud.dataflow.sdk.Pipeline}, which manages a graph of
- * {@link com.google.cloud.dataflow.sdk.transforms.PTransform}s
- * and the {@link com.google.cloud.dataflow.sdk.values.PCollection}s that
- * the PTransforms consume and produce.
- *
- * <p>Each Pipeline has a
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} to specify
- * where and how it should run after pipeline construction is complete.
- *
- */
-package com.google.cloud.dataflow.sdk;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
deleted file mode 100644
index ab87f2e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AggregatorRetriever;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} from
- * each {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
- */
-public class AggregatorPipelineExtractor {
-  private final Pipeline pipeline;
-
-  /**
-   * Creates an {@code AggregatorPipelineExtractor} for the given {@link Pipeline}.
-   */
-  public AggregatorPipelineExtractor(Pipeline pipeline) {
-    this.pipeline = pipeline;
-  }
-
-  /**
-   * Returns a {@link Map} from each {@link Aggregator} in the {@link Pipeline} to the {@link
-   * PTransform PTransforms} in which it is used.
-   */
-  public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
-    HashMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps = HashMultimap.create();
-    pipeline.traverseTopologically(new AggregatorVisitor(aggregatorSteps));
-    return aggregatorSteps.asMap();
-  }
-
-  private static class AggregatorVisitor implements PipelineVisitor {
-    private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps;
-
-    public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) {
-      this.aggregatorSteps = aggregatorSteps;
-    }
-
-    @Override
-    public void enterCompositeTransform(TransformTreeNode node) {}
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {}
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
-      PTransform<?, ?> transform = node.getTransform();
-      addStepToAggregators(transform, getAggregators(transform));
-    }
-
-    private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
-      if (transform != null) {
-        if (transform instanceof ParDo.Bound) {
-          return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
-        } else if (transform instanceof ParDo.BoundMulti) {
-          return AggregatorRetriever.getAggregators(((ParDo.BoundMulti<?, ?>) transform).getFn());
-        }
-      }
-      return Collections.emptyList();
-    }
-
-    private void addStepToAggregators(
-        PTransform<?, ?> transform, Collection<Aggregator<?, ?>> aggregators) {
-      for (Aggregator<?, ?> aggregator : aggregators) {
-        aggregatorSteps.put(aggregator, transform);
-      }
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {}
-  }
-}
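
A short usage sketch for the deleted AggregatorPipelineExtractor above; the
pipeline argument is assumed to be fully constructed, with DoFns that declare
aggregators via createAggregator(...).

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.runners.AggregatorPipelineExtractor;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.PTransform;

import java.util.Collection;
import java.util.Map;

public class AggregatorStepsExample {
  static void printAggregatorSteps(Pipeline pipeline) {
    // Traverses the pipeline once and groups each aggregator with the ParDo
    // steps that use it.
    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> steps =
        new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
    for (Map.Entry<Aggregator<?, ?>, Collection<PTransform<?, ?>>> entry : steps.entrySet()) {
      System.out.println(
          entry.getKey().getName() + " used in " + entry.getValue().size() + " step(s)");
    }
  }
}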

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
deleted file mode 100644
index 90162ad..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-
-/**
- * Signals that an exception has occurred while retrieving {@link Aggregator}s.
- */
-public class AggregatorRetrievalException extends Exception {
-  /**
-   * Constructs a new {@code AggregatorRetrievalException} with the specified detail message and
-   * cause.
-   */
-  public AggregatorRetrievalException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
deleted file mode 100644
index 21f0282..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link DoFn} are emitted on a per-{@code DoFn}-application basis.
- *
- * @param <T> the output type of the aggregator
- */
-public abstract class AggregatorValues<T> {
-  /**
-   * Get the values of the {@link Aggregator} at all steps where it was used.
-   */
-  public Collection<T> getValues() {
-    return getValuesAtSteps().values();
-  }
-
-  /**
-   * Get the values of the {@link Aggregator} at each step where it was used, keyed by step name.
-   */
-  public abstract Map<String, T> getValuesAtSteps();
-
-  /**
-   * Get the total value of this {@link Aggregator} by applying the specified {@link CombineFn}.
-   */
-  public T getTotalValue(CombineFn<T, ?, T> combineFn) {
-    return combineFn.apply(getValues());
-  }
-}
-
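
A brief sketch of combining per-step values with getTotalValue, as described
above. The AggregatorValues instance is assumed to come from a runner, e.g.
PipelineResult.getAggregatorValues(...); Sum.SumLongFn is assumed to be usable
as a CombineFn<Long, ?, Long> at this revision.

import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
import com.google.cloud.dataflow.sdk.transforms.Sum;

public class AggregatorTotalExample {
  static long totalElements(AggregatorValues<Long> values) {
    // Folds the per-step values into one total using the supplied CombineFn.
    return values.getTotalValue(new Sum.SumLongFn());
  }
}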

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
deleted file mode 100644
index 95e3dfe..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner}
- * but that waits for the launched job to finish.
- *
- * <p>Prints out job status updates and console messages while it waits.
- *
- * <p>Returns the final job state, or throws an exception if the job
- * fails or cannot be monitored.
- *
- * <h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code BlockingDataflowPipelineRunner}, the Google Cloud services account and the Google
- * Compute Engine service account of the GCP project running the Dataflow job will need access to
- * the corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-public class BlockingDataflowPipelineRunner extends
-    PipelineRunner<DataflowPipelineJob> {
-  private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
-
-  // Defaults to an infinite wait period.
-  // TODO: make this configurable after removal of option map.
-  private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
-
-  private final DataflowPipelineRunner dataflowPipelineRunner;
-  private final BlockingDataflowPipelineOptions options;
-
-  protected BlockingDataflowPipelineRunner(
-      DataflowPipelineRunner internalRunner,
-      BlockingDataflowPipelineOptions options) {
-    this.dataflowPipelineRunner = internalRunner;
-    this.options = options;
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static BlockingDataflowPipelineRunner fromOptions(
-      PipelineOptions options) {
-    BlockingDataflowPipelineOptions dataflowOptions =
-        PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
-    DataflowPipelineRunner dataflowPipelineRunner =
-        DataflowPipelineRunner.fromOptions(dataflowOptions);
-
-    return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws DataflowJobExecutionException if there is an exception during job execution.
-   * @throws DataflowServiceException if there is an exception retrieving information about the job.
-   */
-  @Override
-  public DataflowPipelineJob run(Pipeline p) {
-    final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
-
-    // We ignore the potential race condition here (Ctrl-C after job submission but before the
-    // shutdown hook is registered). Even if we tried to do something smarter (e.g., SettableFuture),
-    // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
-    // job. The display of the command to cancel the job is best-effort anyway -- RPCs could fail,
-    // etc. If the user wants to verify the job was cancelled, they should look at the job status.
-    Thread shutdownHook = new Thread() {
-      @Override
-      public void run() {
-        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
-            + "To cancel the job in the cloud, run:\n> {}",
-            MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
-      }
-    };
-
-    try {
-      Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-      @Nullable
-      State result;
-      try {
-        result = job.waitToFinish(
-            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
-            new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-      } catch (IOException | InterruptedException ex) {
-        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
-        throw new DataflowServiceException(
-            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
-      }
-
-      if (result == null) {
-        throw new DataflowServiceException(
-            job, "Timed out while retrieving status for job " + job.getJobId());
-      }
-
-      LOG.info("Job finished with status {}", result);
-      if (!result.isTerminal()) {
-        throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
-            + ", got " + result);
-      }
-
-      if (result == State.DONE) {
-        return job;
-      } else if (result == State.UPDATED) {
-        DataflowPipelineJob newJob = job.getReplacedByJob();
-        LOG.info("Job {} has been updated and is running as the new job with id {}."
-            + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
-            job.getJobId(),
-            newJob.getJobId(),
-            MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
-        throw new DataflowJobUpdatedException(
-            job,
-            String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
-            newJob);
-      } else if (result == State.CANCELLED) {
-        String message = String.format("Job %s cancelled by user", job.getJobId());
-        LOG.info(message);
-        throw new DataflowJobCancelledException(job, message);
-      } else {
-        throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
-            + " failed with status " + result);
-      }
-    } finally {
-      Runtime.getRuntime().removeShutdownHook(shutdownHook);
-    }
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    return dataflowPipelineRunner.apply(transform, input);
-  }
-
-  /**
-   * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}.
-   */
-  @Experimental
-  public void setHooks(DataflowPipelineRunnerHooks hooks) {
-    this.dataflowPipelineRunner.setHooks(hooks);
-  }
-
-  @Override
-  public String toString() {
-    return "BlockingDataflowPipelineRunner#" + options.getJobName();
-  }
-}
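
A minimal launch sketch for the deleted BlockingDataflowPipelineRunner above
(project, staging location, and the like are assumed to be supplied via args):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;

public class BlockingRunExample {
  public static void main(String[] args) {
    BlockingDataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(BlockingDataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);

    Pipeline p = Pipeline.create(options);
    // ... build the pipeline here ...

    // run() blocks until the job reaches a terminal state; non-DONE outcomes
    // surface as DataflowJob*Exception subclasses.
    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
    System.out.println("Job " + job.getJobId() + " finished.");
  }
}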

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
deleted file mode 100644
index 1547f73..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the unique job name constraint of the Dataflow
- * service is broken because an existing job with the same job name is currently active.
- * The {@link DataflowPipelineJob} contained within this exception contains information
- * about the pre-existing job.
- */
-public class DataflowJobAlreadyExistsException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobAlreadyExistsException(
-      DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
deleted file mode 100644
index d4ae4f5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the existing job has already been updated within the Dataflow
- * service and can no longer be updated. The {@link DataflowPipelineJob} contained within
- * this exception contains information about the pre-existing updated job.
- */
-public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobAlreadyUpdatedException(
-      DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
deleted file mode 100644
index 0d31726..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during
- * execution.
- */
-public class DataflowJobCancelledException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobCancelledException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-
-  /**
-   * Create a new {@code DataflowJobCancelledException} with the specified {@link
-   * DataflowPipelineJob}, message, and cause.
-   */
-  public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
deleted file mode 100644
index 9e305d5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
- */
-public abstract class DataflowJobException extends RuntimeException {
-  private final DataflowPipelineJob job;
-
-  DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(message, cause);
-    this.job = Objects.requireNonNull(job);
-  }
-
-  /**
-   * Returns the failed job.
-   */
-  public DataflowPipelineJob getJob() {
-    return job;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
deleted file mode 100644
index ae6df0f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import javax.annotation.Nullable;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} failed during execution, and
- * provides access to the failed job.
- */
-public class DataflowJobExecutionException extends DataflowJobException {
-  DataflowJobExecutionException(DataflowPipelineJob job, String message) {
-    this(job, message, null);
-  }
-
-  DataflowJobExecutionException(
-      DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
deleted file mode 100644
index 1becdd7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
- */
-public class DataflowJobUpdatedException extends DataflowJobException {
-  private DataflowPipelineJob replacedByJob;
-
-  /**
-   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
-   * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
-   */
-  public DataflowJobUpdatedException(
-      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
-    this(job, message, replacedByJob, null);
-  }
-
-  /**
-   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
-   * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
-   */
-  public DataflowJobUpdatedException(
-      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
-    super(job, message, cause);
-    this.replacedByJob = replacedByJob;
-  }
-
-  /**
-   * Returns the new job that replaced the job that terminated with this exception.
-   */
-  public DataflowPipelineJob getReplacedByJob() {
-    return replacedByJob;
-  }
-}
-
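
A short sketch of handling the update case described above; the pipeline is
assumed to run under the BlockingDataflowPipelineRunner.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.runners.DataflowJobUpdatedException;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;

public class UpdatedJobExample {
  static DataflowPipelineJob runAndFollowUpdate(Pipeline p) {
    try {
      return (DataflowPipelineJob) p.run();
    } catch (DataflowJobUpdatedException e) {
      // The submitted job was replaced by an update; monitor the replacement.
      return e.getReplacedByJob();
    }
  }
}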

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
deleted file mode 100644
index 5a78624..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * A {@link DataflowPipeline} is a {@link Pipeline} that returns a
- * {@link DataflowPipelineJob} when
- * {@link com.google.cloud.dataflow.sdk.Pipeline#run()} is called.
- *
- * <p>This is not intended for direct use by Cloud Dataflow users.
- * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a
- * {@link Pipeline}.
- */
-public class DataflowPipeline extends Pipeline {
-
-  /**
-   * Creates and returns a new {@link DataflowPipeline} instance for tests.
-   */
-  public static DataflowPipeline create(DataflowPipelineOptions options) {
-    return new DataflowPipeline(options);
-  }
-
-  private DataflowPipeline(DataflowPipelineOptions options) {
-    super(DataflowPipelineRunner.fromOptions(options), options);
-  }
-
-  @Override
-  public DataflowPipelineJob run() {
-    return (DataflowPipelineJob) super.run();
-  }
-
-  @Override
-  public DataflowPipelineRunner getRunner() {
-    return (DataflowPipelineRunner) super.getRunner();
-  }
-
-  @Override
-  public String toString() {
-    return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName();
-  }
-}
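
For reference, a minimal sketch of how this test-only class was exercised;
the project and job names are hypothetical:

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject("my-project");   // hypothetical project id
    options.setJobName("my-job");       // hypothetical job name

    DataflowPipeline p = DataflowPipeline.create(options);
    // ... apply transforms to p ...
    DataflowPipelineJob job = p.run();  // typed result, no cast required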

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
deleted file mode 100644
index e9f134c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
-import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
-import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
-import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A DataflowPipelineJob represents a job submitted to Dataflow using
- * {@link DataflowPipelineRunner}.
- */
-public class DataflowPipelineJob implements PipelineResult {
-  private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
-
-  /**
-   * The id for the job.
-   */
-  private String jobId;
-
-  /**
-   * The Google Cloud project to associate this pipeline with.
-   */
-  private String projectId;
-
-  /**
-   * Client for the Dataflow service. This can be used to query the service
-   * for information about the job.
-   */
-  private Dataflow dataflowClient;
-
-  /**
-   * The state the job terminated in, or {@code null} if the job has not terminated.
-   */
-  @Nullable
-  private State terminalState = null;
-
-  /**
-   * The job that replaced this one, or {@code null} if the job has not been replaced.
-   */
-  @Nullable
-  private DataflowPipelineJob replacedByJob = null;
-
-  private DataflowAggregatorTransforms aggregatorTransforms;
-
-  /**
-   * The metric updates retrieved after the job reached a terminal state.
-   */
-  private List<MetricUpdate> terminalMetricUpdates;
-
-  /**
-   * The polling intervals for job status and message information.
-   */
-  static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
-  static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
-
-  /**
-   * The number of polling attempts for job status and message information.
-   */
-  static final int MESSAGES_POLLING_ATTEMPTS = 10;
-  static final int STATUS_POLLING_ATTEMPTS = 5;
-
-  /**
-   * Constructs the job.
-   *
-   * @param projectId the project id
-   * @param jobId the job id
-   * @param dataflowClient the client for the Dataflow Service
-   */
-  public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient,
-      DataflowAggregatorTransforms aggregatorTransforms) {
-    this.projectId = projectId;
-    this.jobId = jobId;
-    this.dataflowClient = dataflowClient;
-    this.aggregatorTransforms = aggregatorTransforms;
-  }
-
-  /**
-   * Get the id of this job.
-   */
-  public String getJobId() {
-    return jobId;
-  }
-
-  /**
-   * Get the project this job exists in.
-   */
-  public String getProjectId() {
-    return projectId;
-  }
-
-  /**
-   * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
-   *
-   * @throws IllegalStateException if called before the job has terminated or if the job terminated
-   * but was not updated
-   */
-  public DataflowPipelineJob getReplacedByJob() {
-    if (terminalState == null) {
-      throw new IllegalStateException("getReplacedByJob() called before job terminated");
-    }
-    if (replacedByJob == null) {
-      throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
-    }
-    return replacedByJob;
-  }
-
-  /**
-   * Get the Cloud Dataflow API Client used by this job.
-   */
-  public Dataflow getDataflowClient() {
-    return dataflowClient;
-  }
-
-  /**
-   * Waits for the job to finish and returns the final status.
-   *
-   * @param timeToWait The time to wait for the job to finish, in units of
-   *     {@code timeUnit}. Provide a value less than 1 ms for an infinite wait.
-   * @param timeUnit The unit of time for {@code timeToWait}.
-   * @param messageHandler If non-null, this handler will be invoked for each
-   *   batch of messages received.
-   * @return The final state of the job, or null on timeout or if the
-   *   thread is interrupted.
-   * @throws IOException If there is a persistent problem getting job
-   *   information.
-   * @throws InterruptedException if the thread is interrupted while waiting.
-   */
-  @Nullable
-  public State waitToFinish(
-      long timeToWait,
-      TimeUnit timeUnit,
-      MonitoringUtil.JobMessagesHandler messageHandler)
-          throws IOException, InterruptedException {
-    return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
-  }
-
-  /**
-   * Waits for the job to finish and returns the final status.
-   *
-   * @param timeToWait The time to wait for the job to finish, in units of
-   *     {@code timeUnit}. Provide a value less than 1 ms for an infinite wait.
-   * @param timeUnit The unit of time for {@code timeToWait}.
-   * @param messageHandler If non-null, this handler will be invoked for each
-   *   batch of messages received.
-   * @param sleeper A sleeper used to sleep between attempts.
-   * @param nanoClock A clock used to measure the total time taken.
-   * @return The final state of the job, or null on timeout or if the
-   *   thread is interrupted.
-   * @throws IOException If there is a persistent problem getting job
-   *   information.
-   * @throws InterruptedException if the thread is interrupted while waiting.
-   */
-  @Nullable
-  @VisibleForTesting
-  State waitToFinish(
-      long timeToWait,
-      TimeUnit timeUnit,
-      MonitoringUtil.JobMessagesHandler messageHandler,
-      Sleeper sleeper,
-      NanoClock nanoClock)
-          throws IOException, InterruptedException {
-    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
-
-    long lastTimestamp = 0;
-    BackOff backoff =
-        timeUnit.toMillis(timeToWait) > 0
-            ? new AttemptAndTimeBoundedExponentialBackOff(
-                MESSAGES_POLLING_ATTEMPTS,
-                MESSAGES_POLLING_INTERVAL,
-                timeUnit.toMillis(timeToWait),
-                AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
-                nanoClock)
-            : new AttemptBoundedExponentialBackOff(
-                MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
-    State state;
-    do {
-      // Get the state of the job before listing messages. This guarantees that we always fetch
-      // job messages after the job finishes, so that we have all of them.
-      state = getStateWithRetries(1, sleeper);
-      boolean hasError = state == State.UNKNOWN;
-
-      if (messageHandler != null && !hasError) {
-        // Process all the job messages that have accumulated so far.
-        try {
-          List<JobMessage> allMessages = monitor.getJobMessages(
-              jobId, lastTimestamp);
-
-          if (!allMessages.isEmpty()) {
-            lastTimestamp =
-                fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
-            messageHandler.process(allMessages);
-          }
-        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
-          hasError = true;
-          LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
-          LOG.debug("Exception information:", e);
-        }
-      }
-
-      if (!hasError) {
-        backoff.reset();
-        // Check if the job is done.
-        if (state.isTerminal()) {
-          return state;
-        }
-      }
-    } while (BackOffUtils.next(sleeper, backoff));
-    LOG.warn("No terminal state was returned. State value {}", state);
-    return null;  // Timed out.
-  }
-
-  /**
-   * Cancels the job.
-   * @throws IOException if there is a problem executing the cancel request.
-   */
-  public void cancel() throws IOException {
-    Job content = new Job();
-    content.setProjectId(projectId);
-    content.setId(jobId);
-    content.setRequestedState("JOB_STATE_CANCELLED");
-    dataflowClient.projects().jobs()
-        .update(projectId, jobId, content)
-        .execute();
-  }
-
-  @Override
-  public State getState() {
-    if (terminalState != null) {
-      return terminalState;
-    }
-
-    return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
-  }
-
-  /**
-   * Attempts to get the state. Uses exponential backoff on failure, up to the given maximum
-   * number of attempts.
-   *
-   * @param attempts The number of attempts to make.
-   * @param sleeper Object used to sleep between attempts.
-   * @return The state of the job, or State.UNKNOWN in case of failure.
-   */
-  @VisibleForTesting
-  State getStateWithRetries(int attempts, Sleeper sleeper) {
-    if (terminalState != null) {
-      return terminalState;
-    }
-    try {
-      Job job = getJobWithRetries(attempts, sleeper);
-      return MonitoringUtil.toState(job.getCurrentState());
-    } catch (IOException exn) {
-      // The only IOException that getJobWithRetries is permitted to throw is the final IOException
-      // that caused the retries to fail. Other exceptions are wrapped in unchecked exceptions
-      // and will propagate.
-      return State.UNKNOWN;
-    }
-  }
-
-  /**
-   * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure, up to the
-   * given maximum number of attempts.
-   *
-   * @param attempts The number of attempts to make.
-   * @param sleeper Object used to sleep between attempts.
-   * @return The underlying {@link Job} object.
-   * @throws IOException When the maximum number of retries is exhausted; the last exception
-   * encountered is thrown.
-   */
-  @VisibleForTesting
-  Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
-    AttemptBoundedExponentialBackOff backoff =
-        new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);
-
-    // Retry loop ends in return or throw
-    while (true) {
-      try {
-        Job job = dataflowClient
-            .projects()
-            .jobs()
-            .get(projectId, jobId)
-            .execute();
-        State currentState = MonitoringUtil.toState(job.getCurrentState());
-        if (currentState.isTerminal()) {
-          terminalState = currentState;
-          replacedByJob = new DataflowPipelineJob(
-              getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms);
-        }
-        return job;
-      } catch (IOException exn) {
-        LOG.warn("There were problems getting current job status: {}.", exn.getMessage());
-        LOG.debug("Exception information:", exn);
-
-        if (!nextBackOff(sleeper, backoff)) {
-          throw exn;
-        }
-      }
-    }
-  }
-
-  /**
-   * Identical to {@link BackOffUtils#next} but without checked exceptions.
-   */
-  private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
-    try {
-      return BackOffUtils.next(sleeper, backoff);
-    } catch (InterruptedException | IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
-      throws AggregatorRetrievalException {
-    try {
-      return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
-    } catch (IOException e) {
-      throw new AggregatorRetrievalException(
-          "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
-    }
-  }
-
-  private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
-      throws IOException {
-    if (aggregatorTransforms.contains(aggregator)) {
-      List<MetricUpdate> metricUpdates;
-      if (terminalMetricUpdates != null) {
-        metricUpdates = terminalMetricUpdates;
-      } else {
-        boolean terminal = getState().isTerminal();
-        JobMetrics jobMetrics =
-            dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute();
-        metricUpdates = jobMetrics.getMetrics();
-        if (terminal && jobMetrics.getMetrics() != null) {
-          terminalMetricUpdates = metricUpdates;
-        }
-      }
-
-      return DataflowMetricUpdateExtractor.fromMetricUpdates(
-          aggregator, aggregatorTransforms, metricUpdates);
-    } else {
-      throw new IllegalArgumentException(
-          "Aggregator " + aggregator + " is not used in this pipeline");
-    }
-  }
-}
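
A minimal sketch of monitoring a job with the API above; it passes a null
message handler (which the javadoc permits) and assumes a `job` obtained
from a prior run():

    // Wait up to ten minutes for a terminal state; a
    // MonitoringUtil.JobMessagesHandler could be supplied instead of null.
    PipelineResult.State result = job.waitToFinish(10, TimeUnit.MINUTES, null);
    if (result == null) {
      // Timed out or interrupted: the job may still be running, so cancel it.
      job.cancel();
    } else {
      System.out.println("Job " + job.getJobId() + " finished in state " + result);
    }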

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
deleted file mode 100644
index 0e4d4e9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.auto.service.AutoService;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DataflowPipeline}.
- */
-public class DataflowPipelineRegistrar {
-  private DataflowPipelineRegistrar() { }
-
-  /**
-   * Registers the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}.
-   */
-  @AutoService(PipelineOptionsRegistrar.class)
-  public static class Options implements PipelineOptionsRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(
-          DataflowPipelineOptions.class,
-          BlockingDataflowPipelineOptions.class);
-    }
-  }
-
-  /**
-   * Registers the {@link DataflowPipelineRunner} and {@link BlockingDataflowPipelineRunner}.
-   */
-  @AutoService(PipelineRunnerRegistrar.class)
-  public static class Runner implements PipelineRunnerRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          DataflowPipelineRunner.class,
-          BlockingDataflowPipelineRunner.class);
-    }
-  }
-}
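
These registrars are discovered through java.util.ServiceLoader; @AutoService
generates the META-INF/services entries at compile time. A minimal sketch of
that discovery mechanism:

    // Enumerate every runner registrar visible on the classpath.
    for (PipelineRunnerRegistrar registrar :
        ServiceLoader.load(PipelineRunnerRegistrar.class)) {
      System.out.println(
          registrar.getClass().getName() + " -> " + registrar.getPipelineRunners());
    }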