You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/17 16:47:30 UTC
[1/2] incubator-beam git commit: Proto2Coder: remove deprecated coder
Repository: incubator-beam
Updated Branches:
refs/heads/master 6901dc09a -> 7646384e2
Proto2Coder: remove deprecated coder
Fix the single usage to use ProtoCoder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af775569
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af775569
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af775569
Branch: refs/heads/master
Commit: af775569eae60ee1843ce3a1252fca4f5e813654
Parents: 6901dc0
Author: Daniel Halperin <da...@halper.in>
Authored: Sat Apr 16 17:45:14 2016 -0700
Committer: Daniel Halperin <da...@halper.in>
Committed: Sat Apr 16 17:45:14 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/Proto2Coder.java | 364 -------------------
.../apache/beam/sdk/io/bigtable/BigtableIO.java | 4 +-
.../apache/beam/sdk/coders/Proto2CoderTest.java | 148 --------
3 files changed, 2 insertions(+), 514 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af775569/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Proto2Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Proto2Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Proto2Coder.java
deleted file mode 100644
index 20bc821..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Proto2Coder.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Coder} using Google Protocol Buffers 2 binary format.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
- *
- * <p>To use, specify the {@link Coder} type on a PCollection containing Protocol Buffers messages.
- *
- * <pre>
- * {@code
- * PCollection<MyProto.Message> records =
- * input.apply(...)
- * .setCoder(Proto2Coder.of(MyProto.Message.class));
- * }
- * </pre>
- *
- * <p>Custom message extensions are also supported, but the coder must be made
- * aware of them explicitly:
- *
- * <pre>
- * {@code
- * PCollection<MyProto.Message> records =
- * input.apply(...)
- * .setCoder(Proto2Coder.of(MyProto.Message.class)
- * .addExtensionsFrom(MyProto.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder, must extend {@code Message}
- * @deprecated Use {@link ProtoCoder}.
- */
-@Deprecated
-public class Proto2Coder<T extends Message> extends AtomicCoder<T> {
-
- /** The class of Protobuf message to be encoded. */
- private final Class<T> protoMessageClass;
-
- /**
- * All extension host classes included in this Proto2Coder. The extensions from
- * these classes will be included in the {@link ExtensionRegistry} used during
- * encoding and decoding.
- */
- private final List<Class<?>> extensionHostClasses;
-
- private Proto2Coder(Class<T> protoMessageClass, List<Class<?>> extensionHostClasses) {
- this.protoMessageClass = protoMessageClass;
- this.extensionHostClasses = extensionHostClasses;
- }
-
- private static final CoderProvider PROVIDER =
- new CoderProvider() {
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- if (type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
- @SuppressWarnings("unchecked")
- TypeDescriptor<? extends Message> messageType =
- (TypeDescriptor<? extends Message>) type;
- @SuppressWarnings("unchecked")
- Coder<T> coder = (Coder<T>) Proto2Coder.of(messageType);
- return coder;
- } else {
- throw new CannotProvideCoderException(
- String.format(
- "Cannot provide Proto2Coder because %s "
- + "is not a subclass of protocol buffer Messsage",
- type));
- }
- }
- };
-
- public static CoderProvider coderProvider() {
- return PROVIDER;
- }
-
- /**
- * Returns a {@code Proto2Coder} for the given Protobuf message class.
- */
- public static <T extends Message> Proto2Coder<T> of(Class<T> protoMessageClass) {
- return new Proto2Coder<T>(protoMessageClass, Collections.<Class<?>>emptyList());
- }
-
- /**
- * Returns a {@code Proto2Coder} for the given Protobuf message class.
- */
- public static <T extends Message> Proto2Coder<T> of(TypeDescriptor<T> protoMessageType) {
- @SuppressWarnings("unchecked")
- Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
- return of(protoMessageClass);
- }
-
- /**
- * Produces a {@code Proto2Coder} like this one, but with the extensions from
- * the given classes registered.
- *
- * @param moreExtensionHosts an iterable of classes that define a static
- * method {@code registerAllExtensions(ExtensionRegistry)}
- */
- public Proto2Coder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
- for (Class<?> extensionHost : moreExtensionHosts) {
- // Attempt to access the required method, to make sure it's present.
- try {
- Method registerAllExtensions =
- extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
- checkArgument(
- Modifier.isStatic(registerAllExtensions.getModifiers()),
- "Method registerAllExtensions() must be static for use with Proto2Coder");
- } catch (NoSuchMethodException | SecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- return new Proto2Coder<T>(
- protoMessageClass,
- new ImmutableList.Builder<Class<?>>()
- .addAll(extensionHostClasses)
- .addAll(moreExtensionHosts)
- .build());
- }
-
- /**
- * See {@link #withExtensionsFrom(Iterable)}.
- */
- public Proto2Coder<T> withExtensionsFrom(Class<?>... extensionHosts) {
- return withExtensionsFrom(ImmutableList.copyOf(extensionHosts));
- }
-
- /**
- * Adds custom Protobuf extensions to the coder. Returns {@code this}
- * for method chaining.
- *
- * @param extensionHosts must be a class that defines a static
- * method name {@code registerAllExtensions}
- * @deprecated use {@link #withExtensionsFrom}
- */
- @Deprecated
- public Proto2Coder<T> addExtensionsFrom(Class<?>... extensionHosts) {
- return addExtensionsFrom(ImmutableList.copyOf(extensionHosts));
- }
-
- /**
- * Adds custom Protobuf extensions to the coder. Returns {@code this}
- * for method chaining.
- *
- * @param extensionHosts must be a class that defines a static
- * method name {@code registerAllExtensions}
- * @deprecated use {@link #withExtensionsFrom}
- */
- @Deprecated
- public Proto2Coder<T> addExtensionsFrom(Iterable<Class<?>> extensionHosts) {
- for (Class<?> extensionHost : extensionHosts) {
- try {
- // Attempt to access the declared method, to make sure it's present.
- extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
- } catch (NoSuchMethodException e) {
- throw new IllegalArgumentException(e);
- }
- extensionHostClasses.add(extensionHost);
- }
- // The memoized extension registry needs to be recomputed because we have mutated this object.
- synchronized (this) {
- memoizedExtensionRegistry = null;
- getExtensionRegistry();
- }
- return this;
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context) throws IOException {
- if (value == null) {
- throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
- }
- if (context.isWholeStream) {
- value.writeTo(outStream);
- } else {
- value.writeDelimitedTo(outStream);
- }
- }
-
- @Override
- public T decode(InputStream inStream, Context context) throws IOException {
- if (context.isWholeStream) {
- return getParser().parseFrom(inStream, getExtensionRegistry());
- } else {
- return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
- }
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (!(other instanceof Proto2Coder)) {
- return false;
- }
- Proto2Coder<?> otherCoder = (Proto2Coder<?>) other;
- return protoMessageClass.equals(otherCoder.protoMessageClass)
- && Sets.newHashSet(extensionHostClasses)
- .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(protoMessageClass, extensionHostClasses);
- }
-
- /**
- * The encoding identifier is designed to support evolution as per the design of Protocol
- * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
- * Buffers documentation at
- * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
- * A Message Type</a>.
- *
- * <p>In particular, the encoding identifier is guaranteed to be the same for {@code Proto2Coder}
- * instances of the same principal message class, and otherwise distinct. Loaded extensions do not
- * affect the id, nor does it encode the full schema.
- *
- * <p>When modifying a message class, here are the broadest guidelines; see the above link
- * for greater detail.
- *
- * <ul>
- * <li>Do not change the numeric tags for any fields.
- * <li>Never remove a <code>required</code> field.
- * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
- * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
- * the new and old types are interchangeable.
- * </ul>
- *
- * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
- * the class until it is certain that no remaining serialized instances exist.
- *
- * <p>If backwards incompatible changes must be made, the best recourse is to change the name
- * of your Protocol Buffers message class.
- */
- @Override
- public String getEncodingId() {
- return protoMessageClass.getName();
- }
-
- private transient Parser<T> memoizedParser;
-
- private Parser<T> getParser() {
- if (memoizedParser == null) {
- try {
- @SuppressWarnings("unchecked")
- T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
- @SuppressWarnings("unchecked")
- Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
- memoizedParser = tParser;
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalArgumentException(e);
- }
- }
- return memoizedParser;
- }
-
- private transient ExtensionRegistry memoizedExtensionRegistry;
-
- private synchronized ExtensionRegistry getExtensionRegistry() {
- if (memoizedExtensionRegistry == null) {
- ExtensionRegistry registry = ExtensionRegistry.newInstance();
- for (Class<?> extensionHost : extensionHostClasses) {
- try {
- extensionHost
- .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
- .invoke(null, registry);
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalStateException(e);
- }
- }
- memoizedExtensionRegistry = registry.getUnmodifiable();
- }
- return memoizedExtensionRegistry;
- }
-
- ////////////////////////////////////////////////////////////////////////////////////
- // JSON Serialization details below
-
- private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
- private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
- /**
- * Constructor for JSON deserialization only.
- */
- @JsonCreator
- public static <T extends Message> Proto2Coder<T> of(
- @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
- @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
-
- try {
- @SuppressWarnings("unchecked")
- Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
- List<Class<?>> extensionHostClasses = Lists.newArrayList();
- if (extensionHostClassNames != null) {
- for (String extensionHostClassName : extensionHostClassNames) {
- extensionHostClasses.add(Class.forName(extensionHostClassName));
- }
- }
- return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
- Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
- List<CloudObject> extensionHostClassNames = Lists.newArrayList();
- for (Class<?> clazz : extensionHostClasses) {
- extensionHostClassNames.add(CloudObject.forString(clazz.getName()));
- }
- Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af775569/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
index 01a59e6..b2d9cb3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
@@ -23,8 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Proto2Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.Sink.WriteOperation;
@@ -660,7 +660,7 @@ public class BigtableIO {
@Override
public Coder<Row> getDefaultOutputCoder() {
- return Proto2Coder.of(Row.class);
+ return ProtoCoder.of(Row.class);
}
/** Helper that splits the specified range in this source into bundles. */
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af775569/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/Proto2CoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/Proto2CoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/Proto2CoderTest.java
deleted file mode 100644
index 65a392c..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/Proto2CoderTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for Proto2Coder.
- */
-@SuppressWarnings("deprecation") // test of a deprecated coder.
-@RunWith(JUnit4.class)
-public class Proto2CoderTest {
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testFactoryMethodAgreement() throws Exception {
- assertEquals(
- Proto2Coder.of(new TypeDescriptor<MessageA>() {}),
- Proto2Coder.of(MessageA.class));
-
- assertEquals(
- Proto2Coder.of(new TypeDescriptor<MessageA>() {}),
- Proto2Coder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {}));
- }
-
- @Test
- public void testProviderCannotProvideCoder() throws Exception {
- thrown.expect(CannotProvideCoderException.class);
- Proto2Coder.coderProvider().getCoder(new TypeDescriptor<Integer>() {});
- }
-
- @Test
- public void testCoderEncodeDecodeEqual() throws Exception {
- MessageA value = MessageA.newBuilder()
- .setField1("hello")
- .addField2(MessageB.newBuilder()
- .setField1(true).build())
- .addField2(MessageB.newBuilder()
- .setField1(false).build())
- .build();
- CoderProperties.coderDecodeEncodeEqual(Proto2Coder.of(MessageA.class), value);
- }
-
- @Test
- public void testCoderEncodeDecodeEqualNestedContext() throws Exception {
- MessageA value1 = MessageA.newBuilder()
- .setField1("hello")
- .addField2(MessageB.newBuilder()
- .setField1(true).build())
- .addField2(MessageB.newBuilder()
- .setField1(false).build())
- .build();
- MessageA value2 = MessageA.newBuilder()
- .setField1("world")
- .addField2(MessageB.newBuilder()
- .setField1(false).build())
- .addField2(MessageB.newBuilder()
- .setField1(true).build())
- .build();
- CoderProperties.coderDecodeEncodeEqual(
- ListCoder.of(Proto2Coder.of(MessageA.class)),
- ImmutableList.of(value1, value2));
- }
-
- @Test
- public void testCoderEncodeDecodeExtensionsEqual() throws Exception {
- MessageC value = MessageC.newBuilder()
- .setExtension(Proto2CoderTestMessages.field1,
- MessageA.newBuilder()
- .setField1("hello")
- .addField2(MessageB.newBuilder()
- .setField1(true)
- .build())
- .build())
- .setExtension(Proto2CoderTestMessages.field2,
- MessageB.newBuilder()
- .setField1(false)
- .build())
- .build();
- CoderProperties.coderDecodeEncodeEqual(
- Proto2Coder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class),
- value);
- }
-
- @Test
- public void testCoderSerialization() throws Exception {
- Proto2Coder<MessageA> coder = Proto2Coder.of(MessageA.class);
- CoderProperties.coderSerializable(coder);
- }
-
- @Test
- public void testCoderExtensionsSerialization() throws Exception {
- Proto2Coder<MessageC> coder = Proto2Coder.of(MessageC.class)
- .withExtensionsFrom(Proto2CoderTestMessages.class);
- CoderProperties.coderSerializable(coder);
- }
-
- @Test
- public void testEncodingId() throws Exception {
- Coder<MessageA> coderA = Proto2Coder.of(MessageA.class);
- CoderProperties.coderHasEncodingId(coderA, MessageA.class.getName());
-
- Proto2Coder<MessageC> coder = Proto2Coder.of(MessageC.class)
- .withExtensionsFrom(Proto2CoderTestMessages.class);
- CoderProperties.coderHasEncodingId(coder, MessageC.class.getName());
- }
-
- @Test
- public void encodeNullThrowsCoderException() throws Exception {
- thrown.expect(CoderException.class);
- thrown.expectMessage("cannot encode a null MessageA");
-
- CoderUtils.encodeToBase64(Proto2Coder.of(MessageA.class), null);
- }
-}
[2/2] incubator-beam git commit: Closes #199
Posted by dh...@apache.org.
Closes #199
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7646384e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7646384e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7646384e
Branch: refs/heads/master
Commit: 7646384e2c2c45a384dfde6bb1ba20014ff4f733
Parents: 6901dc0 af77556
Author: Dan Halperin <dh...@google.com>
Authored: Sun Apr 17 07:46:09 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Sun Apr 17 07:46:09 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/Proto2Coder.java | 364 -------------------
.../apache/beam/sdk/io/bigtable/BigtableIO.java | 4 +-
.../apache/beam/sdk/coders/Proto2CoderTest.java | 148 --------
3 files changed, 2 insertions(+), 514 deletions(-)
----------------------------------------------------------------------