You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:07 UTC

[20/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
deleted file mode 100644
index 1ce5fe5..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Coder} using Google Protocol Buffers 2 binary format.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
- *
- * <p>To use, specify the {@link Coder} type on a PCollection containing Protocol Buffers messages.
- *
- * <pre>
- * {@code
- * PCollection<MyProto.Message> records =
- *     input.apply(...)
- *          .setCoder(Proto2Coder.of(MyProto.Message.class));
- * }
- * </pre>
- *
- * <p>Custom message extensions are also supported, but the coder must be made
- * aware of them explicitly:
- *
- * <pre>
- * {@code
- * PCollection<MyProto.Message> records =
- *     input.apply(...)
- *          .setCoder(Proto2Coder.of(MyProto.Message.class)
- *              .addExtensionsFrom(MyProto.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder, must extend {@code Message}
- * @deprecated Use {@link ProtoCoder}.
- */
-@Deprecated
-public class Proto2Coder<T extends Message> extends AtomicCoder<T> {
-
-  /** The class of Protobuf message to be encoded. */
-  private final Class<T> protoMessageClass;
-
-  /**
-   * All extension host classes included in this Proto2Coder. The extensions from
-   * these classes will be included in the {@link ExtensionRegistry} used during
-   * encoding and decoding.
-   */
-  private final List<Class<?>> extensionHostClasses;
-
-  private Proto2Coder(Class<T> protoMessageClass, List<Class<?>> extensionHostClasses) {
-    this.protoMessageClass = protoMessageClass;
-    this.extensionHostClasses = extensionHostClasses;
-  }
-
-  private static final CoderProvider PROVIDER =
-      new CoderProvider() {
-        @Override
-        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-          if (type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
-            @SuppressWarnings("unchecked")
-            TypeDescriptor<? extends Message> messageType =
-                (TypeDescriptor<? extends Message>) type;
-            @SuppressWarnings("unchecked")
-            Coder<T> coder = (Coder<T>) Proto2Coder.of(messageType);
-            return coder;
-          } else {
-            throw new CannotProvideCoderException(
-                String.format(
-                    "Cannot provide Proto2Coder because %s "
-                        + "is not a subclass of protocol buffer Messsage",
-                    type));
-          }
-        }
-      };
-
-  public static CoderProvider coderProvider() {
-    return PROVIDER;
-  }
-
-  /**
-   * Returns a {@code Proto2Coder} for the given Protobuf message class.
-   */
-  public static <T extends Message> Proto2Coder<T> of(Class<T> protoMessageClass) {
-    return new Proto2Coder<T>(protoMessageClass, Collections.<Class<?>>emptyList());
-  }
-
-  /**
-   * Returns a {@code Proto2Coder} for the given Protobuf message class.
-   */
-  public static <T extends Message> Proto2Coder<T> of(TypeDescriptor<T> protoMessageType) {
-    @SuppressWarnings("unchecked")
-    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
-    return of(protoMessageClass);
-  }
-
-  /**
-   * Produces a {@code Proto2Coder} like this one, but with the extensions from
-   * the given classes registered.
-   *
-   * @param moreExtensionHosts an iterable of classes that define a static
-   *      method {@code registerAllExtensions(ExtensionRegistry)}
-   */
-  public Proto2Coder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
-    for (Class<?> extensionHost : moreExtensionHosts) {
-      // Attempt to access the required method, to make sure it's present.
-      try {
-        Method registerAllExtensions =
-            extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
-        checkArgument(
-            Modifier.isStatic(registerAllExtensions.getModifiers()),
-            "Method registerAllExtensions() must be static for use with Proto2Coder");
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new IllegalArgumentException(e);
-      }
-    }
-
-    return new Proto2Coder<T>(
-        protoMessageClass,
-        new ImmutableList.Builder<Class<?>>()
-            .addAll(extensionHostClasses)
-            .addAll(moreExtensionHosts)
-            .build());
-  }
-
-  /**
-   * See {@link #withExtensionsFrom(Iterable)}.
-   */
-  public Proto2Coder<T> withExtensionsFrom(Class<?>... extensionHosts) {
-    return withExtensionsFrom(ImmutableList.copyOf(extensionHosts));
-  }
-
-  /**
-   * Adds custom Protobuf extensions to the coder. Returns {@code this}
-   * for method chaining.
-   *
-   * @param extensionHosts must be a class that defines a static
-   *      method name {@code registerAllExtensions}
-   * @deprecated use {@link #withExtensionsFrom}
-   */
-  @Deprecated
-  public Proto2Coder<T> addExtensionsFrom(Class<?>... extensionHosts) {
-    return addExtensionsFrom(ImmutableList.copyOf(extensionHosts));
-  }
-
-  /**
-   * Adds custom Protobuf extensions to the coder. Returns {@code this}
-   * for method chaining.
-   *
-   * @param extensionHosts must be a class that defines a static
-   *      method name {@code registerAllExtensions}
-   * @deprecated use {@link #withExtensionsFrom}
-   */
-  @Deprecated
-  public Proto2Coder<T> addExtensionsFrom(Iterable<Class<?>> extensionHosts) {
-    for (Class<?> extensionHost : extensionHosts) {
-      try {
-        // Attempt to access the declared method, to make sure it's present.
-        extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
-      } catch (NoSuchMethodException e) {
-        throw new IllegalArgumentException(e);
-      }
-      extensionHostClasses.add(extensionHost);
-    }
-    // The memoized extension registry needs to be recomputed because we have mutated this object.
-    synchronized (this) {
-      memoizedExtensionRegistry = null;
-      getExtensionRegistry();
-    }
-    return this;
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
-    }
-    if (context.isWholeStream) {
-      value.writeTo(outStream);
-    } else {
-      value.writeDelimitedTo(outStream);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    if (context.isWholeStream) {
-      return getParser().parseFrom(inStream, getExtensionRegistry());
-    } else {
-      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
-    }
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (!(other instanceof Proto2Coder)) {
-      return false;
-    }
-    Proto2Coder<?> otherCoder = (Proto2Coder<?>) other;
-    return protoMessageClass.equals(otherCoder.protoMessageClass)
-        && Sets.newHashSet(extensionHostClasses)
-            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(protoMessageClass, extensionHostClasses);
-  }
-
-  /**
-   * The encoding identifier is designed to support evolution as per the design of Protocol
-   * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
-   * Buffers documentation at
-   * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
-   * A Message Type</a>.
-   *
-   * <p>In particular, the encoding identifier is guaranteed to be the same for {@code Proto2Coder}
-   * instances of the same principal message class, and otherwise distinct. Loaded extensions do not
-   * affect the id, nor does it encode the full schema.
-   *
-   * <p>When modifying a message class, here are the broadest guidelines; see the above link
-   * for greater detail.
-   *
-   * <ul>
-   * <li>Do not change the numeric tags for any fields.
-   * <li>Never remove a <code>required</code> field.
-   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
-   * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
-   * the new and old types are interchangeable.
-   * </ul>
-   *
-   * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
-   * the class until it is certain that no remaining serialized instances exist.
-   *
-   * <p>If backwards incompatible changes must be made, the best recourse is to change the name
-   * of your Protocol Buffers message class.
-   */
-  @Override
-  public String getEncodingId() {
-    return protoMessageClass.getName();
-  }
-
-  private transient Parser<T> memoizedParser;
-
-  private Parser<T> getParser() {
-    if (memoizedParser == null) {
-      try {
-        @SuppressWarnings("unchecked")
-        T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
-        @SuppressWarnings("unchecked")
-        Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
-        memoizedParser = tParser;
-      } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-        throw new IllegalArgumentException(e);
-      }
-    }
-    return memoizedParser;
-  }
-
-  private transient ExtensionRegistry memoizedExtensionRegistry;
-
-  private synchronized ExtensionRegistry getExtensionRegistry() {
-    if (memoizedExtensionRegistry == null) {
-      ExtensionRegistry registry = ExtensionRegistry.newInstance();
-      for (Class<?> extensionHost : extensionHostClasses) {
-        try {
-          extensionHost
-              .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
-              .invoke(null, registry);
-        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-          throw new IllegalStateException(e);
-        }
-      }
-      memoizedExtensionRegistry = registry.getUnmodifiable();
-    }
-    return memoizedExtensionRegistry;
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // JSON Serialization details below
-
-  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
-  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
-  /**
-   * Constructor for JSON deserialization only.
-   */
-  @JsonCreator
-  public static <T extends Message> Proto2Coder<T> of(
-      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
-      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
-
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
-      List<Class<?>> extensionHostClasses = Lists.newArrayList();
-      if (extensionHostClassNames != null) {
-        for (String extensionHostClassName : extensionHostClassNames) {
-          extensionHostClasses.add(Class.forName(extensionHostClassName));
-        }
-      }
-      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
-    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
-    for (Class<?> clazz : extensionHostClasses) {
-      extensionHostClassNames.add(CloudObject.forString(clazz.getName()));
-    }
-    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
deleted file mode 100644
index 1590b11..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A {@link Coder} for Java classes that implement {@link Serializable}.
- *
- * <p>To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- *   PCollection<MyRecord> records =
- *       foo.apply(...).setCoder(SerializableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * <p>{@link SerializableCoder} does not guarantee a deterministic encoding, as Java
- * serialization may produce different binary encodings for two equivalent
- * objects.
- *
- * @param <T> the type of elements handled by this coder
- */
-public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
-
-  /**
-   * Returns a {@link SerializableCoder} instance for the provided element type.
-   * @param <T> the element type
-   */
-  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
-    @SuppressWarnings("unchecked")
-    Class<T> clazz = (Class<T>) type.getRawType();
-    return of(clazz);
-  }
-
-  /**
-   * Returns a {@link SerializableCoder} instance for the provided element class.
-   * @param <T> the element type
-   */
-  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
-    return new SerializableCoder<>(clazz);
-  }
-
-  @JsonCreator
-  @SuppressWarnings("unchecked")
-  public static SerializableCoder<?> of(@JsonProperty("type") String classType)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(classType);
-    if (!Serializable.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + classType + " does not implement Serializable");
-    }
-    return of((Class<? extends Serializable>) clazz);
-  }
-
-  /**
-   * A {@link CoderProvider} that constructs a {@link SerializableCoder}
-   * for any class that implements serializable.
-   */
-  public static final CoderProvider PROVIDER = new CoderProvider() {
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
-        throws CannotProvideCoderException {
-      Class<?> clazz = typeDescriptor.getRawType();
-      if (Serializable.class.isAssignableFrom(clazz)) {
-        @SuppressWarnings("unchecked")
-        Class<? extends Serializable> serializableClazz =
-            (Class<? extends Serializable>) clazz;
-        @SuppressWarnings("unchecked")
-        Coder<T> coder = (Coder<T>) SerializableCoder.of(serializableClazz);
-        return coder;
-      } else {
-        throw new CannotProvideCoderException(
-            "Cannot provide SerializableCoder because " + typeDescriptor
-            + " does not implement Serializable");
-      }
-    }
-  };
-
-
-  private final Class<T> type;
-
-  protected SerializableCoder(Class<T> type) {
-    this.type = type;
-  }
-
-  public Class<T> getRecordType() {
-    return type;
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    try {
-      ObjectOutputStream oos = new ObjectOutputStream(outStream);
-      oos.writeObject(value);
-      oos.flush();
-    } catch (IOException exn) {
-      throw new CoderException("unable to serialize record " + value, exn);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      ObjectInputStream ois = new ObjectInputStream(inStream);
-      return type.cast(ois.readObject());
-    } catch (ClassNotFoundException e) {
-      throw new CoderException("unable to deserialize record", e);
-    }
-  }
-
-  @Override
-  public String getEncodingId() {
-    return String.format("%s:%s",
-        type.getName(),
-        ObjectStreamClass.lookup(type).getSerialVersionUID());
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    result.put("type", type.getName());
-    return result;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. Java serialization is not
-   *         deterministic with respect to {@link Object#equals} for all types.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Java Serialization may be non-deterministic.");
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (getClass() != other.getClass()) {
-      return false;
-    }
-    return type == ((SerializableCoder<?>) other).type;
-  }
-
-  @Override
-  public int hashCode() {
-    return type.hashCode();
-  }
-
-  // This coder inherits isRegisterByteSizeObserverCheap,
-  // getEncodedElementByteSize and registerByteSizeObserver
-  // from StandardCoder. Looks like we cannot do much better
-  // in this case.
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
deleted file mode 100644
index 589a372..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A {@link SetCoder} encodes any {@link Set} using the format of {@link IterableLikeCoder}. The
- * elements may not be in a deterministic order, depending on the {@code Set} implementation.
- *
- * @param <T> the type of the elements of the set
- */
-public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
-
-  /**
-   * Produces a {@link SetCoder} with the given {@code elementCoder}.
-   */
-  public static <T> SetCoder<T> of(Coder<T> elementCoder) {
-    return new SetCoder<>(elementCoder);
-  }
-
-  /**
-   * Dynamically typed constructor for JSON deserialization.
-   */
-  @JsonCreator
-  public static SetCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Object> components) {
-    Preconditions.checkArgument(components.size() == 1,
-        "Expecting 1 component, got " + components.size());
-    return of((Coder<?>) components.get(0));
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. Sets are not ordered, but
-   *         they are encoded in the order of an arbitrary iteration.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Ordering of elements in a set may be non-deterministic.");
-  }
-
-  /**
-   * Returns the first element in this set if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Set<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal operations below here.
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return A new {@link Set} built from the elements in the {@link List} decoded by
-   * {@link IterableLikeCoder}.
-   */
-  @Override
-  protected final Set<T> decodeToIterable(List<T> decodedElements) {
-    return new HashSet<>(decodedElements);
-  }
-
-  protected SetCoder(Coder<T> elemCoder) {
-    super(elemCoder, "Set");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
deleted file mode 100644
index ca189b1..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
-
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing
- * via the class name and recursively using {@link #getComponents}.
- *
- * <p>To extend {@link StandardCoder}, override the following methods as appropriate:
- *
- * <ul>
- *   <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li>
- *   <li>{@link #getEncodedElementByteSize} and
- *       {@link #isRegisterByteSizeObserverCheap}: the
- *       default implementation encodes values to bytes and counts the bytes, which is considered
- *       expensive.</li>
- *   <li>{@link #getEncodingId} and {@link #getAllowedEncodings}: by default, the encoding id
- *       is the empty string, so only the canonical name of the subclass will be used for
- *       compatibility checks, and no other encoding ids are allowed.</li>
- * </ul>
- */
-public abstract class StandardCoder<T> implements Coder<T> {
-  protected StandardCoder() {}
-
-  @Override
-  public String getEncodingId() {
-    return "";
-  }
-
-  @Override
-  public Collection<String> getAllowedEncodings() {
-    return Collections.emptyList();
-  }
-
-  /**
-   * Returns the list of {@link Coder Coders} that are components of this {@link Coder}.
-   */
-  public List<? extends Coder<?>> getComponents() {
-    List<? extends Coder<?>> coderArguments = getCoderArguments();
-    if (coderArguments == null) {
-      return Collections.emptyList();
-    } else {
-      return coderArguments;
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true} if the two {@link StandardCoder} instances have the
-   * same class and equal components.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (o == null || this.getClass() != o.getClass()) {
-      return false;
-    }
-    StandardCoder<?> that = (StandardCoder<?>) o;
-    return this.getComponents().equals(that.getComponents());
-  }
-
-  @Override
-  public int hashCode() {
-    return getClass().hashCode() * 31 + getComponents().hashCode();
-  }
-
-  @Override
-  public String toString() {
-    String s = getClass().getName();
-    s = s.substring(s.lastIndexOf('.') + 1);
-    List<? extends Coder<?>> componentCoders = getComponents();
-    if (!componentCoders.isEmpty()) {
-      s += "(";
-      boolean first = true;
-      for (Coder<?> componentCoder : componentCoders) {
-        if (first) {
-          first = false;
-        } else {
-          s += ", ";
-        }
-        s += componentCoder.toString();
-      }
-      s += ")";
-    }
-    return s;
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-
-    List<? extends Coder<?>> components = getComponents();
-    if (!components.isEmpty()) {
-      List<CloudObject> cloudComponents = new ArrayList<>(components.size());
-      for (Coder<?> coder : components) {
-        cloudComponents.add(coder.asCloudObject());
-      }
-      addList(result, PropertyNames.COMPONENT_ENCODINGS, cloudComponents);
-    }
-
-    String encodingId = getEncodingId();
-    checkNotNull(encodingId, "Coder.getEncodingId() must not return null.");
-    if (!encodingId.isEmpty()) {
-      addString(result, PropertyNames.ENCODING_ID, encodingId);
-    }
-
-    Collection<String> allowedEncodings = getAllowedEncodings();
-    if (!allowedEncodings.isEmpty()) {
-      addStringList(result, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowedEncodings));
-    }
-
-    return result;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} unless it is overridden. {@link StandardCoder#registerByteSizeObserver}
-   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
-   *         unless it is overridden. This is considered expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
-    return false;
-  }
-
-  /**
-   * Returns the size in bytes of the encoded value using this coder.
-   */
-  protected long getEncodedElementByteSize(T value, Context context)
-      throws Exception {
-    try {
-      CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
-      encode(value, os, context);
-      return os.getCount();
-    } catch (Exception exn) {
-      throw new IllegalArgumentException(
-          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>For {@link StandardCoder} subclasses, this notifies {@code observer} about the byte size
-   * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      T value, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    observer.update(getEncodedElementByteSize(value, context));
-  }
-
-  protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
-      throws NonDeterministicException {
-    for (Coder<?> coder : coders) {
-      try {
-        coder.verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        throw new NonDeterministicException(this, message, e);
-      }
-    }
-  }
-
-  protected void verifyDeterministic(String message, Coder<?>... coders)
-      throws NonDeterministicException {
-    verifyDeterministic(message, Arrays.asList(coders));
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} for {@link StandardCoder} unless overridden.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return false;
-  }
-
-  @Override
-  public Object structuralValue(T value) throws Exception {
-    if (value != null && consistentWithEquals()) {
-      return value;
-    } else {
-      try {
-        ByteArrayOutputStream os = new ByteArrayOutputStream();
-        encode(value, os, Context.OUTER);
-        return new StructuralByteArray(os.toByteArray());
-      } catch (Exception exn) {
-        throw new IllegalArgumentException(
-            "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
deleted file mode 100644
index 3f352d3..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
-
-import java.lang.reflect.InvocationTargetException;
-
-/**
- * A {@link Coder} that wraps a {@code Coder<String>}
- * and encodes/decodes values via string representations.
- *
- * <p>To decode, the input byte stream is decoded to
- * a {@link String}, and this is passed to the single-argument
- * constructor for {@code T}.
- *
- * <p>To encode, the input value is converted via {@code toString()},
- * and this string is encoded.
- *
- * <p>In order for this to operate correctly for a class {@code Clazz},
- * it must be the case for any instance {@code x} that
- * {@code x.equals(new Clazz(x.toString()))}.
- *
- * <p>This method of encoding is not designed for ease of evolution of {@code Clazz};
- * it should only be used in cases where the class is stable or the encoding is not
- * important. If evolution of the class is important, see {@link ProtoCoder}, {@link AvroCoder},
- * or {@link JAXBCoder}.
- *
- * @param <T> The type of objects coded.
- */
-public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
-  public static <T> StringDelegateCoder<T> of(Class<T> clazz) {
-    return new StringDelegateCoder<T>(clazz);
-  }
-
-  @Override
-  public String toString() {
-    return "StringDelegateCoder(" + clazz + ")";
-  }
-
-  private final Class<T> clazz;
-
-  protected StringDelegateCoder(final Class<T> clazz) {
-    super(StringUtf8Coder.of(),
-      new CodingFunction<T, String>() {
-        @Override
-        public String apply(T input) {
-          return input.toString();
-        }
-      },
-      new CodingFunction<String, T>() {
-        @Override
-        public T apply(String input) throws
-            NoSuchMethodException,
-            InstantiationException,
-            IllegalAccessException,
-            InvocationTargetException {
-          return clazz.getConstructor(String.class).newInstance(input);
-        }
-      });
-
-    this.clazz = clazz;
-  }
-
-  /**
-   * The encoding id is the fully qualified name of the encoded/decoded class.
-   */
-  @Override
-  public String getEncodingId() {
-    return clazz.getName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
deleted file mode 100644
index 25b8f5e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.ExposedByteArrayOutputStream;
-import com.google.cloud.dataflow.sdk.util.StreamUtils;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.base.Utf8;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-import java.nio.charset.StandardCharsets;
-
-/**
- * A {@link Coder} that encodes {@link String Strings} in UTF-8 encoding.
- * If in a nested context, prefixes the string with an integer length field,
- * encoded via a {@link VarIntCoder}.
- */
-public class StringUtf8Coder extends AtomicCoder<String> {
-
-  @JsonCreator
-  public static StringUtf8Coder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final StringUtf8Coder INSTANCE = new StringUtf8Coder();
-
-  private static void writeString(String value, DataOutputStream dos)
-      throws IOException {
-    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
-    VarInt.encode(bytes.length, dos);
-    dos.write(bytes);
-  }
-
-  private static String readString(DataInputStream dis) throws IOException {
-    int len = VarInt.decodeInt(dis);
-    if (len < 0) {
-      throw new CoderException("Invalid encoded string length: " + len);
-    }
-    byte[] bytes = new byte[len];
-    dis.readFully(bytes);
-    return new String(bytes, StandardCharsets.UTF_8);
-  }
-
-  private StringUtf8Coder() {}
-
-  @Override
-  public void encode(String value, OutputStream outStream, Context context)
-      throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null String");
-    }
-    if (context.isWholeStream) {
-      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
-      if (outStream instanceof ExposedByteArrayOutputStream) {
-        ((ExposedByteArrayOutputStream) outStream).writeAndOwn(bytes);
-      } else {
-        outStream.write(bytes);
-      }
-    } else {
-      writeString(value, new DataOutputStream(outStream));
-    }
-  }
-
-  @Override
-  public String decode(InputStream inStream, Context context)
-      throws IOException {
-    if (context.isWholeStream) {
-      byte[] bytes = StreamUtils.getBytes(inStream);
-      return new String(bytes, StandardCharsets.UTF_8);
-    } else {
-      try {
-        return readString(new DataInputStream(inStream));
-      } catch (EOFException | UTFDataFormatException exn) {
-        // These exceptions correspond to decoding problems, so change
-        // what kind of exception they're branded as.
-        throw new CoderException(exn);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. This coder is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return the byte size of the UTF-8 encoding of the a string or, in a nested context,
-   * the byte size of the encoding plus the encoded length prefix.
-   */
-  @Override
-  protected long getEncodedElementByteSize(String value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null String");
-    }
-    if (context.isWholeStream) {
-      return Utf8.encodedLength(value);
-    } else {
-      CountingOutputStream countingStream =
-          new CountingOutputStream(ByteStreams.nullOutputStream());
-      DataOutputStream stream = new DataOutputStream(countingStream);
-      writeString(value, stream);
-      return countingStream.getCount();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
deleted file mode 100644
index aa44456..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.api.client.util.Base64.encodeBase64String;
-
-import java.util.Arrays;
-
-/**
- * A wrapper around a byte[] that uses structural, value-based
- * equality rather than byte[]'s normal object identity.
- */
-public class StructuralByteArray {
-  byte[] value;
-
-  public StructuralByteArray(byte[] value) {
-    this.value = value;
-  }
-
-  public byte[] getValue() {
-    return value;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof StructuralByteArray) {
-      StructuralByteArray that = (StructuralByteArray) o;
-      return Arrays.equals(this.value, that.value);
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(value);
-  }
-
-  @Override
-  public String toString() {
-    return "base64:" + encodeBase64String(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
deleted file mode 100644
index b02fb08..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.api.services.bigquery.model.TableRow;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
- */
-public class TableRowJsonCoder extends AtomicCoder<TableRow> {
-
-  @JsonCreator
-  public static TableRowJsonCoder of() {
-    return INSTANCE;
-  }
-
-  @Override
-  public void encode(TableRow value, OutputStream outStream, Context context)
-      throws IOException {
-    String strValue = MAPPER.writeValueAsString(value);
-    StringUtf8Coder.of().encode(strValue, outStream, context);
-  }
-
-  @Override
-  public TableRow decode(InputStream inStream, Context context)
-      throws IOException {
-    String strValue = StringUtf8Coder.of().decode(inStream, context);
-    return MAPPER.readValue(strValue, TableRow.class);
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(TableRow value, Context context)
-      throws Exception {
-    String strValue = MAPPER.writeValueAsString(value);
-    return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
-  // TableRow.
-  private static final ObjectMapper MAPPER =
-      new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
-
-  private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
-
-  private TableRowJsonCoder() { }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. A {@link TableRow} can hold arbitrary
-   *         {@link Object} instances, which makes the encoding non-deterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "TableCell can hold arbitrary instances, which may be non-deterministic.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
deleted file mode 100644
index 539c56a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} that encodes {@code Integer Integers} as the ASCII bytes of
- * their textual, decimal, representation.
- */
-public class TextualIntegerCoder extends AtomicCoder<Integer> {
-
-  @JsonCreator
-  public static TextualIntegerCoder of() {
-    return new TextualIntegerCoder();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  protected TextualIntegerCoder() {}
-
-  @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    String textualValue = value.toString();
-    StringUtf8Coder.of().encode(textualValue, outStream, context);
-  }
-
-  @Override
-  public Integer decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    String textualValue = StringUtf8Coder.of().decode(inStream, context);
-    try {
-      return Integer.valueOf(textualValue);
-    } catch (NumberFormatException exn) {
-      throw new CoderException("error when decoding a textual integer", exn);
-    }
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Integer value, Context context) throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    String textualValue = value.toString();
-    return StringUtf8Coder.of().getEncodedElementByteSize(textualValue, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
deleted file mode 100644
index 42862bb..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.VarInt;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link Coder} that encodes {@link Integer Integers} using between 1 and 5 bytes. Negative
- * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
- * integers that are known to often be large or negative.
- */
-public class VarIntCoder extends AtomicCoder<Integer> {
-
-  @JsonCreator
-  public static VarIntCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final VarIntCoder INSTANCE =
-      new VarIntCoder();
-
-  private VarIntCoder() {}
-
-  @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    VarInt.encode(value.intValue(), outStream);
-  }
-
-  @Override
-  public Integer decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      return VarInt.decodeInt(inStream);
-    } catch (EOFException | UTFDataFormatException exn) {
-      // These exceptions correspond to decoding problems, so change
-      // what kind of exception they're branded as.
-      throw new CoderException(exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link VarIntCoder} is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) {
-    return true;
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Integer value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    return VarInt.getLength(value.longValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
deleted file mode 100644
index 669453e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.VarInt;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link Coder} that encodes {@link Long Longs} using between 1 and 10 bytes. Negative
- * numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for
- * longs that are known to often be large or negative.
- */
-public class VarLongCoder extends AtomicCoder<Long> {
-
-  @JsonCreator
-  public static VarLongCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final VarLongCoder INSTANCE = new VarLongCoder();
-
-  private VarLongCoder() {}
-
-  @Override
-  public void encode(Long value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Long");
-    }
-    VarInt.encode(value.longValue(), outStream);
-  }
-
-  @Override
-  public Long decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      return VarInt.decodeLong(inStream);
-    } catch (EOFException | UTFDataFormatException exn) {
-      // These exceptions correspond to decoding problems, so change
-      // what kind of exception they're branded as.
-      throw new CoderException(exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link VarLongCoder} is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Long value, Context context) {
-    return true;
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Long value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Long");
-    }
-    return VarInt.getLength(value.longValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java
deleted file mode 100644
index 813ee2f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
- */
-public class VoidCoder extends AtomicCoder<Void> {
-
-  @JsonCreator
-  public static VoidCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final VoidCoder INSTANCE = new VoidCoder();
-
-  private VoidCoder() {}
-
-  @Override
-  public void encode(Void value, OutputStream outStream, Context context) {
-    // Nothing to write!
-  }
-
-  @Override
-  public Void decode(InputStream inStream, Context context) {
-    // Nothing to read!
-    return null;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return  {@code true}. {@link VoidCoder} is (vacuously) injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link VoidCoder#getEncodedElementByteSize} runs in constant time.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Void value, Context context) {
-    return true;
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Void value, Context context)
-      throws Exception {
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java
deleted file mode 100644
index a3bc150..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines {@link com.google.cloud.dataflow.sdk.coders.Coder Coders}
- * to specify how data is encoded to and decoded from byte strings.
- *
- * <p>During execution of a Pipeline, elements in a
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}
- * may need to be encoded into byte strings.
- * This happens both at the beginning and end of a pipeline when data is read from and written to
- * persistent storage and also during execution of a pipeline when elements are communicated between
- * machines.
- *
- * <p>Exactly when PCollection elements are encoded during execution depends on which
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} is being used and how that runner
- * chooses to execute the pipeline. As such, Dataflow requires that all PCollections have an
- * appropriate Coder in case it becomes necessary. In many cases, the Coder can be inferred from
- * the available Java type
- * information and the Pipeline's {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry}. It
- * can be specified per PCollection via
- * {@link com.google.cloud.dataflow.sdk.values.PCollection#setCoder(Coder)} or per type using the
- * {@link com.google.cloud.dataflow.sdk.coders.DefaultCoder} annotation.
- *
- * <p>This package provides a number of coders for common types like {@code Integer},
- * {@code String}, and {@code List}, as well as coders like
- * {@link com.google.cloud.dataflow.sdk.coders.AvroCoder} that can be used to encode many custom
- * types.
- *
- */
-package com.google.cloud.dataflow.sdk.coders;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
deleted file mode 100644
index 88d13e9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.CoderProvider;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both
- * Protocol Buffers syntax versions 2 and 3.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
- *
- * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default
- * {@link Coder} for any {@link Message} object. Custom message extensions are also supported, but
- * these extensions must be registered for a particular {@link ProtoCoder} instance and that
- * instance must be registered on the {@link PCollection} that needs the extensions:
- *
- * <pre>{@code
- * import MyProtoFile;
- * import MyProtoFile.MyMessage;
- *
- * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
- * PCollection<MyMessage> records =  input.apply(...).setCoder(coder);
- * }</pre>
- *
- * <h3>Versioning</h3>
- *
- * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However,
- * the Java runtime version of the <code>google.com.protobuf</code> library must match exactly the
- * version of <code>protoc</code> that was used to produce the JAR files containing the compiled
- * <code>.proto</code> messages.
- *
- * <p>For more information, see the
- * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>.
- *
- * <h3>{@link ProtoCoder} and Determinism</h3>
- *
- * <p>In general, Protocol Buffers messages can be encoded deterministically within a single
- * pipeline as long as:
- *
- * <ul>
- * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code>
- *     fields.</li>
- * <li>Every Java VM that encodes or decodes the messages use the same runtime version of the
- *     Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li>
- * </ul>
- *
- * <h3>{@link ProtoCoder} and Encoding Stability</h3>
- *
- * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language
- * guides for
- * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a>
- * and
- * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a>
- * syntaxes, depending on your message type. Following these guidelines will ensure that the
- * old encoded data can be read by new versions of the code.
- *
- * <p>Generally, any change to the message type, registered extensions, runtime library, or
- * compiled proto JARs may change the encoding. Thus even if both the original and updated messages
- * can be encoded deterministically within a single job, these deterministic encodings may not be
- * the same across jobs.
- *
- * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
- */
-public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
-
-  /**
-   * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
-   * {@link ExtensionRegistry}.
-   */
-  public static CoderProvider coderProvider() {
-    return PROVIDER;
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
-   */
-  public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
-    return new ProtoCoder<T>(protoMessageClass, ImmutableSet.<Class<?>>of());
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given
-   * {@link TypeDescriptor}.
-   */
-  public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) {
-    @SuppressWarnings("unchecked")
-    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
-    return of(protoMessageClass);
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes
-   * registered.
-   *
-   * <p>Each of the extension host classes must be an class automatically generated by the
-   * Protocol Buffers compiler, {@code protoc}, that contains messages.
-   *
-   * <p>Does not modify this object.
-   */
-  public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
-    for (Class<?> extensionHost : moreExtensionHosts) {
-      // Attempt to access the required method, to make sure it's present.
-      try {
-        Method registerAllExtensions =
-            extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
-        checkArgument(
-            Modifier.isStatic(registerAllExtensions.getModifiers()),
-            "Method registerAllExtensions() must be static");
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new IllegalArgumentException(
-            String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()),
-            e);
-      }
-    }
-
-    return new ProtoCoder<T>(
-        protoMessageClass,
-        new ImmutableSet.Builder<Class<?>>()
-            .addAll(extensionHostClasses)
-            .addAll(moreExtensionHosts)
-            .build());
-  }
-
-  /**
-   * See {@link #withExtensionsFrom(Iterable)}.
-   *
-   * <p>Does not modify this object.
-   */
-  public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
-    return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
-    }
-    if (context.isWholeStream) {
-      value.writeTo(outStream);
-    } else {
-      value.writeDelimitedTo(outStream);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    if (context.isWholeStream) {
-      return getParser().parseFrom(inStream, getExtensionRegistry());
-    } else {
-      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
-    }
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (!(other instanceof ProtoCoder)) {
-      return false;
-    }
-    ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
-    return protoMessageClass.equals(otherCoder.protoMessageClass)
-        && Sets.newHashSet(extensionHostClasses)
-            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(protoMessageClass, extensionHostClasses);
-  }
-
-  /**
-   * The encoding identifier is designed to support evolution as per the design of Protocol
-   * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
-   * Buffers documentation at
-   * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
-   * A Message Type</a>.
-   *
-   * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder}
-   * instances of the same principal message class, with the same registered extension host classes,
-   * and otherwise distinct. Note that the encoding ID does not encode any version of the message
-   * or extensions, nor does it include the message schema.
-   *
-   * <p>When modifying a message class, here are the broadest guidelines; see the above link
-   * for greater detail.
-   *
-   * <ul>
-   * <li>Do not change the numeric tags for any fields.
-   * <li>Never remove a <code>required</code> field.
-   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
-   * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
-   * the new and old types are interchangeable.
-   * </ul>
-   *
-   * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
-   * the class until it is certain that no remaining serialized instances exist.
-   *
-   * <p>If backwards incompatible changes must be made, the best recourse is to change the name
-   * of your Protocol Buffers message class.
-   */
-  @Override
-  public String getEncodingId() {
-    return protoMessageClass.getName() + getSortedExtensionClasses().toString();
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    ProtobufUtil.verifyDeterministic(this);
-  }
-
-  /**
-   * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports.
-   */
-  public Class<T> getMessageType() {
-    return protoMessageClass;
-  }
-
-  /**
-   * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
-   * to {@code T} registered with this {@link ProtoCoder}.
-   */
-  public ExtensionRegistry getExtensionRegistry() {
-    if (memoizedExtensionRegistry == null) {
-      ExtensionRegistry registry = ExtensionRegistry.newInstance();
-      for (Class<?> extensionHost : extensionHostClasses) {
-        try {
-          extensionHost
-              .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
-              .invoke(null, registry);
-        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-          throw new IllegalStateException(e);
-        }
-      }
-      memoizedExtensionRegistry = registry.getUnmodifiable();
-    }
-    return memoizedExtensionRegistry;
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // Private implementation details below.
-
-  /** The {@link Message} type to be coded. */
-  private final Class<T> protoMessageClass;
-
-  /**
-   * All extension host classes included in this {@link ProtoCoder}. The extensions from these
-   * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding.
-   */
-  private final Set<Class<?>> extensionHostClasses;
-
-  // Constants used to serialize and deserialize
-  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
-  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
-  // Transient fields that are lazy initialized and then memoized.
-  private transient ExtensionRegistry memoizedExtensionRegistry;
-  private transient Parser<T> memoizedParser;
-
-  /** Private constructor. */
-  private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) {
-    this.protoMessageClass = protoMessageClass;
-    this.extensionHostClasses = extensionHostClasses;
-  }
-
-  /**
-   * @deprecated For JSON deserialization only.
-   */
-  @JsonCreator
-  @Deprecated
-  public static <T extends Message> ProtoCoder<T> of(
-      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
-      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
-
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
-      List<Class<?>> extensionHostClasses = Lists.newArrayList();
-      if (extensionHostClassNames != null) {
-        for (String extensionHostClassName : extensionHostClassNames) {
-          extensionHostClasses.add(Class.forName(extensionHostClassName));
-        }
-      }
-      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
-    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
-    for (String className : getSortedExtensionClasses()) {
-      extensionHostClassNames.add(CloudObject.forString(className));
-    }
-    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
-    return result;
-  }
-
-  /** Get the memoized {@link Parser}, possibly initializing it lazily. */
-  private Parser<T> getParser() {
-    if (memoizedParser == null) {
-      try {
-        @SuppressWarnings("unchecked")
-        T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
-        @SuppressWarnings("unchecked")
-        Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
-        memoizedParser = tParser;
-      } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-        throw new IllegalArgumentException(e);
-      }
-    }
-    return memoizedParser;
-  }
-
-  /**
-   * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
-   * {@link #coderProvider()}.
-   */
-  private static final CoderProvider PROVIDER =
-      new CoderProvider() {
-        @Override
-        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-          if (!type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
-            throw new CannotProvideCoderException(
-                String.format(
-                    "Cannot provide %s because %s is not a subclass of %s",
-                    ProtoCoder.class.getSimpleName(),
-                    type,
-                    Message.class.getName()));
-          }
-
-          @SuppressWarnings("unchecked")
-          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
-          try {
-            @SuppressWarnings("unchecked")
-            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
-            return coder;
-          } catch (IllegalArgumentException e) {
-            throw new CannotProvideCoderException(e);
-          }
-        }
-      };
-
-  private SortedSet<String> getSortedExtensionClasses() {
-    SortedSet<String> ret = new TreeSet<>();
-    for (Class<?> clazz : extensionHostClasses) {
-      ret.add(clazz.getName());
-    }
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
deleted file mode 100644
index 24415ac..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
-import com.google.protobuf.Descriptors.GenericDescriptor;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
-import com.google.protobuf.Message;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Utility functions for reflecting and analyzing Protocol Buffers classes.
- *
- * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation.
- */
-class ProtobufUtil {
-  /**
-   * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}.
-   *
-   * @throws IllegalArgumentException if there is an error in Java reflection.
-   */
-  static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
-    try {
-      return (Descriptor) clazz.getMethod("getDescriptor").invoke(null);
-    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  /**
-   * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as
-   * every class it can include transitively.
-   *
-   * @throws IllegalArgumentException if there is an error in Java reflection.
-   */
-  static Set<Descriptor> getRecursiveDescriptorsForClass(
-      Class<? extends Message> clazz, ExtensionRegistry registry) {
-    Descriptor root = getDescriptorForClass(clazz);
-    Set<Descriptor> descriptors = new HashSet<>();
-    recursivelyAddDescriptors(root, descriptors, registry);
-    return descriptors;
-  }
-
-  /**
-   * Recursively walks the given {@link Message} class and verifies that every field or message
-   * linked in uses the Protocol Buffers proto2 syntax.
-   */
-  static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) {
-    for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
-      Syntax s = d.getFile().getSyntax();
-      checkArgument(
-          s == Syntax.PROTO2,
-          "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s",
-          clazz.getName(),
-          d.getFullName(),
-          d.getFile().getName());
-    }
-  }
-
-  /**
-   * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot
-   * be deterministically encoded.
-   *
-   * @throws NonDeterministicException if the object cannot be encoded deterministically.
-   */
-  static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException {
-    Class<? extends Message> message = coder.getMessageType();
-    ExtensionRegistry registry = coder.getExtensionRegistry();
-    Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry);
-    for (Descriptor d : descriptors) {
-      for (FieldDescriptor fd : d.getFields()) {
-        // If there is a transitively reachable Protocol Buffers map field, then this object cannot
-        // be encoded deterministically.
-        if (fd.isMapField()) {
-          String reason =
-              String.format(
-                  "Protocol Buffers message %s transitively includes Map field %s (from file %s)."
-                      + " Maps cannot be deterministically encoded.",
-                  message.getName(),
-                  fd.getFullName(),
-                  fd.getFile().getFullName());
-          throw new NonDeterministicException(coder, reason);
-        }
-      }
-    }
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-  // Disable construction of utility class
-  private ProtobufUtil() {}
-
-  private static void recursivelyAddDescriptors(
-      Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) {
-    if (descriptors.contains(message)) {
-      return;
-    }
-    descriptors.add(message);
-
-    for (FieldDescriptor f : message.getFields()) {
-      recursivelyAddDescriptors(f, descriptors, registry);
-    }
-    for (FieldDescriptor f : message.getExtensions()) {
-      recursivelyAddDescriptors(f, descriptors, registry);
-    }
-    for (ExtensionInfo info :
-        registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) {
-      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
-    }
-    for (ExtensionInfo info :
-        registry.getAllMutableExtensionsByExtendedType(message.getFullName())) {
-      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
-    }
-  }
-
-  private static void recursivelyAddDescriptors(
-      FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) {
-    switch (field.getType()) {
-      case BOOL:
-      case BYTES:
-      case DOUBLE:
-      case ENUM:
-      case FIXED32:
-      case FIXED64:
-      case FLOAT:
-      case INT32:
-      case INT64:
-      case SFIXED32:
-      case SFIXED64:
-      case SINT32:
-      case SINT64:
-      case STRING:
-      case UINT32:
-      case UINT64:
-        // Primitive types do not transitively access anything else.
-        break;
-
-      case GROUP:
-      case MESSAGE:
-        // Recursively adds all the fields from this nested Message.
-        recursivelyAddDescriptors(field.getMessageType(), descriptors, registry);
-        break;
-
-      default:
-        throw new UnsupportedOperationException(
-            "Unexpected Protocol Buffers field type: " + field.getType());
-    }
-  }
-}