You are viewing a plain-text version of this content. The link to the canonical version (originally a hyperlink on the word "here") is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/24 22:04:44 UTC
[2/3] beam git commit: [BEAM-1871] Move ProtoCoder to new
sdks/java/extensions/protobuf package.
[BEAM-1871] Move ProtoCoder to new sdks/java/extensions/protobuf package.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff1fe7fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff1fe7fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff1fe7fa
Branch: refs/heads/master
Commit: ff1fe7fa53816fe4327c7572c13a616fc4243dc9
Parents: d7e7af8
Author: Luke Cwik <lc...@google.com>
Authored: Mon Apr 24 14:32:21 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Apr 24 15:04:13 2017 -0700
----------------------------------------------------------------------
sdks/java/core/pom.xml | 6 -
.../apache/beam/sdk/coders/CoderRegistry.java | 10 +-
.../beam/sdk/coders/StringDelegateCoder.java | 4 +-
.../beam/sdk/coders/protobuf/ProtoCoder.java | 405 -------------------
.../beam/sdk/coders/protobuf/ProtobufUtil.java | 171 --------
.../beam/sdk/coders/protobuf/package-info.java | 24 --
sdks/java/core/src/main/proto/README.md | 45 ---
.../main/proto/proto2_coder_test_messages.proto | 53 ---
.../beam/sdk/coders/CoderRegistryTest.java | 14 -
.../sdk/coders/protobuf/ProtoCoderTest.java | 182 ---------
.../sdk/coders/protobuf/ProtobufUtilTest.java | 192 ---------
sdks/java/extensions/pom.xml | 1 +
sdks/java/extensions/protobuf/pom.xml | 142 +++++++
.../sdk/extensions/protobuf/ProtoCoder.java | 405 +++++++++++++++++++
.../sdk/extensions/protobuf/ProtobufUtil.java | 171 ++++++++
.../sdk/extensions/protobuf/package-info.java | 24 ++
.../sdk/extensions/protobuf/ProtoCoderTest.java | 181 +++++++++
.../extensions/protobuf/ProtobufUtilTest.java | 191 +++++++++
.../test/proto/proto2_coder_test_messages.proto | 53 +++
19 files changed, 1173 insertions(+), 1101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index ac7a3bb..6c46453 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -294,12 +294,6 @@
</dependency>
<dependency>
- <groupId>com.google.cloud.dataflow</groupId>
- <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.21</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 4238293..e0b2b3a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -44,7 +44,6 @@ import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.CoderUtils;
@@ -77,9 +76,7 @@ import org.slf4j.LoggerFactory;
* the default {@code Coder} type. The {@link Coder} class must satisfy the requirements
* of {@link CoderProviders#fromStaticMethods}.
* <li>Fallback: A fallback {@link CoderProvider} is used to attempt to provide a {@link Coder}
- * for any type. By default, there are two chained fallback coders:
- * {@link ProtoCoder#coderProvider}, which can provide a coder to efficiently serialize any
- * Protocol Buffers message, and then {@link SerializableCoder#PROVIDER}, which can provide a
+ * for any type. By default, there is {@link SerializableCoder#PROVIDER}, which can provide a
* {@link Coder} for any type that is serializable via Java serialization. The fallback
* {@link CoderProvider} can be get and set respectively using
* {@link #getFallbackCoderProvider()} and {@link #setFallbackCoderProvider}. Multiple
@@ -165,7 +162,7 @@ public class CoderRegistry implements CoderProvider {
private CoderRegistry() {
coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS);
setFallbackCoderProvider(
- CoderProviders.firstOf(ProtoCoder.coderProvider(), SerializableCoder.PROVIDER));
+ CoderProviders.firstOf(SerializableCoder.PROVIDER));
}
/**
@@ -423,8 +420,7 @@ public class CoderRegistry implements CoderProvider {
* providing a {@code Coder<T>} for a type {@code T}, then the registry will attempt to create
* a {@link Coder} using this {@link CoderProvider}.
*
- * <p>By default, this is set to the chain of {@link ProtoCoder#coderProvider()} and
- * {@link SerializableCoder#PROVIDER}.
+ * <p>By default, this is set to {@link SerializableCoder#PROVIDER}.
*
* <p>See {@link #getFallbackCoderProvider}.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index f86369c..51ead3c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -23,7 +23,6 @@ import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
@@ -43,7 +42,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*
* <p>This method of encoding is not designed for ease of evolution of {@code Clazz};
* it should only be used in cases where the class is stable or the encoding is not
- * important. If evolution of the class is important, see {@link ProtoCoder} or {@link AvroCoder}.
+ * important. If evolution of the class is important, see {@link AvroCoder} or any other
+ * evolution safe encoding.
*
* @param <T> The type of objects coded.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
deleted file mode 100644
index a5f53ff..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both
- * Protocol Buffers syntax versions 2 and 3.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
- *
- * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default
- * {@link Coder} for any {@link Message} object. Custom message extensions are also supported, but
- * these extensions must be registered for a particular {@link ProtoCoder} instance and that
- * instance must be registered on the {@link PCollection} that needs the extensions:
- *
- * <pre>{@code
- * import MyProtoFile;
- * import MyProtoFile.MyMessage;
- *
- * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
- * PCollection<MyMessage> records = input.apply(...).setCoder(coder);
- * }</pre>
- *
- * <h3>Versioning</h3>
- *
- * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However,
- * the Java runtime version of the <code>google.com.protobuf</code> library must match exactly the
- * version of <code>protoc</code> that was used to produce the JAR files containing the compiled
- * <code>.proto</code> messages.
- *
- * <p>For more information, see the
- * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>.
- *
- * <h3>{@link ProtoCoder} and Determinism</h3>
- *
- * <p>In general, Protocol Buffers messages can be encoded deterministically within a single
- * pipeline as long as:
- *
- * <ul>
- * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code>
- * fields.</li>
- * <li>Every Java VM that encodes or decodes the messages use the same runtime version of the
- * Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li>
- * </ul>
- *
- * <h3>{@link ProtoCoder} and Encoding Stability</h3>
- *
- * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language
- * guides for
- * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a>
- * and
- * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a>
- * syntaxes, depending on your message type. Following these guidelines will ensure that the
- * old encoded data can be read by new versions of the code.
- *
- * <p>Generally, any change to the message type, registered extensions, runtime library, or
- * compiled proto JARs may change the encoding. Thus even if both the original and updated messages
- * can be encoded deterministically within a single job, these deterministic encodings may not be
- * the same across jobs.
- *
- * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
- */
-public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
-
- /**
- * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
- * {@link ExtensionRegistry}.
- */
- public static CoderProvider coderProvider() {
- return PROVIDER;
- }
-
- /**
- * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
- */
- public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
- return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
- }
-
- /**
- * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given
- * {@link TypeDescriptor}.
- */
- public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) {
- @SuppressWarnings("unchecked")
- Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
- return of(protoMessageClass);
- }
-
- /**
- * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes
- * registered.
- *
- * <p>Each of the extension host classes must be an class automatically generated by the
- * Protocol Buffers compiler, {@code protoc}, that contains messages.
- *
- * <p>Does not modify this object.
- */
- public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
- for (Class<?> extensionHost : moreExtensionHosts) {
- // Attempt to access the required method, to make sure it's present.
- try {
- Method registerAllExtensions =
- extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
- checkArgument(
- Modifier.isStatic(registerAllExtensions.getModifiers()),
- "Method registerAllExtensions() must be static");
- } catch (NoSuchMethodException | SecurityException e) {
- throw new IllegalArgumentException(
- String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()),
- e);
- }
- }
-
- return new ProtoCoder<>(
- protoMessageClass,
- new ImmutableSet.Builder<Class<?>>()
- .addAll(extensionHostClasses)
- .addAll(moreExtensionHosts)
- .build());
- }
-
- /**
- * See {@link #withExtensionsFrom(Iterable)}.
- *
- * <p>Does not modify this object.
- */
- public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
- return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context) throws IOException {
- if (value == null) {
- throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
- }
- if (context.isWholeStream) {
- value.writeTo(outStream);
- } else {
- value.writeDelimitedTo(outStream);
- }
- }
-
- @Override
- public T decode(InputStream inStream, Context context) throws IOException {
- if (context.isWholeStream) {
- return getParser().parseFrom(inStream, getExtensionRegistry());
- } else {
- return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
- }
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (!(other instanceof ProtoCoder)) {
- return false;
- }
- ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
- return protoMessageClass.equals(otherCoder.protoMessageClass)
- && Sets.newHashSet(extensionHostClasses)
- .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(protoMessageClass, extensionHostClasses);
- }
-
- /**
- * The encoding identifier is designed to support evolution as per the design of Protocol
- * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
- * Buffers documentation at
- * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
- * A Message Type</a>.
- *
- * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder}
- * instances of the same principal message class, with the same registered extension host classes,
- * and otherwise distinct. Note that the encoding ID does not encode any version of the message
- * or extensions, nor does it include the message schema.
- *
- * <p>When modifying a message class, here are the broadest guidelines; see the above link
- * for greater detail.
- *
- * <ul>
- * <li>Do not change the numeric tags for any fields.
- * <li>Never remove a <code>required</code> field.
- * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
- * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
- * the new and old types are interchangeable.
- * </ul>
- *
- * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
- * the class until it is certain that no remaining serialized instances exist.
- *
- * <p>If backwards incompatible changes must be made, the best recourse is to change the name
- * of your Protocol Buffers message class.
- */
- @Override
- public String getEncodingId() {
- return protoMessageClass.getName() + getSortedExtensionClasses().toString();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- ProtobufUtil.verifyDeterministic(this);
- }
-
- /**
- * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports.
- */
- public Class<T> getMessageType() {
- return protoMessageClass;
- }
-
- /**
- * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
- * to {@code T} registered with this {@link ProtoCoder}.
- */
- public ExtensionRegistry getExtensionRegistry() {
- if (memoizedExtensionRegistry == null) {
- ExtensionRegistry registry = ExtensionRegistry.newInstance();
- for (Class<?> extensionHost : extensionHostClasses) {
- try {
- extensionHost
- .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
- .invoke(null, registry);
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalStateException(e);
- }
- }
- memoizedExtensionRegistry = registry.getUnmodifiable();
- }
- return memoizedExtensionRegistry;
- }
-
- ////////////////////////////////////////////////////////////////////////////////////
- // Private implementation details below.
-
- /** The {@link Message} type to be coded. */
- private final Class<T> protoMessageClass;
-
- /**
- * All extension host classes included in this {@link ProtoCoder}. The extensions from these
- * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding.
- */
- private final Set<Class<?>> extensionHostClasses;
-
- // Constants used to serialize and deserialize
- private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
- private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
- // Transient fields that are lazy initialized and then memoized.
- private transient ExtensionRegistry memoizedExtensionRegistry;
- private transient Parser<T> memoizedParser;
-
- /** Private constructor. */
- private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) {
- this.protoMessageClass = protoMessageClass;
- this.extensionHostClasses = extensionHostClasses;
- }
-
- /**
- * @deprecated For JSON deserialization only.
- */
- @JsonCreator
- @Deprecated
- public static <T extends Message> ProtoCoder<T> of(
- @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
- @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
-
- try {
- @SuppressWarnings("unchecked")
- Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
- List<Class<?>> extensionHostClasses = Lists.newArrayList();
- if (extensionHostClassNames != null) {
- for (String extensionHostClassName : extensionHostClassNames) {
- extensionHostClasses.add(Class.forName(extensionHostClassName));
- }
- }
- return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public CloudObject initializeCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
- Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
- List<CloudObject> extensionHostClassNames = Lists.newArrayList();
- for (String className : getSortedExtensionClasses()) {
- extensionHostClassNames.add(CloudObject.forString(className));
- }
- Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
- return result;
- }
-
- /** Get the memoized {@link Parser}, possibly initializing it lazily. */
- private Parser<T> getParser() {
- if (memoizedParser == null) {
- try {
- @SuppressWarnings("unchecked")
- T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
- @SuppressWarnings("unchecked")
- Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
- memoizedParser = tParser;
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalArgumentException(e);
- }
- }
- return memoizedParser;
- }
-
- static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
-
- /**
- * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
- * {@link #coderProvider()}.
- */
- private static final CoderProvider PROVIDER =
- new CoderProvider() {
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- if (!type.isSubtypeOf(CHECK)) {
- throw new CannotProvideCoderException(
- String.format(
- "Cannot provide %s because %s is not a subclass of %s",
- ProtoCoder.class.getSimpleName(),
- type,
- Message.class.getName()));
- }
-
- @SuppressWarnings("unchecked")
- TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
- try {
- @SuppressWarnings("unchecked")
- Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
- return coder;
- } catch (IllegalArgumentException e) {
- throw new CannotProvideCoderException(e);
- }
- }
- };
-
- private SortedSet<String> getSortedExtensionClasses() {
- SortedSet<String> ret = new TreeSet<>();
- for (Class<?> clazz : extensionHostClasses) {
- ret.add(clazz.getName());
- }
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
deleted file mode 100644
index 77afb47..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
-import com.google.protobuf.Descriptors.GenericDescriptor;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
-import com.google.protobuf.Message;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-
-/**
- * Utility functions for reflecting and analyzing Protocol Buffers classes.
- *
- * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation.
- */
-class ProtobufUtil {
- /**
- * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}.
- *
- * @throws IllegalArgumentException if there is an error in Java reflection.
- */
- static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
- try {
- return (Descriptor) clazz.getMethod("getDescriptor").invoke(null);
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- /**
- * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as
- * every class it can include transitively.
- *
- * @throws IllegalArgumentException if there is an error in Java reflection.
- */
- static Set<Descriptor> getRecursiveDescriptorsForClass(
- Class<? extends Message> clazz, ExtensionRegistry registry) {
- Descriptor root = getDescriptorForClass(clazz);
- Set<Descriptor> descriptors = new HashSet<>();
- recursivelyAddDescriptors(root, descriptors, registry);
- return descriptors;
- }
-
- /**
- * Recursively walks the given {@link Message} class and verifies that every field or message
- * linked in uses the Protocol Buffers proto2 syntax.
- */
- static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) {
- for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
- Syntax s = d.getFile().getSyntax();
- checkArgument(
- s == Syntax.PROTO2,
- "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s",
- clazz.getName(),
- d.getFullName(),
- d.getFile().getName());
- }
- }
-
- /**
- * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot
- * be deterministically encoded.
- *
- * @throws NonDeterministicException if the object cannot be encoded deterministically.
- */
- static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException {
- Class<? extends Message> message = coder.getMessageType();
- ExtensionRegistry registry = coder.getExtensionRegistry();
- Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry);
- for (Descriptor d : descriptors) {
- for (FieldDescriptor fd : d.getFields()) {
- // If there is a transitively reachable Protocol Buffers map field, then this object cannot
- // be encoded deterministically.
- if (fd.isMapField()) {
- String reason =
- String.format(
- "Protocol Buffers message %s transitively includes Map field %s (from file %s)."
- + " Maps cannot be deterministically encoded.",
- message.getName(),
- fd.getFullName(),
- fd.getFile().getFullName());
- throw new NonDeterministicException(coder, reason);
- }
- }
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
- // Disable construction of utility class
- private ProtobufUtil() {}
-
- private static void recursivelyAddDescriptors(
- Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) {
- if (descriptors.contains(message)) {
- return;
- }
- descriptors.add(message);
-
- for (FieldDescriptor f : message.getFields()) {
- recursivelyAddDescriptors(f, descriptors, registry);
- }
- for (FieldDescriptor f : message.getExtensions()) {
- recursivelyAddDescriptors(f, descriptors, registry);
- }
- for (ExtensionInfo info :
- registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) {
- recursivelyAddDescriptors(info.descriptor, descriptors, registry);
- }
- for (ExtensionInfo info :
- registry.getAllMutableExtensionsByExtendedType(message.getFullName())) {
- recursivelyAddDescriptors(info.descriptor, descriptors, registry);
- }
- }
-
- private static void recursivelyAddDescriptors(
- FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) {
- switch (field.getType()) {
- case BOOL:
- case BYTES:
- case DOUBLE:
- case ENUM:
- case FIXED32:
- case FIXED64:
- case FLOAT:
- case INT32:
- case INT64:
- case SFIXED32:
- case SFIXED64:
- case SINT32:
- case SINT64:
- case STRING:
- case UINT32:
- case UINT64:
- // Primitive types do not transitively access anything else.
- break;
-
- case GROUP:
- case MESSAGE:
- // Recursively adds all the fields from this nested Message.
- recursivelyAddDescriptors(field.getMessageType(), descriptors, registry);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Unexpected Protocol Buffers field type: " + field.getType());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
deleted file mode 100644
index bd16484..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines a {@link org.apache.beam.sdk.coders.Coder}
- * for Protocol Buffers messages, {@code ProtoCoder}.
- *
- * @see org.apache.beam.sdk.coders.protobuf.ProtoCoder
- */
-package org.apache.beam.sdk.coders.protobuf;
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/proto/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/proto/README.md b/sdks/java/core/src/main/proto/README.md
deleted file mode 100644
index b6d91df..0000000
--- a/sdks/java/core/src/main/proto/README.md
+++ /dev/null
@@ -1,45 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-## Protocol Buffers in Apache Beam
-
-This directory contains the Protocol Buffer messages used in Apache Beam.
-
-They aren't, however, used during the Maven build process, and are included here
-for completeness only. Instead, the following artifact on Maven Central contains
-the binary version of the generated code from these Protocol Buffers:
-
- <dependency>
- <groupId>com.google.cloud.dataflow</groupId>
- <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
- <version>LATEST</version>
- </dependency>
-
-Please follow this process for testing changes:
-
-* Make changes to the Protocol Buffer messages in this directory.
-* Use `protoc` to generate the new code, and compile it into a new Java library.
-* Install that Java library into your local Maven repository.
-* Update SDK's `pom.xml` to pick up the newly installed library, instead of
-downloading it from Maven Central.
-
-Once the changes are ready for submission, please separate them into two
-commits. The first commit should update the Protocol Buffer messages only. After
-that, we need to update the generated artifact on Maven Central. Finally,
-changes that make use of the Protocol Buffer changes may be committed.
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto b/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
deleted file mode 100644
index b1abe46..0000000
--- a/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffer messages used for testing Proto2Coder implementation.
- */
-
-syntax = "proto2";
-
-package proto2_coder_test_messages;
-
-option java_package = "org.apache.beam.sdk.coders";
-
-message MessageA {
- optional string field1 = 1;
- repeated MessageB field2 = 2;
-}
-
-message MessageB {
- optional bool field1 = 1;
-}
-
-message MessageC {
- extensions 100 to 105;
-}
-
-extend MessageC {
- optional MessageA field1 = 101;
- optional MessageB field2 = 102;
-}
-
-message MessageWithMap {
- map<string, MessageA> field1 = 1;
-}
-
-message ReferencesMessageWithMap {
- repeated MessageWithMap field1 = 1;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 10e011f..616e88e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -22,10 +22,8 @@ import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import com.google.auto.service.AutoService;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Duration;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -38,7 +36,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -87,17 +84,6 @@ public class CoderRegistryTest {
}
@Test
- public void testProtoCoderFallbackCoderProvider() throws Exception {
- CoderRegistry registry = CoderRegistry.createDefault();
-
- // MessageA is a Protocol Buffers test message with syntax 2
- assertEquals(registry.getDefaultCoder(MessageA.class), ProtoCoder.of(MessageA.class));
-
- // Duration is a Protocol Buffers default type with syntax 3
- assertEquals(registry.getDefaultCoder(Duration.class), ProtoCoder.of(Duration.class));
- }
-
- @Test
public void testAvroFallbackCoderProvider() throws Exception {
CoderRegistry registry = CoderRegistry.createDefault();
registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
deleted file mode 100644
index 8b889da..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ProtoCoder}.
- */
-@RunWith(JUnit4.class)
-public class ProtoCoderTest {
-
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testFactoryMethodAgreement() throws Exception {
- assertEquals(ProtoCoder.of(new TypeDescriptor<MessageA>() {}), ProtoCoder.of(MessageA.class));
-
- assertEquals(
- ProtoCoder.of(new TypeDescriptor<MessageA>() {}),
- ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {}));
- }
-
- @Test
- public void testProviderCannotProvideCoder() throws Exception {
- thrown.expect(CannotProvideCoderException.class);
- thrown.expectMessage("java.lang.Integer is not a subclass of com.google.protobuf.Message");
-
- ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {});
- }
-
- @Test
- public void testCoderEncodeDecodeEqual() throws Exception {
- MessageA value =
- MessageA.newBuilder()
- .setField1("hello")
- .addField2(MessageB.newBuilder().setField1(true).build())
- .addField2(MessageB.newBuilder().setField1(false).build())
- .build();
- CoderProperties.coderDecodeEncodeEqual(ProtoCoder.of(MessageA.class), value);
- }
-
- @Test
- public void testCoderEncodeDecodeEqualNestedContext() throws Exception {
- MessageA value1 =
- MessageA.newBuilder()
- .setField1("hello")
- .addField2(MessageB.newBuilder().setField1(true).build())
- .addField2(MessageB.newBuilder().setField1(false).build())
- .build();
- MessageA value2 =
- MessageA.newBuilder()
- .setField1("world")
- .addField2(MessageB.newBuilder().setField1(false).build())
- .addField2(MessageB.newBuilder().setField1(true).build())
- .build();
- CoderProperties.coderDecodeEncodeEqual(
- ListCoder.of(ProtoCoder.of(MessageA.class)), ImmutableList.of(value1, value2));
- }
-
- @Test
- public void testCoderEncodeDecodeExtensionsEqual() throws Exception {
- MessageC value =
- MessageC.newBuilder()
- .setExtension(
- Proto2CoderTestMessages.field1,
- MessageA.newBuilder()
- .setField1("hello")
- .addField2(MessageB.newBuilder().setField1(true).build())
- .build())
- .setExtension(
- Proto2CoderTestMessages.field2, MessageB.newBuilder().setField1(false).build())
- .build();
- CoderProperties.coderDecodeEncodeEqual(
- ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class), value);
- }
-
- @Test
- public void testCoderSerialization() throws Exception {
- ProtoCoder<MessageA> coder = ProtoCoder.of(MessageA.class);
- CoderProperties.coderSerializable(coder);
- }
-
- @Test
- public void testCoderExtensionsSerialization() throws Exception {
- ProtoCoder<MessageC> coder =
- ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class);
- CoderProperties.coderSerializable(coder);
- }
-
- @Test
- public void testEncodingId() throws Exception {
- Coder<MessageA> coderA = ProtoCoder.of(MessageA.class);
- CoderProperties.coderHasEncodingId(coderA, MessageA.class.getName() + "[]");
-
- ProtoCoder<MessageC> coder =
- ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class);
- CoderProperties.coderHasEncodingId(
- coder,
- String.format("%s[%s]", MessageC.class.getName(), Proto2CoderTestMessages.class.getName()));
- }
-
- @Test
- public void encodeNullThrowsCoderException() throws Exception {
- thrown.expect(CoderException.class);
- thrown.expectMessage("cannot encode a null MessageA");
-
- CoderUtils.encodeToBase64(ProtoCoder.of(MessageA.class), null);
- }
-
- @Test
- public void testDeterministicCoder() throws NonDeterministicException {
- Coder<MessageA> coder = ProtoCoder.of(MessageA.class);
- coder.verifyDeterministic();
- }
-
- @Test
- public void testNonDeterministicCoder() throws NonDeterministicException {
- thrown.expect(NonDeterministicException.class);
- thrown.expectMessage(MessageWithMap.class.getName() + " transitively includes Map field");
-
- Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class);
- coder.verifyDeterministic();
- }
-
- @Test
- public void testNonDeterministicProperty() throws CoderException {
- MessageWithMap.Builder msg1B = MessageWithMap.newBuilder();
- MessageWithMap.Builder msg2B = MessageWithMap.newBuilder();
-
- // Built in reverse order but with equal contents.
- for (int i = 0; i < 10; ++i) {
- msg1B.getMutableField1().put("key" + i, MessageA.getDefaultInstance());
- msg2B.getMutableField1().put("key" + (9 - i), MessageA.getDefaultInstance());
- }
-
- // Assert the messages are equal.
- MessageWithMap msg1 = msg1B.build();
- MessageWithMap msg2 = msg2B.build();
- assertEquals(msg2, msg1);
-
- // Assert the encoded messages are not equal.
- Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class);
- assertNotEquals(CoderUtils.encodeToBase64(coder, msg2), CoderUtils.encodeToBase64(coder, msg1));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
deleted file mode 100644
index 1408048..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.checkProto2Syntax;
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.getRecursiveDescriptorsForClass;
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.verifyDeterministic;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.ReferencesMessageWithMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.protobuf.Any;
-import com.google.protobuf.Descriptors.GenericDescriptor;
-import com.google.protobuf.Duration;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ProtobufUtil}.
- */
-@RunWith(JUnit4.class)
-public class ProtobufUtilTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- private static final Set<String> MESSAGE_A_ONLY =
- ImmutableSet.of("proto2_coder_test_messages.MessageA");
-
- private static final Set<String> MESSAGE_B_ONLY =
- ImmutableSet.of("proto2_coder_test_messages.MessageB");
-
- private static final Set<String> MESSAGE_C_ONLY =
- ImmutableSet.of("proto2_coder_test_messages.MessageC");
-
- // map fields are actually represented as a nested Message in generated Java code.
- private static final Set<String> WITH_MAP_ONLY =
- ImmutableSet.of(
- "proto2_coder_test_messages.MessageWithMap",
- "proto2_coder_test_messages.MessageWithMap.Field1Entry");
-
- private static final Set<String> REFERS_MAP_ONLY =
- ImmutableSet.of("proto2_coder_test_messages.ReferencesMessageWithMap");
-
- // A references A and B.
- private static final Set<String> MESSAGE_A_ALL = Sets.union(MESSAGE_A_ONLY, MESSAGE_B_ONLY);
-
- // C, only with registered extensions, references A.
- private static final Set<String> MESSAGE_C_EXT = Sets.union(MESSAGE_C_ONLY, MESSAGE_A_ALL);
-
- // MessageWithMap references A.
- private static final Set<String> WITH_MAP_ALL = Sets.union(WITH_MAP_ONLY, MESSAGE_A_ALL);
-
- // ReferencesMessageWithMap references MessageWithMap.
- private static final Set<String> REFERS_MAP_ALL = Sets.union(REFERS_MAP_ONLY, WITH_MAP_ALL);
-
- @Test
- public void testRecursiveDescriptorsMessageA() {
- assertThat(getRecursiveDescriptorFullNames(MessageA.class), equalTo(MESSAGE_A_ALL));
- }
-
- @Test
- public void testRecursiveDescriptorsMessageB() {
- assertThat(getRecursiveDescriptorFullNames(MessageB.class), equalTo(MESSAGE_B_ONLY));
- }
-
- @Test
- public void testRecursiveDescriptorsMessageC() {
- assertThat(getRecursiveDescriptorFullNames(MessageC.class), equalTo(MESSAGE_C_ONLY));
- }
-
- @Test
- public void testRecursiveDescriptorsMessageCWithExtensions() {
- // With extensions, Message C has a reference to Message A and Message B.
- ExtensionRegistry registry = ExtensionRegistry.newInstance();
- Proto2CoderTestMessages.registerAllExtensions(registry);
- assertThat(getRecursiveDescriptorFullNames(MessageC.class, registry), equalTo(MESSAGE_C_EXT));
- }
-
- @Test
- public void testRecursiveDescriptorsMessageWithMap() {
- assertThat(getRecursiveDescriptorFullNames(MessageWithMap.class), equalTo(WITH_MAP_ALL));
- }
-
- @Test
- public void testRecursiveDescriptorsReferencesMessageWithMap() {
- assertThat(
- getRecursiveDescriptorFullNames(ReferencesMessageWithMap.class), equalTo(REFERS_MAP_ALL));
- }
-
- @Test
- public void testVerifyProto2() {
- checkProto2Syntax(MessageA.class, ExtensionRegistry.getEmptyRegistry());
- checkProto2Syntax(MessageB.class, ExtensionRegistry.getEmptyRegistry());
- checkProto2Syntax(MessageC.class, ExtensionRegistry.getEmptyRegistry());
- checkProto2Syntax(MessageWithMap.class, ExtensionRegistry.getEmptyRegistry());
- checkProto2Syntax(ReferencesMessageWithMap.class, ExtensionRegistry.getEmptyRegistry());
- }
-
- @Test
- public void testAnyIsNotProto2() {
- // Any is a core Protocol Buffers type that uses proto3 syntax.
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(Any.class.getCanonicalName());
- thrown.expectMessage("in file " + Any.getDescriptor().getFile().getName());
-
- checkProto2Syntax(Any.class, ExtensionRegistry.getEmptyRegistry());
- }
-
- @Test
- public void testDurationIsNotProto2() {
- // Duration is a core Protocol Buffers type that uses proto3 syntax.
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(Duration.class.getCanonicalName());
- thrown.expectMessage("in file " + Duration.getDescriptor().getFile().getName());
-
- checkProto2Syntax(Duration.class, ExtensionRegistry.getEmptyRegistry());
- }
-
- @Test
- public void testDurationIsDeterministic() throws NonDeterministicException {
- // Duration can be encoded deterministically.
- verifyDeterministic(ProtoCoder.of(Duration.class));
- }
-
- @Test
- public void testMessageWithMapIsNotDeterministic() throws NonDeterministicException {
- String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName();
- thrown.expect(NonDeterministicException.class);
- thrown.expectMessage(MessageWithMap.class.getName());
- thrown.expectMessage("transitively includes Map field " + mapFieldName);
- thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName());
-
- verifyDeterministic(ProtoCoder.of(MessageWithMap.class));
- }
-
- @Test
- public void testMessageWithTransitiveMapIsNotDeterministic() throws NonDeterministicException {
- String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName();
- thrown.expect(NonDeterministicException.class);
- thrown.expectMessage(ReferencesMessageWithMap.class.getName());
- thrown.expectMessage("transitively includes Map field " + mapFieldName);
- thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName());
-
- verifyDeterministic(ProtoCoder.of(ReferencesMessageWithMap.class));
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- /** Helper used to test the recursive class traversal and print good error messages. */
- private static Set<String> getRecursiveDescriptorFullNames(Class<? extends Message> clazz) {
- return getRecursiveDescriptorFullNames(clazz, ExtensionRegistry.getEmptyRegistry());
- }
-
- /** Helper used to test the recursive class traversal and print good error messages. */
- private static Set<String> getRecursiveDescriptorFullNames(
- Class<? extends Message> clazz, ExtensionRegistry registry) {
- Set<String> result = new HashSet<>();
- for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
- result.add(d.getFullName());
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index dde8be5..8a48eca 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -35,6 +35,7 @@
<module>gcp-core</module>
<module>jackson</module>
<module>join-library</module>
+ <module>protobuf</module>
<module>sorter</module>
</modules>
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml
new file mode 100644
index 0000000..9a54254
--- /dev/null
+++ b/sdks/java/extensions/protobuf/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-extensions-protobuf</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: Extensions :: Protobuf</name>
+ <description>Add support to Apache Beam for Google Protobuf.</description>
+
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Coverage analysis for unit tests. -->
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <!-- build dependencies -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
new file mode 100644
index 0000000..99a0838
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Structs;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both
+ * Protocol Buffers syntax versions 2 and 3.
+ *
+ * <p>To learn more about Protocol Buffers, visit:
+ * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
+ *
+ * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default
+ * {@link Coder} for any {@link Message} object. Custom message extensions are also supported, but
+ * these extensions must be registered for a particular {@link ProtoCoder} instance and that
+ * instance must be registered on the {@link PCollection} that needs the extensions:
+ *
+ * <pre>{@code
+ * import MyProtoFile;
+ * import MyProtoFile.MyMessage;
+ *
+ * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
+ * PCollection<MyMessage> records = input.apply(...).setCoder(coder);
+ * }</pre>
+ *
+ * <h3>Versioning</h3>
+ *
+ * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However,
+ * the Java runtime version of the <code>google.com.protobuf</code> library must match exactly the
+ * version of <code>protoc</code> that was used to produce the JAR files containing the compiled
+ * <code>.proto</code> messages.
+ *
+ * <p>For more information, see the
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>.
+ *
+ * <h3>{@link ProtoCoder} and Determinism</h3>
+ *
+ * <p>In general, Protocol Buffers messages can be encoded deterministically within a single
+ * pipeline as long as:
+ *
+ * <ul>
+ * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code>
+ * fields.</li>
+ * <li>Every Java VM that encodes or decodes the messages use the same runtime version of the
+ * Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li>
+ * </ul>
+ *
+ * <h3>{@link ProtoCoder} and Encoding Stability</h3>
+ *
+ * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language
+ * guides for
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a>
+ * and
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a>
+ * syntaxes, depending on your message type. Following these guidelines will ensure that the
+ * old encoded data can be read by new versions of the code.
+ *
+ * <p>Generally, any change to the message type, registered extensions, runtime library, or
+ * compiled proto JARs may change the encoding. Thus even if both the original and updated messages
+ * can be encoded deterministically within a single job, these deterministic encodings may not be
+ * the same across jobs.
+ *
+ * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
+ */
+public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
+
+ /**
+ * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
+ * {@link ExtensionRegistry}.
+ */
+ public static CoderProvider coderProvider() {
+ return PROVIDER;
+ }
+
+ /**
+ * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
+ */
+ public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
+ return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
+ }
+
+ /**
+ * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given
+ * {@link TypeDescriptor}.
+ */
+ public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) {
+ @SuppressWarnings("unchecked")
+ Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
+ return of(protoMessageClass);
+ }
+
+ /**
+ * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes
+ * registered.
+ *
+ * <p>Each of the extension host classes must be an class automatically generated by the
+ * Protocol Buffers compiler, {@code protoc}, that contains messages.
+ *
+ * <p>Does not modify this object.
+ */
+ public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
+ for (Class<?> extensionHost : moreExtensionHosts) {
+ // Attempt to access the required method, to make sure it's present.
+ try {
+ Method registerAllExtensions =
+ extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
+ checkArgument(
+ Modifier.isStatic(registerAllExtensions.getModifiers()),
+ "Method registerAllExtensions() must be static");
+ } catch (NoSuchMethodException | SecurityException e) {
+ throw new IllegalArgumentException(
+ String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()),
+ e);
+ }
+ }
+
+ return new ProtoCoder<>(
+ protoMessageClass,
+ new ImmutableSet.Builder<Class<?>>()
+ .addAll(extensionHostClasses)
+ .addAll(moreExtensionHosts)
+ .build());
+ }
+
+ /**
+ * See {@link #withExtensionsFrom(Iterable)}.
+ *
+ * <p>Does not modify this object.
+ */
+ public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
+ return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
+ }
+
+ @Override
+ public void encode(T value, OutputStream outStream, Context context) throws IOException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
+ }
+ if (context.isWholeStream) {
+ value.writeTo(outStream);
+ } else {
+ value.writeDelimitedTo(outStream);
+ }
+ }
+
+ @Override
+ public T decode(InputStream inStream, Context context) throws IOException {
+ if (context.isWholeStream) {
+ return getParser().parseFrom(inStream, getExtensionRegistry());
+ } else {
+ return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof ProtoCoder)) {
+ return false;
+ }
+ ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
+ return protoMessageClass.equals(otherCoder.protoMessageClass)
+ && Sets.newHashSet(extensionHostClasses)
+ .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(protoMessageClass, extensionHostClasses);
+ }
+
+ /**
+ * The encoding identifier is designed to support evolution as per the design of Protocol
+ * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
+ * Buffers documentation at
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
+ * A Message Type</a>.
+ *
+ * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder}
+ * instances of the same principal message class, with the same registered extension host classes,
+ * and otherwise distinct. Note that the encoding ID does not encode any version of the message
+ * or extensions, nor does it include the message schema.
+ *
+ * <p>When modifying a message class, here are the broadest guidelines; see the above link
+ * for greater detail.
+ *
+ * <ul>
+ * <li>Do not change the numeric tags for any fields.
+ * <li>Never remove a <code>required</code> field.
+ * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
+ * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
+ * the new and old types are interchangeable.
+ * </ul>
+ *
+ * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
+ * the class until it is certain that no remaining serialized instances exist.
+ *
+ * <p>If backwards incompatible changes must be made, the best recourse is to change the name
+ * of your Protocol Buffers message class.
+ */
+ @Override
+ public String getEncodingId() {
+ return protoMessageClass.getName() + getSortedExtensionClasses().toString();
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ ProtobufUtil.verifyDeterministic(this);
+ }
+
+ /**
+ * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports.
+ */
+ public Class<T> getMessageType() {
+ return protoMessageClass;
+ }
+
+ /**
+ * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
+ * to {@code T} registered with this {@link ProtoCoder}.
+ */
+ public ExtensionRegistry getExtensionRegistry() {
+ if (memoizedExtensionRegistry == null) {
+ ExtensionRegistry registry = ExtensionRegistry.newInstance();
+ for (Class<?> extensionHost : extensionHostClasses) {
+ try {
+ extensionHost
+ .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
+ .invoke(null, registry);
+ } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ memoizedExtensionRegistry = registry.getUnmodifiable();
+ }
+ return memoizedExtensionRegistry;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////
+ // Private implementation details below.
+
+ /** The {@link Message} type to be coded. */
+ private final Class<T> protoMessageClass;
+
+ /**
+ * All extension host classes included in this {@link ProtoCoder}. The extensions from these
+ * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding.
+ */
+ private final Set<Class<?>> extensionHostClasses;
+
+ // Constants used to serialize and deserialize
+ private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
+ private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
+
+ // Transient fields that are lazy initialized and then memoized.
+ private transient ExtensionRegistry memoizedExtensionRegistry;
+ private transient Parser<T> memoizedParser;
+
+ /** Private constructor. */
+ private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) {
+ this.protoMessageClass = protoMessageClass;
+ this.extensionHostClasses = extensionHostClasses;
+ }
+
+ /**
+ * @deprecated For JSON deserialization only.
+ */
+ @JsonCreator
+ @Deprecated
+ public static <T extends Message> ProtoCoder<T> of(
+ @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
+ @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
+
+ try {
+ @SuppressWarnings("unchecked")
+ Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
+ List<Class<?>> extensionHostClasses = Lists.newArrayList();
+ if (extensionHostClassNames != null) {
+ for (String extensionHostClassName : extensionHostClassNames) {
+ extensionHostClasses.add(Class.forName(extensionHostClassName));
+ }
+ }
+ return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
+ Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
+ List<CloudObject> extensionHostClassNames = Lists.newArrayList();
+ for (String className : getSortedExtensionClasses()) {
+ extensionHostClassNames.add(CloudObject.forString(className));
+ }
+ Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
+ return result;
+ }
+
+ /** Get the memoized {@link Parser}, possibly initializing it lazily. */
+ private Parser<T> getParser() {
+ if (memoizedParser == null) {
+ try {
+ @SuppressWarnings("unchecked")
+ T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
+ @SuppressWarnings("unchecked")
+ Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
+ memoizedParser = tParser;
+ } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ return memoizedParser;
+ }
+
+ static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
+
+ /**
+ * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
+ * {@link #coderProvider()}.
+ */
+ private static final CoderProvider PROVIDER =
+ new CoderProvider() {
+ @Override
+ public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+ if (!type.isSubtypeOf(CHECK)) {
+ throw new CannotProvideCoderException(
+ String.format(
+ "Cannot provide %s because %s is not a subclass of %s",
+ ProtoCoder.class.getSimpleName(),
+ type,
+ Message.class.getName()));
+ }
+
+ @SuppressWarnings("unchecked")
+ TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
+ try {
+ @SuppressWarnings("unchecked")
+ Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
+ return coder;
+ } catch (IllegalArgumentException e) {
+ throw new CannotProvideCoderException(e);
+ }
+ }
+ };
+
+ private SortedSet<String> getSortedExtensionClasses() {
+ SortedSet<String> ret = new TreeSet<>();
+ for (Class<?> clazz : extensionHostClasses) {
+ ret.add(clazz.getName());
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
new file mode 100644
index 0000000..68a775a
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
+import com.google.protobuf.Descriptors.GenericDescriptor;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
+import com.google.protobuf.Message;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+
+/**
+ * Utility functions for reflecting and analyzing Protocol Buffers classes.
+ *
+ * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation.
+ */
+class ProtobufUtil {
+ /**
+ * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}.
+ *
+ * @throws IllegalArgumentException if there is an error in Java reflection.
+ */
+ static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
+ try {
+ return (Descriptor) clazz.getMethod("getDescriptor").invoke(null);
+ } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as
+ * every class it can include transitively.
+ *
+ * @throws IllegalArgumentException if there is an error in Java reflection.
+ */
+ static Set<Descriptor> getRecursiveDescriptorsForClass(
+ Class<? extends Message> clazz, ExtensionRegistry registry) {
+ Descriptor root = getDescriptorForClass(clazz);
+ Set<Descriptor> descriptors = new HashSet<>();
+ recursivelyAddDescriptors(root, descriptors, registry);
+ return descriptors;
+ }
+
+ /**
+ * Recursively walks the given {@link Message} class and verifies that every field or message
+ * linked in uses the Protocol Buffers proto2 syntax.
+ */
+ static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) {
+ for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
+ Syntax s = d.getFile().getSyntax();
+ checkArgument(
+ s == Syntax.PROTO2,
+ "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s",
+ clazz.getName(),
+ d.getFullName(),
+ d.getFile().getName());
+ }
+ }
+
+ /**
+ * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot
+ * be deterministically encoded.
+ *
+ * @throws NonDeterministicException if the object cannot be encoded deterministically.
+ */
+ static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException {
+ Class<? extends Message> message = coder.getMessageType();
+ ExtensionRegistry registry = coder.getExtensionRegistry();
+ Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry);
+ for (Descriptor d : descriptors) {
+ for (FieldDescriptor fd : d.getFields()) {
+ // If there is a transitively reachable Protocol Buffers map field, then this object cannot
+ // be encoded deterministically.
+ if (fd.isMapField()) {
+ String reason =
+ String.format(
+ "Protocol Buffers message %s transitively includes Map field %s (from file %s)."
+ + " Maps cannot be deterministically encoded.",
+ message.getName(),
+ fd.getFullName(),
+ fd.getFile().getFullName());
+ throw new NonDeterministicException(coder, reason);
+ }
+ }
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////
+ // Disable construction of utility class
+ private ProtobufUtil() {}
+
+ private static void recursivelyAddDescriptors(
+ Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) {
+ if (descriptors.contains(message)) {
+ return;
+ }
+ descriptors.add(message);
+
+ for (FieldDescriptor f : message.getFields()) {
+ recursivelyAddDescriptors(f, descriptors, registry);
+ }
+ for (FieldDescriptor f : message.getExtensions()) {
+ recursivelyAddDescriptors(f, descriptors, registry);
+ }
+ for (ExtensionInfo info :
+ registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) {
+ recursivelyAddDescriptors(info.descriptor, descriptors, registry);
+ }
+ for (ExtensionInfo info :
+ registry.getAllMutableExtensionsByExtendedType(message.getFullName())) {
+ recursivelyAddDescriptors(info.descriptor, descriptors, registry);
+ }
+ }
+
+ private static void recursivelyAddDescriptors(
+ FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) {
+ switch (field.getType()) {
+ case BOOL:
+ case BYTES:
+ case DOUBLE:
+ case ENUM:
+ case FIXED32:
+ case FIXED64:
+ case FLOAT:
+ case INT32:
+ case INT64:
+ case SFIXED32:
+ case SFIXED64:
+ case SINT32:
+ case SINT64:
+ case STRING:
+ case UINT32:
+ case UINT64:
+ // Primitive types do not transitively access anything else.
+ break;
+
+ case GROUP:
+ case MESSAGE:
+ // Recursively adds all the fields from this nested Message.
+ recursivelyAddDescriptors(field.getMessageType(), descriptors, registry);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ "Unexpected Protocol Buffers field type: " + field.getType());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
new file mode 100644
index 0000000..b69bc8b
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines a {@link org.apache.beam.sdk.coders.Coder}
+ * for Protocol Buffers messages, {@code ProtoCoder}.
+ *
+ * @see org.apache.beam.sdk.extensions.protobuf.ProtoCoder
+ */
+package org.apache.beam.sdk.extensions.protobuf;