You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/24 22:04:44 UTC

[2/3] beam git commit: [BEAM-1871] Move ProtoCoder to new sdks/java/extensions/protobuf package.

[BEAM-1871] Move ProtoCoder to new sdks/java/extensions/protobuf package.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff1fe7fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff1fe7fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff1fe7fa

Branch: refs/heads/master
Commit: ff1fe7fa53816fe4327c7572c13a616fc4243dc9
Parents: d7e7af8
Author: Luke Cwik <lc...@google.com>
Authored: Mon Apr 24 14:32:21 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Apr 24 15:04:13 2017 -0700

----------------------------------------------------------------------
 sdks/java/core/pom.xml                          |   6 -
 .../apache/beam/sdk/coders/CoderRegistry.java   |  10 +-
 .../beam/sdk/coders/StringDelegateCoder.java    |   4 +-
 .../beam/sdk/coders/protobuf/ProtoCoder.java    | 405 -------------------
 .../beam/sdk/coders/protobuf/ProtobufUtil.java  | 171 --------
 .../beam/sdk/coders/protobuf/package-info.java  |  24 --
 sdks/java/core/src/main/proto/README.md         |  45 ---
 .../main/proto/proto2_coder_test_messages.proto |  53 ---
 .../beam/sdk/coders/CoderRegistryTest.java      |  14 -
 .../sdk/coders/protobuf/ProtoCoderTest.java     | 182 ---------
 .../sdk/coders/protobuf/ProtobufUtilTest.java   | 192 ---------
 sdks/java/extensions/pom.xml                    |   1 +
 sdks/java/extensions/protobuf/pom.xml           | 142 +++++++
 .../sdk/extensions/protobuf/ProtoCoder.java     | 405 +++++++++++++++++++
 .../sdk/extensions/protobuf/ProtobufUtil.java   | 171 ++++++++
 .../sdk/extensions/protobuf/package-info.java   |  24 ++
 .../sdk/extensions/protobuf/ProtoCoderTest.java | 181 +++++++++
 .../extensions/protobuf/ProtobufUtilTest.java   | 191 +++++++++
 .../test/proto/proto2_coder_test_messages.proto |  53 +++
 19 files changed, 1173 insertions(+), 1101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index ac7a3bb..6c46453 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -294,12 +294,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.cloud.dataflow</groupId>
-      <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
       <version>2.21</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 4238293..e0b2b3a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -44,7 +44,6 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -77,9 +76,7 @@ import org.slf4j.LoggerFactory;
  *       the default {@code Coder} type. The {@link Coder} class must satisfy the requirements
  *       of {@link CoderProviders#fromStaticMethods}.
  *   <li>Fallback: A fallback {@link CoderProvider} is used to attempt to provide a {@link Coder}
- *       for any type. By default, there are two chained fallback coders:
- *       {@link ProtoCoder#coderProvider}, which can provide a coder to efficiently serialize any
- *       Protocol Buffers message, and then {@link SerializableCoder#PROVIDER}, which can provide a
+ *       for any type. By default, there is {@link SerializableCoder#PROVIDER}, which can provide a
  *       {@link Coder} for any type that is serializable via Java serialization. The fallback
  *       {@link CoderProvider} can be retrieved and set, respectively, using
  *       {@link #getFallbackCoderProvider()} and {@link #setFallbackCoderProvider}. Multiple
@@ -165,7 +162,7 @@ public class CoderRegistry implements CoderProvider {
   private CoderRegistry() {
     coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS);
     setFallbackCoderProvider(
-        CoderProviders.firstOf(ProtoCoder.coderProvider(), SerializableCoder.PROVIDER));
+        CoderProviders.firstOf(SerializableCoder.PROVIDER));
   }
 
   /**
@@ -423,8 +420,7 @@ public class CoderRegistry implements CoderProvider {
    * providing a {@code Coder<T>} for a type {@code T}, then the registry will attempt to create
    * a {@link Coder} using this {@link CoderProvider}.
    *
-   * <p>By default, this is set to the chain of {@link ProtoCoder#coderProvider()} and
-   * {@link SerializableCoder#PROVIDER}.
+   * <p>By default, this is set to {@link SerializableCoder#PROVIDER}.
    *
    * <p>See {@link #getFallbackCoderProvider}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index f86369c..51ead3c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -23,7 +23,6 @@ import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -43,7 +42,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * <p>This method of encoding is not designed for ease of evolution of {@code Clazz};
  * it should only be used in cases where the class is stable or the encoding is not
- * important. If evolution of the class is important, see {@link ProtoCoder} or {@link AvroCoder}.
+ * important. If evolution of the class is important, see {@link AvroCoder} or any other
+ * evolution safe encoding.
  *
  * @param <T> The type of objects coded.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
deleted file mode 100644
index a5f53ff..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both
- * Protocol Buffers syntax versions 2 and 3.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
- *
- * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default
- * {@link Coder} for any {@link Message} object. Custom message extensions are also supported, but
- * these extensions must be registered for a particular {@link ProtoCoder} instance and that
- * instance must be registered on the {@link PCollection} that needs the extensions:
- *
- * <pre>{@code
- * import MyProtoFile;
- * import MyProtoFile.MyMessage;
- *
- * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
- * PCollection<MyMessage> records =  input.apply(...).setCoder(coder);
- * }</pre>
- *
- * <h3>Versioning</h3>
- *
- * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However,
- * the Java runtime version of the <code>com.google.protobuf</code> library must match exactly the
- * version of <code>protoc</code> that was used to produce the JAR files containing the compiled
- * <code>.proto</code> messages.
- *
- * <p>For more information, see the
- * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>.
- *
- * <h3>{@link ProtoCoder} and Determinism</h3>
- *
- * <p>In general, Protocol Buffers messages can be encoded deterministically within a single
- * pipeline as long as:
- *
- * <ul>
- * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code>
- *     fields.</li>
- * <li>Every Java VM that encodes or decodes the messages uses the same runtime version of the
- *     Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li>
- * </ul>
- *
- * <h3>{@link ProtoCoder} and Encoding Stability</h3>
- *
- * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language
- * guides for
- * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a>
- * and
- * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a>
- * syntaxes, depending on your message type. Following these guidelines will ensure that the
- * old encoded data can be read by new versions of the code.
- *
- * <p>Generally, any change to the message type, registered extensions, runtime library, or
- * compiled proto JARs may change the encoding. Thus even if both the original and updated messages
- * can be encoded deterministically within a single job, these deterministic encodings may not be
- * the same across jobs.
- *
- * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
- */
-public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
-
-  /**
-   * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
-   * {@link ExtensionRegistry}.
-   */
-  public static CoderProvider coderProvider() {
-    return PROVIDER;
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
-   */
-  public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
-    return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given
-   * {@link TypeDescriptor}.
-   */
-  public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) {
-    @SuppressWarnings("unchecked")
-    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
-    return of(protoMessageClass);
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes
-   * registered.
-   *
-   * <p>Each of the extension host classes must be a class automatically generated by the
-   * Protocol Buffers compiler, {@code protoc}, that contains messages.
-   *
-   * <p>Does not modify this object.
-   */
-  public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
-    for (Class<?> extensionHost : moreExtensionHosts) {
-      // Attempt to access the required method, to make sure it's present.
-      try {
-        Method registerAllExtensions =
-            extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
-        checkArgument(
-            Modifier.isStatic(registerAllExtensions.getModifiers()),
-            "Method registerAllExtensions() must be static");
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new IllegalArgumentException(
-            String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()),
-            e);
-      }
-    }
-
-    return new ProtoCoder<>(
-        protoMessageClass,
-        new ImmutableSet.Builder<Class<?>>()
-            .addAll(extensionHostClasses)
-            .addAll(moreExtensionHosts)
-            .build());
-  }
-
-  /**
-   * See {@link #withExtensionsFrom(Iterable)}.
-   *
-   * <p>Does not modify this object.
-   */
-  public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
-    return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
-    }
-    if (context.isWholeStream) {
-      value.writeTo(outStream);
-    } else {
-      value.writeDelimitedTo(outStream);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    if (context.isWholeStream) {
-      return getParser().parseFrom(inStream, getExtensionRegistry());
-    } else {
-      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
-    }
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (!(other instanceof ProtoCoder)) {
-      return false;
-    }
-    ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
-    return protoMessageClass.equals(otherCoder.protoMessageClass)
-        && Sets.newHashSet(extensionHostClasses)
-            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(protoMessageClass, extensionHostClasses);
-  }
-
-  /**
-   * The encoding identifier is designed to support evolution as per the design of Protocol
-   * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
-   * Buffers documentation at
-   * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
-   * A Message Type</a>.
-   *
-   * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder}
-   * instances of the same principal message class, with the same registered extension host classes,
-   * and otherwise distinct. Note that the encoding ID does not encode any version of the message
-   * or extensions, nor does it include the message schema.
-   *
-   * <p>When modifying a message class, here are the broadest guidelines; see the above link
-   * for greater detail.
-   *
-   * <ul>
-   * <li>Do not change the numeric tags for any fields.
-   * <li>Never remove a <code>required</code> field.
-   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
-   * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
-   * the new and old types are interchangeable.
-   * </ul>
-   *
-   * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
-   * the class until it is certain that no remaining serialized instances exist.
-   *
-   * <p>If backwards incompatible changes must be made, the best recourse is to change the name
-   * of your Protocol Buffers message class.
-   */
-  @Override
-  public String getEncodingId() {
-    return protoMessageClass.getName() + getSortedExtensionClasses().toString();
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    ProtobufUtil.verifyDeterministic(this);
-  }
-
-  /**
-   * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports.
-   */
-  public Class<T> getMessageType() {
-    return protoMessageClass;
-  }
-
-  /**
-   * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
-   * to {@code T} registered with this {@link ProtoCoder}.
-   */
-  public ExtensionRegistry getExtensionRegistry() {
-    if (memoizedExtensionRegistry == null) {
-      ExtensionRegistry registry = ExtensionRegistry.newInstance();
-      for (Class<?> extensionHost : extensionHostClasses) {
-        try {
-          extensionHost
-              .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
-              .invoke(null, registry);
-        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-          throw new IllegalStateException(e);
-        }
-      }
-      memoizedExtensionRegistry = registry.getUnmodifiable();
-    }
-    return memoizedExtensionRegistry;
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // Private implementation details below.
-
-  /** The {@link Message} type to be coded. */
-  private final Class<T> protoMessageClass;
-
-  /**
-   * All extension host classes included in this {@link ProtoCoder}. The extensions from these
-   * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding.
-   */
-  private final Set<Class<?>> extensionHostClasses;
-
-  // Constants used to serialize and deserialize
-  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
-  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
-  // Transient fields that are lazy initialized and then memoized.
-  private transient ExtensionRegistry memoizedExtensionRegistry;
-  private transient Parser<T> memoizedParser;
-
-  /** Private constructor. */
-  private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) {
-    this.protoMessageClass = protoMessageClass;
-    this.extensionHostClasses = extensionHostClasses;
-  }
-
-  /**
-   * @deprecated For JSON deserialization only.
-   */
-  @JsonCreator
-  @Deprecated
-  public static <T extends Message> ProtoCoder<T> of(
-      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
-      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
-
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
-      List<Class<?>> extensionHostClasses = Lists.newArrayList();
-      if (extensionHostClassNames != null) {
-        for (String extensionHostClassName : extensionHostClassNames) {
-          extensionHostClasses.add(Class.forName(extensionHostClassName));
-        }
-      }
-      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  @Override
-  public CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
-    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
-    for (String className : getSortedExtensionClasses()) {
-      extensionHostClassNames.add(CloudObject.forString(className));
-    }
-    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
-    return result;
-  }
-
-  /** Get the memoized {@link Parser}, possibly initializing it lazily. */
-  private Parser<T> getParser() {
-    if (memoizedParser == null) {
-      try {
-        @SuppressWarnings("unchecked")
-        T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
-        @SuppressWarnings("unchecked")
-        Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
-        memoizedParser = tParser;
-      } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-        throw new IllegalArgumentException(e);
-      }
-    }
-    return memoizedParser;
-  }
-
-  static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
-
-  /**
-   * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
-   * {@link #coderProvider()}.
-   */
-  private static final CoderProvider PROVIDER =
-      new CoderProvider() {
-        @Override
-        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-          if (!type.isSubtypeOf(CHECK)) {
-            throw new CannotProvideCoderException(
-                String.format(
-                    "Cannot provide %s because %s is not a subclass of %s",
-                    ProtoCoder.class.getSimpleName(),
-                    type,
-                    Message.class.getName()));
-          }
-
-          @SuppressWarnings("unchecked")
-          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
-          try {
-            @SuppressWarnings("unchecked")
-            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
-            return coder;
-          } catch (IllegalArgumentException e) {
-            throw new CannotProvideCoderException(e);
-          }
-        }
-      };
-
-  private SortedSet<String> getSortedExtensionClasses() {
-    SortedSet<String> ret = new TreeSet<>();
-    for (Class<?> clazz : extensionHostClasses) {
-      ret.add(clazz.getName());
-    }
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
deleted file mode 100644
index 77afb47..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
-import com.google.protobuf.Descriptors.GenericDescriptor;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
-import com.google.protobuf.Message;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-
-/**
- * Utility functions for reflecting and analyzing Protocol Buffers classes.
- *
- * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation.
- */
-class ProtobufUtil {
-  /**
-   * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}.
-   *
-   * @throws IllegalArgumentException if there is an error in Java reflection.
-   */
-  static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
-    try {
-      return (Descriptor) clazz.getMethod("getDescriptor").invoke(null);
-    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  /**
-   * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as
-   * every class it can include transitively.
-   *
-   * @throws IllegalArgumentException if there is an error in Java reflection.
-   */
-  static Set<Descriptor> getRecursiveDescriptorsForClass(
-      Class<? extends Message> clazz, ExtensionRegistry registry) {
-    Descriptor root = getDescriptorForClass(clazz);
-    Set<Descriptor> descriptors = new HashSet<>();
-    recursivelyAddDescriptors(root, descriptors, registry);
-    return descriptors;
-  }
-
-  /**
-   * Recursively walks the given {@link Message} class and verifies that every field or message
-   * linked in uses the Protocol Buffers proto2 syntax.
-   */
-  static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) {
-    for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
-      Syntax s = d.getFile().getSyntax();
-      checkArgument(
-          s == Syntax.PROTO2,
-          "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s",
-          clazz.getName(),
-          d.getFullName(),
-          d.getFile().getName());
-    }
-  }
-
-  /**
-   * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot
-   * be deterministically encoded.
-   *
-   * @throws NonDeterministicException if the object cannot be encoded deterministically.
-   */
-  static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException {
-    Class<? extends Message> message = coder.getMessageType();
-    ExtensionRegistry registry = coder.getExtensionRegistry();
-    Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry);
-    for (Descriptor d : descriptors) {
-      for (FieldDescriptor fd : d.getFields()) {
-        // If there is a transitively reachable Protocol Buffers map field, then this object cannot
-        // be encoded deterministically.
-        if (fd.isMapField()) {
-          String reason =
-              String.format(
-                  "Protocol Buffers message %s transitively includes Map field %s (from file %s)."
-                      + " Maps cannot be deterministically encoded.",
-                  message.getName(),
-                  fd.getFullName(),
-                  fd.getFile().getFullName());
-          throw new NonDeterministicException(coder, reason);
-        }
-      }
-    }
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-  // Disable construction of utility class
-  private ProtobufUtil() {}
-
-  private static void recursivelyAddDescriptors(
-      Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) {
-    if (descriptors.contains(message)) {
-      return;
-    }
-    descriptors.add(message);
-
-    for (FieldDescriptor f : message.getFields()) {
-      recursivelyAddDescriptors(f, descriptors, registry);
-    }
-    for (FieldDescriptor f : message.getExtensions()) {
-      recursivelyAddDescriptors(f, descriptors, registry);
-    }
-    for (ExtensionInfo info :
-        registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) {
-      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
-    }
-    for (ExtensionInfo info :
-        registry.getAllMutableExtensionsByExtendedType(message.getFullName())) {
-      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
-    }
-  }
-
-  private static void recursivelyAddDescriptors(
-      FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) {
-    switch (field.getType()) {
-      case BOOL:
-      case BYTES:
-      case DOUBLE:
-      case ENUM:
-      case FIXED32:
-      case FIXED64:
-      case FLOAT:
-      case INT32:
-      case INT64:
-      case SFIXED32:
-      case SFIXED64:
-      case SINT32:
-      case SINT64:
-      case STRING:
-      case UINT32:
-      case UINT64:
-        // Primitive types do not transitively access anything else.
-        break;
-
-      case GROUP:
-      case MESSAGE:
-        // Recursively adds all the fields from this nested Message.
-        recursivelyAddDescriptors(field.getMessageType(), descriptors, registry);
-        break;
-
-      default:
-        throw new UnsupportedOperationException(
-            "Unexpected Protocol Buffers field type: " + field.getType());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
deleted file mode 100644
index bd16484..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines a {@link org.apache.beam.sdk.coders.Coder}
- * for Protocol Buffers messages, {@code ProtoCoder}.
- *
- * @see org.apache.beam.sdk.coders.protobuf.ProtoCoder
- */
-package org.apache.beam.sdk.coders.protobuf;

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/proto/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/proto/README.md b/sdks/java/core/src/main/proto/README.md
deleted file mode 100644
index b6d91df..0000000
--- a/sdks/java/core/src/main/proto/README.md
+++ /dev/null
@@ -1,45 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-## Protocol Buffers in Apache Beam
-
-This directory contains the Protocol Buffer messages used in Apache Beam.
-
-They aren't, however, used during the Maven build process, and are included here
-for completeness only. Instead, the following artifact on Maven Central contains
-the binary version of the generated code from these Protocol Buffers:
-
-    <dependency>
-      <groupId>com.google.cloud.dataflow</groupId>
-      <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
-      <version>LATEST</version>
-    </dependency>
-
-Please follow this process for testing changes:
-
-* Make changes to the Protocol Buffer messages in this directory.
-* Use `protoc` to generate the new code, and compile it into a new Java library.
-* Install that Java library into your local Maven repository.
-* Update the SDK's `pom.xml` to pick up the newly installed library, instead of
-downloading it from Maven Central.
-
-Once the changes are ready for submission, please separate them into two
-commits. The first commit should update the Protocol Buffer messages only. After
-that, we need to update the generated artifact on Maven Central. Finally,
-changes that make use of the Protocol Buffer changes may be committed.

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto b/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
deleted file mode 100644
index b1abe46..0000000
--- a/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffer messages used for testing Proto2Coder implementation.
- */
-
-syntax = "proto2";
-
-package proto2_coder_test_messages;
-
-option java_package = "org.apache.beam.sdk.coders";
-
-message MessageA {
-  optional string field1 = 1;
-  repeated MessageB field2 = 2;
-}
-
-message MessageB {
-  optional bool field1 = 1;
-}
-
-message MessageC {
-  extensions 100 to 105;
-}
-
-extend MessageC {
-  optional MessageA field1 = 101;
-  optional MessageB field2 = 102;
-}
-
-message MessageWithMap {
-  map<string, MessageA> field1 = 1;
-}
-
-message ReferencesMessageWithMap {
-  repeated MessageWithMap field1 = 1;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 10e011f..616e88e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -22,10 +22,8 @@ import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 
 import com.google.auto.service.AutoService;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Duration;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -38,7 +36,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -87,17 +84,6 @@ public class CoderRegistryTest {
   }
 
   @Test
-  public void testProtoCoderFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-
-    // MessageA is a Protocol Buffers test message with syntax 2
-    assertEquals(registry.getDefaultCoder(MessageA.class), ProtoCoder.of(MessageA.class));
-
-    // Duration is a Protocol Buffers default type with syntax 3
-    assertEquals(registry.getDefaultCoder(Duration.class), ProtoCoder.of(Duration.class));
-  }
-
-  @Test
   public void testAvroFallbackCoderProvider() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     registry.setFallbackCoderProvider(AvroCoder.PROVIDER);

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
deleted file mode 100644
index 8b889da..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ProtoCoder}.
- */
-@RunWith(JUnit4.class)
-public class ProtoCoderTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testFactoryMethodAgreement() throws Exception {
-    assertEquals(ProtoCoder.of(new TypeDescriptor<MessageA>() {}), ProtoCoder.of(MessageA.class));
-
-    assertEquals(
-        ProtoCoder.of(new TypeDescriptor<MessageA>() {}),
-        ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {}));
-  }
-
-  @Test
-  public void testProviderCannotProvideCoder() throws Exception {
-    thrown.expect(CannotProvideCoderException.class);
-    thrown.expectMessage("java.lang.Integer is not a subclass of com.google.protobuf.Message");
-
-    ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {});
-  }
-
-  @Test
-  public void testCoderEncodeDecodeEqual() throws Exception {
-    MessageA value =
-        MessageA.newBuilder()
-            .setField1("hello")
-            .addField2(MessageB.newBuilder().setField1(true).build())
-            .addField2(MessageB.newBuilder().setField1(false).build())
-            .build();
-    CoderProperties.coderDecodeEncodeEqual(ProtoCoder.of(MessageA.class), value);
-  }
-
-  @Test
-  public void testCoderEncodeDecodeEqualNestedContext() throws Exception {
-    MessageA value1 =
-        MessageA.newBuilder()
-            .setField1("hello")
-            .addField2(MessageB.newBuilder().setField1(true).build())
-            .addField2(MessageB.newBuilder().setField1(false).build())
-            .build();
-    MessageA value2 =
-        MessageA.newBuilder()
-            .setField1("world")
-            .addField2(MessageB.newBuilder().setField1(false).build())
-            .addField2(MessageB.newBuilder().setField1(true).build())
-            .build();
-    CoderProperties.coderDecodeEncodeEqual(
-        ListCoder.of(ProtoCoder.of(MessageA.class)), ImmutableList.of(value1, value2));
-  }
-
-  @Test
-  public void testCoderEncodeDecodeExtensionsEqual() throws Exception {
-    MessageC value =
-        MessageC.newBuilder()
-            .setExtension(
-                Proto2CoderTestMessages.field1,
-                MessageA.newBuilder()
-                    .setField1("hello")
-                    .addField2(MessageB.newBuilder().setField1(true).build())
-                    .build())
-            .setExtension(
-                Proto2CoderTestMessages.field2, MessageB.newBuilder().setField1(false).build())
-            .build();
-    CoderProperties.coderDecodeEncodeEqual(
-        ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class), value);
-  }
-
-  @Test
-  public void testCoderSerialization() throws Exception {
-    ProtoCoder<MessageA> coder = ProtoCoder.of(MessageA.class);
-    CoderProperties.coderSerializable(coder);
-  }
-
-  @Test
-  public void testCoderExtensionsSerialization() throws Exception {
-    ProtoCoder<MessageC> coder =
-        ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class);
-    CoderProperties.coderSerializable(coder);
-  }
-
-  @Test
-  public void testEncodingId() throws Exception {
-    Coder<MessageA> coderA = ProtoCoder.of(MessageA.class);
-    CoderProperties.coderHasEncodingId(coderA, MessageA.class.getName() + "[]");
-
-    ProtoCoder<MessageC> coder =
-        ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class);
-    CoderProperties.coderHasEncodingId(
-        coder,
-        String.format("%s[%s]", MessageC.class.getName(), Proto2CoderTestMessages.class.getName()));
-  }
-
-  @Test
-  public void encodeNullThrowsCoderException() throws Exception {
-    thrown.expect(CoderException.class);
-    thrown.expectMessage("cannot encode a null MessageA");
-
-    CoderUtils.encodeToBase64(ProtoCoder.of(MessageA.class), null);
-  }
-
-  @Test
-  public void testDeterministicCoder() throws NonDeterministicException {
-    Coder<MessageA> coder = ProtoCoder.of(MessageA.class);
-    coder.verifyDeterministic();
-  }
-
-  @Test
-  public void testNonDeterministicCoder() throws NonDeterministicException {
-    thrown.expect(NonDeterministicException.class);
-    thrown.expectMessage(MessageWithMap.class.getName() + " transitively includes Map field");
-
-    Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class);
-    coder.verifyDeterministic();
-  }
-
-  @Test
-  public void testNonDeterministicProperty() throws CoderException {
-    MessageWithMap.Builder msg1B = MessageWithMap.newBuilder();
-    MessageWithMap.Builder msg2B = MessageWithMap.newBuilder();
-
-    // Built in reverse order but with equal contents.
-    for (int i = 0; i < 10; ++i) {
-      msg1B.getMutableField1().put("key" + i, MessageA.getDefaultInstance());
-      msg2B.getMutableField1().put("key" + (9 - i), MessageA.getDefaultInstance());
-    }
-
-    // Assert the messages are equal.
-    MessageWithMap msg1 = msg1B.build();
-    MessageWithMap msg2 = msg2B.build();
-    assertEquals(msg2, msg1);
-
-    // Assert the encoded messages are not equal.
-    Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class);
-    assertNotEquals(CoderUtils.encodeToBase64(coder, msg2), CoderUtils.encodeToBase64(coder, msg1));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
deleted file mode 100644
index 1408048..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.checkProto2Syntax;
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.getRecursiveDescriptorsForClass;
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.verifyDeterministic;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.ReferencesMessageWithMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.protobuf.Any;
-import com.google.protobuf.Descriptors.GenericDescriptor;
-import com.google.protobuf.Duration;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ProtobufUtil}.
- */
-@RunWith(JUnit4.class)
-public class ProtobufUtilTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private static final Set<String> MESSAGE_A_ONLY =
-      ImmutableSet.of("proto2_coder_test_messages.MessageA");
-
-  private static final Set<String> MESSAGE_B_ONLY =
-      ImmutableSet.of("proto2_coder_test_messages.MessageB");
-
-  private static final Set<String> MESSAGE_C_ONLY =
-      ImmutableSet.of("proto2_coder_test_messages.MessageC");
-
-  // map fields are actually represented as a nested Message in generated Java code.
-  private static final Set<String> WITH_MAP_ONLY =
-      ImmutableSet.of(
-          "proto2_coder_test_messages.MessageWithMap",
-          "proto2_coder_test_messages.MessageWithMap.Field1Entry");
-
-  private static final Set<String> REFERS_MAP_ONLY =
-      ImmutableSet.of("proto2_coder_test_messages.ReferencesMessageWithMap");
-
-  // A references A and B.
-  private static final Set<String> MESSAGE_A_ALL = Sets.union(MESSAGE_A_ONLY, MESSAGE_B_ONLY);
-
-  // C, only with registered extensions, references A and B.
-  private static final Set<String> MESSAGE_C_EXT = Sets.union(MESSAGE_C_ONLY, MESSAGE_A_ALL);
-
-  // MessageWithMap references A.
-  private static final Set<String> WITH_MAP_ALL = Sets.union(WITH_MAP_ONLY, MESSAGE_A_ALL);
-
-  // ReferencesMessageWithMap references MessageWithMap.
-  private static final Set<String> REFERS_MAP_ALL = Sets.union(REFERS_MAP_ONLY, WITH_MAP_ALL);
-
-  @Test
-  public void testRecursiveDescriptorsMessageA() {
-    assertThat(getRecursiveDescriptorFullNames(MessageA.class), equalTo(MESSAGE_A_ALL));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsMessageB() {
-    assertThat(getRecursiveDescriptorFullNames(MessageB.class), equalTo(MESSAGE_B_ONLY));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsMessageC() {
-    assertThat(getRecursiveDescriptorFullNames(MessageC.class), equalTo(MESSAGE_C_ONLY));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsMessageCWithExtensions() {
-    // With extensions, Message C has a reference to Message A and Message B.
-    ExtensionRegistry registry = ExtensionRegistry.newInstance();
-    Proto2CoderTestMessages.registerAllExtensions(registry);
-    assertThat(getRecursiveDescriptorFullNames(MessageC.class, registry), equalTo(MESSAGE_C_EXT));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsMessageWithMap() {
-    assertThat(getRecursiveDescriptorFullNames(MessageWithMap.class), equalTo(WITH_MAP_ALL));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsReferencesMessageWithMap() {
-    assertThat(
-        getRecursiveDescriptorFullNames(ReferencesMessageWithMap.class), equalTo(REFERS_MAP_ALL));
-  }
-
-  @Test
-  public void testVerifyProto2() {
-    checkProto2Syntax(MessageA.class, ExtensionRegistry.getEmptyRegistry());
-    checkProto2Syntax(MessageB.class, ExtensionRegistry.getEmptyRegistry());
-    checkProto2Syntax(MessageC.class, ExtensionRegistry.getEmptyRegistry());
-    checkProto2Syntax(MessageWithMap.class, ExtensionRegistry.getEmptyRegistry());
-    checkProto2Syntax(ReferencesMessageWithMap.class, ExtensionRegistry.getEmptyRegistry());
-  }
-
-  @Test
-  public void testAnyIsNotProto2() {
-    // Any is a core Protocol Buffers type that uses proto3 syntax.
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(Any.class.getCanonicalName());
-    thrown.expectMessage("in file " + Any.getDescriptor().getFile().getName());
-
-    checkProto2Syntax(Any.class, ExtensionRegistry.getEmptyRegistry());
-  }
-
-  @Test
-  public void testDurationIsNotProto2() {
-    // Duration is a core Protocol Buffers type that uses proto3 syntax.
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(Duration.class.getCanonicalName());
-    thrown.expectMessage("in file " + Duration.getDescriptor().getFile().getName());
-
-    checkProto2Syntax(Duration.class, ExtensionRegistry.getEmptyRegistry());
-  }
-
-  @Test
-  public void testDurationIsDeterministic() throws NonDeterministicException {
-    // Duration can be encoded deterministically.
-    verifyDeterministic(ProtoCoder.of(Duration.class));
-  }
-
-  @Test
-  public void testMessageWithMapIsNotDeterministic() throws NonDeterministicException {
-    String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName();
-    thrown.expect(NonDeterministicException.class);
-    thrown.expectMessage(MessageWithMap.class.getName());
-    thrown.expectMessage("transitively includes Map field " + mapFieldName);
-    thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName());
-
-    verifyDeterministic(ProtoCoder.of(MessageWithMap.class));
-  }
-
-  @Test
-  public void testMessageWithTransitiveMapIsNotDeterministic() throws NonDeterministicException {
-    String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName();
-    thrown.expect(NonDeterministicException.class);
-    thrown.expectMessage(ReferencesMessageWithMap.class.getName());
-    thrown.expectMessage("transitively includes Map field " + mapFieldName);
-    thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName());
-
-    verifyDeterministic(ProtoCoder.of(ReferencesMessageWithMap.class));
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////
-
-  /** Helper used to test the recursive class traversal and print good error messages. */
-  private static Set<String> getRecursiveDescriptorFullNames(Class<? extends Message> clazz) {
-    return getRecursiveDescriptorFullNames(clazz, ExtensionRegistry.getEmptyRegistry());
-  }
-
-  /** Helper used to test the recursive class traversal and print good error messages. */
-  private static Set<String> getRecursiveDescriptorFullNames(
-      Class<? extends Message> clazz, ExtensionRegistry registry) {
-    Set<String> result = new HashSet<>();
-    for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
-      result.add(d.getFullName());
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index dde8be5..8a48eca 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -35,6 +35,7 @@
     <module>gcp-core</module>
     <module>jackson</module>
     <module>join-library</module>
+    <module>protobuf</module>
     <module>sorter</module>
   </modules>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml
new file mode 100644
index 0000000..9a54254
--- /dev/null
+++ b/sdks/java/extensions/protobuf/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-extensions-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-extensions-protobuf</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Extensions :: Protobuf</name>
+  <description>Add support to Apache Beam for Google Protobuf.</description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
new file mode 100644
index 0000000..99a0838
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Structs;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both
+ * Protocol Buffers syntax versions 2 and 3.
+ *
+ * <p>To learn more about Protocol Buffers, visit:
+ * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
+ *
+ * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default
+ * {@link Coder} for any {@link Message} object. Custom message extensions are also supported, but
+ * these extensions must be registered for a particular {@link ProtoCoder} instance and that
+ * instance must be registered on the {@link PCollection} that needs the extensions:
+ *
+ * <pre>{@code
+ * import MyProtoFile;
+ * import MyProtoFile.MyMessage;
+ *
+ * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
+ * PCollection<MyMessage> records =  input.apply(...).setCoder(coder);
+ * }</pre>
+ *
+ * <h3>Versioning</h3>
+ *
+ * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However,
+ * the Java runtime version of the <code>com.google.protobuf</code> library must match exactly the
+ * version of <code>protoc</code> that was used to produce the JAR files containing the compiled
+ * <code>.proto</code> messages.
+ *
+ * <p>For more information, see the
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>.
+ *
+ * <h3>{@link ProtoCoder} and Determinism</h3>
+ *
+ * <p>In general, Protocol Buffers messages can be encoded deterministically within a single
+ * pipeline as long as:
+ *
+ * <ul>
+ * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code>
+ *     fields.</li>
+ * <li>Every Java VM that encodes or decodes the messages uses the same runtime version of the
+ *     Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li>
+ * </ul>
+ *
+ * <h3>{@link ProtoCoder} and Encoding Stability</h3>
+ *
+ * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language
+ * guides for
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a>
+ * and
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a>
+ * syntaxes, depending on your message type. Following these guidelines will ensure that the
+ * old encoded data can be read by new versions of the code.
+ *
+ * <p>Generally, any change to the message type, registered extensions, runtime library, or
+ * compiled proto JARs may change the encoding. Thus even if both the original and updated messages
+ * can be encoded deterministically within a single job, these deterministic encodings may not be
+ * the same across jobs.
+ *
+ * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
+ */
+public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
+
+  /**
+   * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
+   * {@link ExtensionRegistry}.
+   */
+  public static CoderProvider coderProvider() {
+    return PROVIDER;
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
+   */
+  public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
+    return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given
+   * {@link TypeDescriptor}.
+   */
+  public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) {
+    @SuppressWarnings("unchecked")
+    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
+    return of(protoMessageClass);
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes
+   * registered.
+   *
+   * <p>Each of the extension host classes must be a class automatically generated by the
+   * Protocol Buffers compiler, {@code protoc}, that contains messages.
+   *
+   * <p>Does not modify this object.
+   */
+  public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
+    for (Class<?> extensionHost : moreExtensionHosts) {
+      // Attempt to access the required method, to make sure it's present.
+      try {
+        Method registerAllExtensions =
+            extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
+        checkArgument(
+            Modifier.isStatic(registerAllExtensions.getModifiers()),
+            "Method registerAllExtensions() must be static");
+      } catch (NoSuchMethodException | SecurityException e) {
+        throw new IllegalArgumentException(
+            String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()),
+            e);
+      }
+    }
+
+    return new ProtoCoder<>(
+        protoMessageClass,
+        new ImmutableSet.Builder<Class<?>>()
+            .addAll(extensionHostClasses)
+            .addAll(moreExtensionHosts)
+            .build());
+  }
+
+  /**
+   * See {@link #withExtensionsFrom(Iterable)}.
+   *
+   * <p>Does not modify this object.
+   */
+  public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
+    return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
+    }
+    if (context.isWholeStream) {
+      value.writeTo(outStream);
+    } else {
+      value.writeDelimitedTo(outStream);
+    }
+  }
+
+  @Override
+  public T decode(InputStream inStream, Context context) throws IOException {
+    if (context.isWholeStream) {
+      return getParser().parseFrom(inStream, getExtensionRegistry());
+    } else {
+      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
+    }
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof ProtoCoder)) {
+      return false;
+    }
+    ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
+    return protoMessageClass.equals(otherCoder.protoMessageClass)
+        && Sets.newHashSet(extensionHostClasses)
+            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(protoMessageClass, extensionHostClasses);
+  }
+
+  /**
+   * The encoding identifier is designed to support evolution as per the design of Protocol
+   * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
+   * Buffers documentation at
+   * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
+   * A Message Type</a>.
+   *
+   * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder}
+   * instances of the same principal message class, with the same registered extension host classes,
+   * and otherwise distinct. Note that the encoding ID does not encode any version of the message
+   * or extensions, nor does it include the message schema.
+   *
+   * <p>When modifying a message class, here are the broadest guidelines; see the above link
+   * for greater detail.
+   *
+   * <ul>
+   * <li>Do not change the numeric tags for any fields.
+   * <li>Never remove a <code>required</code> field.
+   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
+   * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
+   * the new and old types are interchangeable.
+   * </ul>
+   *
+   * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
+   * the class until it is certain that no remaining serialized instances exist.
+   *
+   * <p>If backwards incompatible changes must be made, the best recourse is to change the name
+   * of your Protocol Buffers message class.
+   */
+  @Override
+  public String getEncodingId() {
+    return protoMessageClass.getName() + getSortedExtensionClasses().toString();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    ProtobufUtil.verifyDeterministic(this);
+  }
+
+  /**
+   * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports.
+   */
+  public Class<T> getMessageType() {
+    return protoMessageClass;
+  }
+
+  /**
+   * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
+   * to {@code T} registered with this {@link ProtoCoder}.
+   */
+  public ExtensionRegistry getExtensionRegistry() {
+    if (memoizedExtensionRegistry == null) {
+      ExtensionRegistry registry = ExtensionRegistry.newInstance();
+      for (Class<?> extensionHost : extensionHostClasses) {
+        try {
+          extensionHost
+              .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
+              .invoke(null, registry);
+        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+      memoizedExtensionRegistry = registry.getUnmodifiable();
+    }
+    return memoizedExtensionRegistry;
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////
+  // Private implementation details below.
+
+  /** The {@link Message} type to be coded. */
+  private final Class<T> protoMessageClass;
+
+  /**
+   * All extension host classes included in this {@link ProtoCoder}. The extensions from these
+   * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding.
+   */
+  private final Set<Class<?>> extensionHostClasses;
+
+  // Constants used to serialize and deserialize
+  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
+  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
+
+  // Transient fields that are lazy initialized and then memoized.
+  private transient ExtensionRegistry memoizedExtensionRegistry;
+  private transient Parser<T> memoizedParser;
+
+  /** Private constructor. */
+  private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) {
+    this.protoMessageClass = protoMessageClass;
+    this.extensionHostClasses = extensionHostClasses;
+  }
+
+  /**
+   * @deprecated For JSON deserialization only.
+   */
+  @JsonCreator
+  @Deprecated
+  public static <T extends Message> ProtoCoder<T> of(
+      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
+      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
+
+    try {
+      @SuppressWarnings("unchecked")
+      Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
+      List<Class<?>> extensionHostClasses = Lists.newArrayList();
+      if (extensionHostClassNames != null) {
+        for (String extensionHostClassName : extensionHostClassNames) {
+          extensionHostClasses.add(Class.forName(extensionHostClassName));
+        }
+      }
+      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  @Override
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
+    Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
+    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
+    for (String className : getSortedExtensionClasses()) {
+      extensionHostClassNames.add(CloudObject.forString(className));
+    }
+    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
+    return result;
+  }
+
+  /** Get the memoized {@link Parser}, possibly initializing it lazily. */
+  private Parser<T> getParser() {
+    if (memoizedParser == null) {
+      try {
+        @SuppressWarnings("unchecked")
+        T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
+        @SuppressWarnings("unchecked")
+        Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
+        memoizedParser = tParser;
+      } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
+    return memoizedParser;
+  }
+
+  static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
+
+  /**
+   * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
+   * {@link #coderProvider()}.
+   */
+  private static final CoderProvider PROVIDER =
+      new CoderProvider() {
+        @Override
+        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+          if (!type.isSubtypeOf(CHECK)) {
+            throw new CannotProvideCoderException(
+                String.format(
+                    "Cannot provide %s because %s is not a subclass of %s",
+                    ProtoCoder.class.getSimpleName(),
+                    type,
+                    Message.class.getName()));
+          }
+
+          @SuppressWarnings("unchecked")
+          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
+          try {
+            @SuppressWarnings("unchecked")
+            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
+            return coder;
+          } catch (IllegalArgumentException e) {
+            throw new CannotProvideCoderException(e);
+          }
+        }
+      };
+
+  private SortedSet<String> getSortedExtensionClasses() {
+    SortedSet<String> ret = new TreeSet<>();
+    for (Class<?> clazz : extensionHostClasses) {
+      ret.add(clazz.getName());
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
new file mode 100644
index 0000000..68a775a
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
+import com.google.protobuf.Descriptors.GenericDescriptor;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
+import com.google.protobuf.Message;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+
+/**
+ * Utility functions for reflecting and analyzing Protocol Buffers classes.
+ *
+ * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation.
+ */
+class ProtobufUtil {
+  /**
+   * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}.
+   *
+   * @throws IllegalArgumentException if there is an error in Java reflection.
+   */
+  static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
+    try {
+      return (Descriptor) clazz.getMethod("getDescriptor").invoke(null);
+    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as
+   * every class it can include transitively.
+   *
+   * @throws IllegalArgumentException if there is an error in Java reflection.
+   */
+  static Set<Descriptor> getRecursiveDescriptorsForClass(
+      Class<? extends Message> clazz, ExtensionRegistry registry) {
+    Descriptor root = getDescriptorForClass(clazz);
+    Set<Descriptor> descriptors = new HashSet<>();
+    recursivelyAddDescriptors(root, descriptors, registry);
+    return descriptors;
+  }
+
+  /**
+   * Recursively walks the given {@link Message} class and verifies that every field or message
+   * linked in uses the Protocol Buffers proto2 syntax.
+   */
+  static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) {
+    for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
+      Syntax s = d.getFile().getSyntax();
+      checkArgument(
+          s == Syntax.PROTO2,
+          "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s",
+          clazz.getName(),
+          d.getFullName(),
+          d.getFile().getName());
+    }
+  }
+
+  /**
+   * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot
+   * be deterministically encoded.
+   *
+   * @throws NonDeterministicException if the object cannot be encoded deterministically.
+   */
+  static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException {
+    Class<? extends Message> message = coder.getMessageType();
+    ExtensionRegistry registry = coder.getExtensionRegistry();
+    Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry);
+    for (Descriptor d : descriptors) {
+      for (FieldDescriptor fd : d.getFields()) {
+        // If there is a transitively reachable Protocol Buffers map field, then this object cannot
+        // be encoded deterministically.
+        if (fd.isMapField()) {
+          String reason =
+              String.format(
+                  "Protocol Buffers message %s transitively includes Map field %s (from file %s)."
+                      + " Maps cannot be deterministically encoded.",
+                  message.getName(),
+                  fd.getFullName(),
+                  fd.getFile().getFullName());
+          throw new NonDeterministicException(coder, reason);
+        }
+      }
+    }
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+  // Disable construction of utility class
+  private ProtobufUtil() {}
+
+  private static void recursivelyAddDescriptors(
+      Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) {
+    if (descriptors.contains(message)) {
+      return;
+    }
+    descriptors.add(message);
+
+    for (FieldDescriptor f : message.getFields()) {
+      recursivelyAddDescriptors(f, descriptors, registry);
+    }
+    for (FieldDescriptor f : message.getExtensions()) {
+      recursivelyAddDescriptors(f, descriptors, registry);
+    }
+    for (ExtensionInfo info :
+        registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) {
+      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
+    }
+    for (ExtensionInfo info :
+        registry.getAllMutableExtensionsByExtendedType(message.getFullName())) {
+      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
+    }
+  }
+
+  private static void recursivelyAddDescriptors(
+      FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) {
+    switch (field.getType()) {
+      case BOOL:
+      case BYTES:
+      case DOUBLE:
+      case ENUM:
+      case FIXED32:
+      case FIXED64:
+      case FLOAT:
+      case INT32:
+      case INT64:
+      case SFIXED32:
+      case SFIXED64:
+      case SINT32:
+      case SINT64:
+      case STRING:
+      case UINT32:
+      case UINT64:
+        // Primitive types do not transitively access anything else.
+        break;
+
+      case GROUP:
+      case MESSAGE:
+        // Recursively adds all the fields from this nested Message.
+        recursivelyAddDescriptors(field.getMessageType(), descriptors, registry);
+        break;
+
+      default:
+        throw new UnsupportedOperationException(
+            "Unexpected Protocol Buffers field type: " + field.getType());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
new file mode 100644
index 0000000..b69bc8b
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines a {@link org.apache.beam.sdk.coders.Coder}
+ * for Protocol Buffers messages, {@code ProtoCoder}.
+ *
+ * @see org.apache.beam.sdk.extensions.protobuf.ProtoCoder
+ */
+package org.apache.beam.sdk.extensions.protobuf;