You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:07 UTC
[20/74] [partial] incubator-beam git commit: Rename
com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
deleted file mode 100644
index 1ce5fe5..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Coder} using Google Protocol Buffers 2 binary format.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
- *
- * <p>To use, specify the {@link Coder} type on a PCollection containing Protocol Buffers messages.
- *
- * <pre>
- * {@code
- * PCollection<MyProto.Message> records =
- * input.apply(...)
- * .setCoder(Proto2Coder.of(MyProto.Message.class));
- * }
- * </pre>
- *
- * <p>Custom message extensions are also supported, but the coder must be made
- * aware of them explicitly:
- *
- * <pre>
- * {@code
- * PCollection<MyProto.Message> records =
- * input.apply(...)
- * .setCoder(Proto2Coder.of(MyProto.Message.class)
- * .addExtensionsFrom(MyProto.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder, must extend {@code Message}
- * @deprecated Use {@link ProtoCoder}.
- */
-@Deprecated
-public class Proto2Coder<T extends Message> extends AtomicCoder<T> {
-
- /** The class of Protobuf message to be encoded. */
- private final Class<T> protoMessageClass;
-
- /**
- * All extension host classes included in this Proto2Coder. The extensions from
- * these classes will be included in the {@link ExtensionRegistry} used during
- * encoding and decoding.
- */
- private final List<Class<?>> extensionHostClasses;
-
- private Proto2Coder(Class<T> protoMessageClass, List<Class<?>> extensionHostClasses) {
- this.protoMessageClass = protoMessageClass;
- this.extensionHostClasses = extensionHostClasses;
- }
-
- private static final CoderProvider PROVIDER =
- new CoderProvider() {
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- if (type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
- @SuppressWarnings("unchecked")
- TypeDescriptor<? extends Message> messageType =
- (TypeDescriptor<? extends Message>) type;
- @SuppressWarnings("unchecked")
- Coder<T> coder = (Coder<T>) Proto2Coder.of(messageType);
- return coder;
- } else {
- throw new CannotProvideCoderException(
- String.format(
- "Cannot provide Proto2Coder because %s "
- + "is not a subclass of protocol buffer Messsage",
- type));
- }
- }
- };
-
- public static CoderProvider coderProvider() {
- return PROVIDER;
- }
-
- /**
- * Returns a {@code Proto2Coder} for the given Protobuf message class.
- */
- public static <T extends Message> Proto2Coder<T> of(Class<T> protoMessageClass) {
- return new Proto2Coder<T>(protoMessageClass, Collections.<Class<?>>emptyList());
- }
-
- /**
- * Returns a {@code Proto2Coder} for the given Protobuf message class.
- */
- public static <T extends Message> Proto2Coder<T> of(TypeDescriptor<T> protoMessageType) {
- @SuppressWarnings("unchecked")
- Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
- return of(protoMessageClass);
- }
-
- /**
- * Produces a {@code Proto2Coder} like this one, but with the extensions from
- * the given classes registered.
- *
- * @param moreExtensionHosts an iterable of classes that define a static
- * method {@code registerAllExtensions(ExtensionRegistry)}
- */
- public Proto2Coder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
- for (Class<?> extensionHost : moreExtensionHosts) {
- // Attempt to access the required method, to make sure it's present.
- try {
- Method registerAllExtensions =
- extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
- checkArgument(
- Modifier.isStatic(registerAllExtensions.getModifiers()),
- "Method registerAllExtensions() must be static for use with Proto2Coder");
- } catch (NoSuchMethodException | SecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- return new Proto2Coder<T>(
- protoMessageClass,
- new ImmutableList.Builder<Class<?>>()
- .addAll(extensionHostClasses)
- .addAll(moreExtensionHosts)
- .build());
- }
-
- /**
- * See {@link #withExtensionsFrom(Iterable)}.
- */
- public Proto2Coder<T> withExtensionsFrom(Class<?>... extensionHosts) {
- return withExtensionsFrom(ImmutableList.copyOf(extensionHosts));
- }
-
- /**
- * Adds custom Protobuf extensions to the coder. Returns {@code this}
- * for method chaining.
- *
- * @param extensionHosts must be a class that defines a static
- * method name {@code registerAllExtensions}
- * @deprecated use {@link #withExtensionsFrom}
- */
- @Deprecated
- public Proto2Coder<T> addExtensionsFrom(Class<?>... extensionHosts) {
- return addExtensionsFrom(ImmutableList.copyOf(extensionHosts));
- }
-
- /**
- * Adds custom Protobuf extensions to the coder. Returns {@code this}
- * for method chaining.
- *
- * @param extensionHosts must be a class that defines a static
- * method name {@code registerAllExtensions}
- * @deprecated use {@link #withExtensionsFrom}
- */
- @Deprecated
- public Proto2Coder<T> addExtensionsFrom(Iterable<Class<?>> extensionHosts) {
- for (Class<?> extensionHost : extensionHosts) {
- try {
- // Attempt to access the declared method, to make sure it's present.
- extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
- } catch (NoSuchMethodException e) {
- throw new IllegalArgumentException(e);
- }
- extensionHostClasses.add(extensionHost);
- }
- // The memoized extension registry needs to be recomputed because we have mutated this object.
- synchronized (this) {
- memoizedExtensionRegistry = null;
- getExtensionRegistry();
- }
- return this;
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context) throws IOException {
- if (value == null) {
- throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
- }
- if (context.isWholeStream) {
- value.writeTo(outStream);
- } else {
- value.writeDelimitedTo(outStream);
- }
- }
-
- @Override
- public T decode(InputStream inStream, Context context) throws IOException {
- if (context.isWholeStream) {
- return getParser().parseFrom(inStream, getExtensionRegistry());
- } else {
- return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
- }
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (!(other instanceof Proto2Coder)) {
- return false;
- }
- Proto2Coder<?> otherCoder = (Proto2Coder<?>) other;
- return protoMessageClass.equals(otherCoder.protoMessageClass)
- && Sets.newHashSet(extensionHostClasses)
- .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(protoMessageClass, extensionHostClasses);
- }
-
- /**
- * The encoding identifier is designed to support evolution as per the design of Protocol
- * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
- * Buffers documentation at
- * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
- * A Message Type</a>.
- *
- * <p>In particular, the encoding identifier is guaranteed to be the same for {@code Proto2Coder}
- * instances of the same principal message class, and otherwise distinct. Loaded extensions do not
- * affect the id, nor does it encode the full schema.
- *
- * <p>When modifying a message class, here are the broadest guidelines; see the above link
- * for greater detail.
- *
- * <ul>
- * <li>Do not change the numeric tags for any fields.
- * <li>Never remove a <code>required</code> field.
- * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
- * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
- * the new and old types are interchangeable.
- * </ul>
- *
- * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
- * the class until it is certain that no remaining serialized instances exist.
- *
- * <p>If backwards incompatible changes must be made, the best recourse is to change the name
- * of your Protocol Buffers message class.
- */
- @Override
- public String getEncodingId() {
- return protoMessageClass.getName();
- }
-
- private transient Parser<T> memoizedParser;
-
- private Parser<T> getParser() {
- if (memoizedParser == null) {
- try {
- @SuppressWarnings("unchecked")
- T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
- @SuppressWarnings("unchecked")
- Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
- memoizedParser = tParser;
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalArgumentException(e);
- }
- }
- return memoizedParser;
- }
-
- private transient ExtensionRegistry memoizedExtensionRegistry;
-
- private synchronized ExtensionRegistry getExtensionRegistry() {
- if (memoizedExtensionRegistry == null) {
- ExtensionRegistry registry = ExtensionRegistry.newInstance();
- for (Class<?> extensionHost : extensionHostClasses) {
- try {
- extensionHost
- .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
- .invoke(null, registry);
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalStateException(e);
- }
- }
- memoizedExtensionRegistry = registry.getUnmodifiable();
- }
- return memoizedExtensionRegistry;
- }
-
- ////////////////////////////////////////////////////////////////////////////////////
- // JSON Serialization details below
-
- private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
- private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
- /**
- * Constructor for JSON deserialization only.
- */
- @JsonCreator
- public static <T extends Message> Proto2Coder<T> of(
- @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
- @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
-
- try {
- @SuppressWarnings("unchecked")
- Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
- List<Class<?>> extensionHostClasses = Lists.newArrayList();
- if (extensionHostClassNames != null) {
- for (String extensionHostClassName : extensionHostClassNames) {
- extensionHostClasses.add(Class.forName(extensionHostClassName));
- }
- }
- return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
- Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
- List<CloudObject> extensionHostClassNames = Lists.newArrayList();
- for (Class<?> clazz : extensionHostClasses) {
- extensionHostClassNames.add(CloudObject.forString(clazz.getName()));
- }
- Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
deleted file mode 100644
index 1590b11..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A {@link Coder} for Java classes that implement {@link Serializable}.
- *
- * <p>To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- * PCollection<MyRecord> records =
- *     foo.apply(...).setCoder(SerializableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * <p>{@link SerializableCoder} does not guarantee a deterministic encoding, as Java
- * serialization may produce different binary encodings for two equivalent
- * objects.
- *
- * @param <T> the type of elements handled by this coder
- */
-public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
-
-  /**
-   * Returns a {@link SerializableCoder} instance for the provided element type.
-   * @param <T> the element type
-   */
-  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
-    @SuppressWarnings("unchecked")
-    Class<T> clazz = (Class<T>) type.getRawType();
-    return of(clazz);
-  }
-
-  /**
-   * Returns a {@link SerializableCoder} instance for the provided element class.
-   * @param <T> the element type
-   */
-  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
-    return new SerializableCoder<>(clazz);
-  }
-
-  /**
-   * Constructor for JSON deserialization: loads {@code classType} by name.
-   *
-   * @throws ClassNotFoundException if the class cannot be loaded or does not implement
-   *     {@link Serializable}
-   */
-  @JsonCreator
-  @SuppressWarnings("unchecked")
-  public static SerializableCoder<?> of(@JsonProperty("type") String classType)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(classType);
-    if (!Serializable.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + classType + " does not implement Serializable");
-    }
-    return of((Class<? extends Serializable>) clazz);
-  }
-
-  /**
-   * A {@link CoderProvider} that constructs a {@link SerializableCoder}
-   * for any class that implements serializable.
-   */
-  public static final CoderProvider PROVIDER = new CoderProvider() {
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
-        throws CannotProvideCoderException {
-      Class<?> clazz = typeDescriptor.getRawType();
-      if (Serializable.class.isAssignableFrom(clazz)) {
-        @SuppressWarnings("unchecked")
-        Class<? extends Serializable> serializableClazz =
-            (Class<? extends Serializable>) clazz;
-        @SuppressWarnings("unchecked")
-        Coder<T> coder = (Coder<T>) SerializableCoder.of(serializableClazz);
-        return coder;
-      } else {
-        throw new CannotProvideCoderException(
-            "Cannot provide SerializableCoder because " + typeDescriptor
-            + " does not implement Serializable");
-      }
-    }
-  };
-
-
-  /** The element class; decoded objects are cast to it. */
-  private final Class<T> type;
-
-  protected SerializableCoder(Class<T> type) {
-    this.type = type;
-  }
-
-  /** Returns the element class this coder was created for. */
-  public Class<T> getRecordType() {
-    return type;
-  }
-
-  /**
-   * Writes {@code value} with Java serialization. The context is ignored.
-   *
-   * @throws CoderException if serialization fails
-   */
-  @Override
-  public void encode(T value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    try {
-      // Deliberately not closed: closing an ObjectOutputStream also closes the underlying
-      // stream, which belongs to the caller. Flushing pushes out any buffered bytes instead.
-      ObjectOutputStream oos = new ObjectOutputStream(outStream);
-      oos.writeObject(value);
-      oos.flush();
-    } catch (IOException exn) {
-      throw new CoderException("unable to serialize record " + value, exn);
-    }
-  }
-
-  /**
-   * Reads one object with Java deserialization and casts it to the element class.
-   * The context is ignored.
-   *
-   * <p>NOTE(review): Java deserialization of untrusted bytes can execute arbitrary code through
-   * gadget chains. Only decode data produced by a trusted writer.
-   *
-   * @throws CoderException if the serialized class cannot be found
-   */
-  @Override
-  public T decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      ObjectInputStream ois = new ObjectInputStream(inStream);
-      return type.cast(ois.readObject());
-    } catch (ClassNotFoundException e) {
-      throw new CoderException("unable to deserialize record", e);
-    }
-  }
-
-  /**
-   * Returns {@code "<class name>:<serialVersionUID>"}, so that incompatible versions of the
-   * element class get distinct encoding ids.
-   */
-  @Override
-  public String getEncodingId() {
-    return String.format("%s:%s",
-        type.getName(),
-        ObjectStreamClass.lookup(type).getSerialVersionUID());
-  }
-
-  /** Serializes this coder, recording the element class name under {@code "type"}. */
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    result.put("type", type.getName());
-    return result;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. Java serialization is not
-   *         deterministic with respect to {@link Object#equals} for all types.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Java Serialization may be non-deterministic.");
-  }
-
-  /**
-   * Two coders are equal when they are of the same concrete class and were created for the
-   * same element class. Returns {@code false} for {@code null}.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    // Check for null first: calling other.getClass() on null would throw NullPointerException,
-    // which violates the Object#equals contract.
-    if (other == null || getClass() != other.getClass()) {
-      return false;
-    }
-    return type == ((SerializableCoder<?>) other).type;
-  }
-
-  @Override
-  public int hashCode() {
-    return type.hashCode();
-  }
-
-  // This coder inherits isRegisterByteSizeObserverCheap,
-  // getEncodedElementByteSize and registerByteSizeObserver
-  // from StandardCoder. Looks like we cannot do much better
-  // in this case.
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
deleted file mode 100644
index 589a372..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A {@link SetCoder} encodes any {@link Set} using the format of {@link IterableLikeCoder}. The
- * elements may not be in a deterministic order, depending on the {@code Set} implementation.
- *
- * @param <T> the type of the elements of the set
- */
-public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
-
-  /**
-   * Produces a {@link SetCoder} with the given {@code elementCoder}.
-   */
-  public static <T> SetCoder<T> of(Coder<T> elementCoder) {
-    return new SetCoder<>(elementCoder);
-  }
-
-  /**
-   * Dynamically typed constructor for JSON deserialization.
-   *
-   * @param components the component coders; must contain exactly one element coder
-   * @throws IllegalArgumentException if {@code components} does not have exactly one element
-   */
-  @JsonCreator
-  public static SetCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Object> components) {
-    // Template form: the message is formatted only when the check fails.
-    Preconditions.checkArgument(components.size() == 1,
-        "Expecting 1 component, got %s", components.size());
-    return of((Coder<?>) components.get(0));
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. Sets are not ordered, but
-   *         they are encoded in the order of an arbitrary iteration.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Ordering of elements in a set may be non-deterministic.");
-  }
-
-  /**
-   * Returns the instance components of {@code exampleValue}, as computed by
-   * {@code IterableLikeCoder.getInstanceComponentsHelper}.
-   *
-   * <p>NOTE(review): presumably a singleton list holding the first element when the set is
-   * non-empty; confirm the exact contents against {@link IterableLikeCoder}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Set<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal operations below here.
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return A new {@link Set} built from the elements in the {@link List} decoded by
-   *         {@link IterableLikeCoder}. Duplicate elements collapse into one.
-   */
-  @Override
-  protected final Set<T> decodeToIterable(List<T> decodedElements) {
-    return new HashSet<>(decodedElements);
-  }
-
-  protected SetCoder(Coder<T> elemCoder) {
-    super(elemCoder, "Set");
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
deleted file mode 100644
index ca189b1..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
-
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Abstract base for {@link Coder} implementations whose equality, hash code and string form
- * are derived from the concrete class plus the list returned by {@link #getComponents}.
- *
- * <p>Subclasses typically override some of the following:
- *
- * <ul>
- *   <li>{@link #getComponents}: defaults to {@link #getCoderArguments} (empty if that is null).</li>
- *   <li>{@link #getEncodedElementByteSize} and {@link #isRegisterByteSizeObserverCheap}: the
- *       default measures size by encoding the value and counting bytes, which is reported as
- *       expensive.</li>
- *   <li>{@link #getEncodingId} and {@link #getAllowedEncodings}: by default, the encoding id
- *       is the empty string, so only the canonical name of the subclass will be used for
- *       compatibility checks, and no other encoding ids are allowed.</li>
- * </ul>
- */
-public abstract class StandardCoder<T> implements Coder<T> {
-  protected StandardCoder() {}
-
-  /** Returns the empty string unless overridden. */
-  @Override
-  public String getEncodingId() {
-    return "";
-  }
-
-  /** Returns an empty collection unless overridden. */
-  @Override
-  public Collection<String> getAllowedEncodings() {
-    return Collections.emptyList();
-  }
-
-  /**
-   * Returns the component {@link Coder Coders} of this coder: the coder arguments, or an
-   * empty list when {@link #getCoderArguments} returns {@code null}.
-   */
-  public List<? extends Coder<?>> getComponents() {
-    List<? extends Coder<?>> arguments = getCoderArguments();
-    if (arguments != null) {
-      return arguments;
-    }
-    return Collections.emptyList();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true} exactly when {@code o} has the same runtime class as this coder and
-   *         its components are equal to this coder's components
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (o == null) {
-      return false;
-    }
-    if (getClass() != o.getClass()) {
-      return false;
-    }
-    StandardCoder<?> other = (StandardCoder<?>) o;
-    return getComponents().equals(other.getComponents());
-  }
-
-  /** Combines the runtime class and the components, consistent with {@link #equals}. */
-  @Override
-  public int hashCode() {
-    return 31 * getClass().hashCode() + getComponents().hashCode();
-  }
-
-  /**
-   * Renders the class name without its package, followed by the components in parentheses
-   * (separated by {@code ", "}) when there are any.
-   */
-  @Override
-  public String toString() {
-    String qualifiedName = getClass().getName();
-    StringBuilder description =
-        new StringBuilder(qualifiedName.substring(qualifiedName.lastIndexOf('.') + 1));
-    List<? extends Coder<?>> componentCoders = getComponents();
-    if (componentCoders.isEmpty()) {
-      return description.toString();
-    }
-    description.append('(');
-    String separator = "";
-    for (Coder<?> componentCoder : componentCoders) {
-      description.append(separator).append(componentCoder.toString());
-      separator = ", ";
-    }
-    return description.append(')').toString();
-  }
-
-  /**
-   * Builds a {@link CloudObject} for this coder's class. It adds the serialized components,
-   * the encoding id and the allowed encodings, each only when non-empty.
-   *
-   * @throws NullPointerException if {@link #getEncodingId} returns {@code null}
-   */
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject cloudObject = CloudObject.forClass(getClass());
-
-    List<? extends Coder<?>> componentCoders = getComponents();
-    if (!componentCoders.isEmpty()) {
-      List<CloudObject> encodedComponents = new ArrayList<>(componentCoders.size());
-      for (Coder<?> component : componentCoders) {
-        encodedComponents.add(component.asCloudObject());
-      }
-      addList(cloudObject, PropertyNames.COMPONENT_ENCODINGS, encodedComponents);
-    }
-
-    String id = checkNotNull(getEncodingId(), "Coder.getEncodingId() must not return null.");
-    if (!id.isEmpty()) {
-      addString(cloudObject, PropertyNames.ENCODING_ID, id);
-    }
-
-    Collection<String> allowed = getAllowedEncodings();
-    if (!allowed.isEmpty()) {
-      addStringList(cloudObject, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowed));
-    }
-
-    return cloudObject;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} unless overridden. The default
-   *         {@link StandardCoder#registerByteSizeObserver} calls
-   *         {@link #getEncodedElementByteSize}, which re-encodes the element unless it is itself
-   *         overridden. That is considered expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
-    return false;
-  }
-
-  /**
-   * Returns the number of bytes this coder produces for {@code value}. The value is encoded into
-   * a byte-counting sink that discards the data.
-   *
-   * @throws IllegalArgumentException wrapping any exception raised while encoding
-   */
-  protected long getEncodedElementByteSize(T value, Context context)
-      throws Exception {
-    CountingOutputStream byteCounter = new CountingOutputStream(ByteStreams.nullOutputStream());
-    try {
-      encode(value, byteCounter, context);
-    } catch (Exception exn) {
-      throw new IllegalArgumentException(
-          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
-    }
-    return byteCounter.getCount();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>Reports the result of {@link #getEncodedElementByteSize} to {@code observer}.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      T value, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    observer.update(getEncodedElementByteSize(value, context));
-  }
-
-  /**
-   * Calls {@link Coder#verifyDeterministic} on each coder in turn. The first failure is
-   * rethrown, attributed to this coder, with {@code message} and the original failure as cause.
-   */
-  protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
-      throws NonDeterministicException {
-    for (Coder<?> candidate : coders) {
-      try {
-        candidate.verifyDeterministic();
-      } catch (NonDeterministicException cause) {
-        throw new NonDeterministicException(this, message, cause);
-      }
-    }
-  }
-
-  /** Varargs convenience form of {@link #verifyDeterministic(String, Iterable)}. */
-  protected void verifyDeterministic(String message, Coder<?>... coders)
-      throws NonDeterministicException {
-    verifyDeterministic(message, Arrays.asList(coders));
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} unless overridden.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return false;
-  }
-
-  /**
-   * Returns {@code value} itself when it is non-null and this coder is consistent with equals.
-   * Otherwise returns a {@link StructuralByteArray} of its encoding in the {@code OUTER} context.
-   *
-   * @throws IllegalArgumentException wrapping any exception raised while encoding
-   */
-  @Override
-  public Object structuralValue(T value) throws Exception {
-    boolean valueIsItsOwnStructure = value != null && consistentWithEquals();
-    if (valueIsItsOwnStructure) {
-      return value;
-    }
-    try {
-      ByteArrayOutputStream encoded = new ByteArrayOutputStream();
-      encode(value, encoded, Context.OUTER);
-      return new StructuralByteArray(encoded.toByteArray());
-    } catch (Exception exn) {
-      throw new IllegalArgumentException(
-          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
deleted file mode 100644
index 3f352d3..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
-
-import java.lang.reflect.InvocationTargetException;
-
-/**
- * A {@link Coder} that wraps a {@code Coder<String>}
- * and encodes/decodes values via string representations.
- *
- * <p>To decode, the input byte stream is decoded to
- * a {@link String}, and this is passed to the single-argument
- * constructor for {@code T}.
- *
- * <p>To encode, the input value is converted via {@code toString()},
- * and this string is encoded.
- *
- * <p>In order for this to operate correctly for a class {@code Clazz},
- * it must be the case for any instance {@code x} that
- * {@code x.equals(new Clazz(x.toString()))}.
- *
- * <p>This method of encoding is not designed for ease of evolution of {@code Clazz};
- * it should only be used in cases where the class is stable or the encoding is not
- * important. If evolution of the class is important, see {@link ProtoCoder}, {@link AvroCoder},
- * or {@link JAXBCoder}.
- *
- * @param <T> The type of objects coded.
- */
-public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
- public static <T> StringDelegateCoder<T> of(Class<T> clazz) {
- return new StringDelegateCoder<T>(clazz);
- }
-
- @Override
- public String toString() {
- return "StringDelegateCoder(" + clazz + ")";
- }
-
- private final Class<T> clazz;
-
- protected StringDelegateCoder(final Class<T> clazz) {
- super(StringUtf8Coder.of(),
- new CodingFunction<T, String>() {
- @Override
- public String apply(T input) {
- return input.toString();
- }
- },
- new CodingFunction<String, T>() {
- @Override
- public T apply(String input) throws
- NoSuchMethodException,
- InstantiationException,
- IllegalAccessException,
- InvocationTargetException {
- return clazz.getConstructor(String.class).newInstance(input);
- }
- });
-
- this.clazz = clazz;
- }
-
- /**
- * The encoding id is the fully qualified name of the encoded/decoded class.
- */
- @Override
- public String getEncodingId() {
- return clazz.getName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
deleted file mode 100644
index 25b8f5e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.ExposedByteArrayOutputStream;
-import com.google.cloud.dataflow.sdk.util.StreamUtils;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.base.Utf8;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-import java.nio.charset.StandardCharsets;
-
-/**
- * A {@link Coder} that encodes {@link String Strings} in UTF-8 encoding.
- * If in a nested context, prefixes the string with an integer length field,
- * encoded via a {@link VarIntCoder}.
- */
-public class StringUtf8Coder extends AtomicCoder<String> {
-
-  /** Returns the singleton {@link StringUtf8Coder}. */
-  @JsonCreator
-  public static StringUtf8Coder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final StringUtf8Coder INSTANCE = new StringUtf8Coder();
-
-  /** Writes the VarInt-encoded UTF-8 byte length of {@code value}, then the bytes themselves. */
-  private static void writeString(String value, DataOutputStream dos)
-      throws IOException {
-    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
-    VarInt.encode(bytes.length, dos);
-    dos.write(bytes);
-  }
-
-  /** Reads a string in the format produced by {@link #writeString}. */
-  private static String readString(DataInputStream dis) throws IOException {
-    int len = VarInt.decodeInt(dis);
-    if (len < 0) {
-      throw new CoderException("Invalid encoded string length: " + len);
-    }
-    byte[] bytes = new byte[len];
-    dis.readFully(bytes);
-    return new String(bytes, StandardCharsets.UTF_8);
-  }
-
-  /**
-   * Returns the number of bytes {@code value.getBytes(StandardCharsets.UTF_8)} produces, which is
-   * exactly what {@link #encode} writes for the string payload.
-   *
-   * <p>{@link Utf8#encodedLength} avoids allocating, but it rejects strings containing unpaired
-   * surrogates with an {@link IllegalArgumentException}. {@link String#getBytes} instead
-   * substitutes a replacement byte for them. In that case, fall back to the real encoding so the
-   * reported size always matches the encoded output.
-   */
-  private static int utf8Length(String value) {
-    try {
-      return Utf8.encodedLength(value);
-    } catch (IllegalArgumentException malformed) {
-      return value.getBytes(StandardCharsets.UTF_8).length;
-    }
-  }
-
-  private StringUtf8Coder() {}
-
-  @Override
-  public void encode(String value, OutputStream outStream, Context context)
-      throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null String");
-    }
-    if (context.isWholeStream) {
-      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
-      if (outStream instanceof ExposedByteArrayOutputStream) {
-        ((ExposedByteArrayOutputStream) outStream).writeAndOwn(bytes);
-      } else {
-        outStream.write(bytes);
-      }
-    } else {
-      writeString(value, new DataOutputStream(outStream));
-    }
-  }
-
-  @Override
-  public String decode(InputStream inStream, Context context)
-      throws IOException {
-    if (context.isWholeStream) {
-      byte[] bytes = StreamUtils.getBytes(inStream);
-      return new String(bytes, StandardCharsets.UTF_8);
-    } else {
-      try {
-        return readString(new DataInputStream(inStream));
-      } catch (EOFException | UTFDataFormatException exn) {
-        // These exceptions correspond to decoding problems, so change
-        // what kind of exception they're branded as.
-        throw new CoderException(exn);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. This coder is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return the byte size of the UTF-8 encoding of the string or, in a nested context,
-   *     the byte size of the encoding plus the encoded length prefix.
-   */
-  @Override
-  protected long getEncodedElementByteSize(String value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null String");
-    }
-    int payloadLength = utf8Length(value);
-    if (context.isWholeStream) {
-      return payloadLength;
-    }
-    // Nested context: the VarInt length prefix written by writeString, followed by the payload.
-    // Computed arithmetically instead of re-encoding the string through a counting stream.
-    return (long) VarInt.getLength(payloadLength) + payloadLength;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
deleted file mode 100644
index aa44456..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.api.client.util.Base64.encodeBase64String;
-
-import java.util.Arrays;
-
-/**
- * A wrapper around a byte[] that uses structural, value-based
- * equality rather than byte[]'s normal object identity.
- */
-public class StructuralByteArray {
- byte[] value;
-
- public StructuralByteArray(byte[] value) {
- this.value = value;
- }
-
- public byte[] getValue() {
- return value;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof StructuralByteArray) {
- StructuralByteArray that = (StructuralByteArray) o;
- return Arrays.equals(this.value, that.value);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(value);
- }
-
- @Override
- public String toString() {
- return "base64:" + encodeBase64String(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
deleted file mode 100644
index b02fb08..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.api.services.bigquery.model.TableRow;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
- */
-public class TableRowJsonCoder extends AtomicCoder<TableRow> {
-
-  // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
-  // TableRow.
-  private static final ObjectMapper MAPPER =
-      new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
-
-  private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
-
-  private TableRowJsonCoder() { }
-
-  /** Returns the singleton {@link TableRowJsonCoder}. */
-  @JsonCreator
-  public static TableRowJsonCoder of() {
-    return INSTANCE;
-  }
-
-  /** Serializes {@code value} to a JSON string and writes it with {@link StringUtf8Coder}. */
-  @Override
-  public void encode(TableRow value, OutputStream outStream, Context context)
-      throws IOException {
-    String json = MAPPER.writeValueAsString(value);
-    StringUtf8Coder stringCoder = StringUtf8Coder.of();
-    stringCoder.encode(json, outStream, context);
-  }
-
-  /** Reads a string with {@link StringUtf8Coder} and parses it as a {@link TableRow}. */
-  @Override
-  public TableRow decode(InputStream inStream, Context context)
-      throws IOException {
-    StringUtf8Coder stringCoder = StringUtf8Coder.of();
-    String json = stringCoder.decode(inStream, context);
-    return MAPPER.readValue(json, TableRow.class);
-  }
-
-  /** Sizes the value by serializing it to JSON and delegating to {@link StringUtf8Coder}. */
-  @Override
-  protected long getEncodedElementByteSize(TableRow value, Context context)
-      throws Exception {
-    String json = MAPPER.writeValueAsString(value);
-    StringUtf8Coder stringCoder = StringUtf8Coder.of();
-    return stringCoder.getEncodedElementByteSize(json, context);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. A {@link TableRow} can hold arbitrary
-   *     {@link Object} instances, which makes the encoding non-deterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "TableCell can hold arbitrary instances, which may be non-deterministic.");
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
deleted file mode 100644
index 539c56a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} that encodes {@link Integer Integers} as the ASCII bytes of
- * their textual, decimal, representation.
- *
- * <p>The text is written with {@link StringUtf8Coder}, so in a nested context it is preceded
- * by a length prefix.
- */
-public class TextualIntegerCoder extends AtomicCoder<Integer> {
-
-  /** Returns the shared {@link TextualIntegerCoder} instance. */
-  @JsonCreator
-  public static TextualIntegerCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  // The coder is stateless, so one instance is cached, as the other atomic coders do,
-  // instead of allocating a new coder on every call to of().
-  private static final TextualIntegerCoder INSTANCE = new TextualIntegerCoder();
-
-  protected TextualIntegerCoder() {}
-
-  /**
-   * Writes the decimal text of {@code value}.
-   *
-   * @throws CoderException if {@code value} is {@code null}
-   */
-  @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    String textualValue = value.toString();
-    StringUtf8Coder.of().encode(textualValue, outStream, context);
-  }
-
-  /**
-   * Reads decimal text and parses it as an {@link Integer}.
-   *
-   * @throws CoderException if the text is not a valid decimal {@code int}
-   */
-  @Override
-  public Integer decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    String textualValue = StringUtf8Coder.of().decode(inStream, context);
-    try {
-      return Integer.valueOf(textualValue);
-    } catch (NumberFormatException exn) {
-      throw new CoderException("error when decoding a textual integer", exn);
-    }
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Integer value, Context context) throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    String textualValue = value.toString();
-    return StringUtf8Coder.of().getEncodedElementByteSize(textualValue, context);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
deleted file mode 100644
index 42862bb..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.VarInt;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link Coder} that encodes {@link Integer Integers} using between 1 and 5 bytes. Negative
- * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
- * integers that are known to often be large or negative.
- */
-public class VarIntCoder extends AtomicCoder<Integer> {
-
-  /** Returns the singleton {@link VarIntCoder}. */
-  @JsonCreator
-  public static VarIntCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final VarIntCoder INSTANCE =
-      new VarIntCoder();
-
-  private VarIntCoder() {}
-
-  @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    VarInt.encode(value.intValue(), outStream);
-  }
-
-  @Override
-  public Integer decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      return VarInt.decodeInt(inStream);
-    } catch (EOFException | UTFDataFormatException exn) {
-      // These exceptions correspond to decoding problems, so change
-      // what kind of exception they're branded as.
-      throw new CoderException(exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link VarIntCoder} is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) {
-    return true;
-  }
-
-  /**
-   * Returns the number of bytes {@link #encode} writes for {@code value}: between 1 and 5.
-   *
-   * @throws CoderException if {@code value} is {@code null}
-   */
-  @Override
-  protected long getEncodedElementByteSize(Integer value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    // The int is encoded as a 32-bit VarInt, so negative values take 5 bytes. Widening with
-    // longValue() alone sign-extends a negative int and would report 10 bytes. Zero-extend
-    // instead so the size matches the bytes actually written.
-    return VarInt.getLength(value.longValue() & 0xFFFFFFFFL);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
deleted file mode 100644
index 669453e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.VarInt;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link Coder} that encodes {@link Long Longs} using between 1 and 10 bytes. Negative
- * numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for
- * longs that are known to often be large or negative.
- */
-public class VarLongCoder extends AtomicCoder<Long> {
-
- @JsonCreator
- public static VarLongCoder of() {
- return INSTANCE;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static final VarLongCoder INSTANCE = new VarLongCoder();
-
- private VarLongCoder() {}
-
- @Override
- public void encode(Long value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null Long");
- }
- VarInt.encode(value.longValue(), outStream);
- }
-
- @Override
- public Long decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- try {
- return VarInt.decodeLong(inStream);
- } catch (EOFException | UTFDataFormatException exn) {
- // These exceptions correspond to decoding problems, so change
- // what kind of exception they're branded as.
- throw new CoderException(exn);
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. {@link VarLongCoder} is injective.
- */
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(Long value, Context context) {
- return true;
- }
-
- @Override
- protected long getEncodedElementByteSize(Long value, Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot encode a null Long");
- }
- return VarInt.getLength(value.longValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java
deleted file mode 100644
index 813ee2f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
- */
-public class VoidCoder extends AtomicCoder<Void> {
-
-  private static final VoidCoder INSTANCE = new VoidCoder();
-
-  private VoidCoder() {}
-
-  /** Returns the shared {@link VoidCoder} instance. */
-  @JsonCreator
-  public static VoidCoder of() {
-    return INSTANCE;
-  }
-
-  /** Writes nothing: a {@link Void} value carries no information. */
-  @Override
-  public void encode(Void value, OutputStream outStream, Context context) {
-  }
-
-  /** Reads nothing and returns {@code null}, the only value of type {@link Void}. */
-  @Override
-  public Void decode(InputStream inStream, Context context) {
-    return null;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link VoidCoder} is (vacuously) injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link VoidCoder#getEncodedElementByteSize} runs in constant time.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Void value, Context context) {
-    return true;
-  }
-
-  /** Always zero: nothing is ever written. */
-  @Override
-  protected long getEncodedElementByteSize(Void value, Context context)
-      throws Exception {
-    return 0L;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java
deleted file mode 100644
index a3bc150..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines {@link com.google.cloud.dataflow.sdk.coders.Coder Coders}
- * to specify how data is encoded to and decoded from byte strings.
- *
- * <p>During execution of a Pipeline, elements in a
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}
- * may need to be encoded into byte strings.
- * This happens both at the beginning and end of a pipeline when data is read from and written to
- * persistent storage and also during execution of a pipeline when elements are communicated between
- * machines.
- *
- * <p>Exactly when PCollection elements are encoded during execution depends on which
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} is being used and how that runner
- * chooses to execute the pipeline. As such, Dataflow requires that all PCollections have an
- * appropriate Coder in case it becomes necessary. In many cases, the Coder can be inferred from
- * the available Java type
- * information and the Pipeline's {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry}. It
- * can be specified per PCollection via
- * {@link com.google.cloud.dataflow.sdk.values.PCollection#setCoder(Coder)} or per type using the
- * {@link com.google.cloud.dataflow.sdk.coders.DefaultCoder} annotation.
- *
- * <p>This package provides a number of coders for common types like {@code Integer},
- * {@code String}, and {@code List}, as well as coders like
- * {@link com.google.cloud.dataflow.sdk.coders.AvroCoder} that can be used to encode many custom
- * types.
- *
- */
-package com.google.cloud.dataflow.sdk.coders;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
deleted file mode 100644
index 88d13e9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.CoderProvider;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both
- * Protocol Buffers syntax versions 2 and 3.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
- *
- * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default
- * {@link Coder} for any {@link Message} object. Custom message extensions are also supported, but
- * these extensions must be registered for a particular {@link ProtoCoder} instance and that
- * instance must be registered on the {@link PCollection} that needs the extensions:
- *
- * <pre>{@code
- * import MyProtoFile;
- * import MyProtoFile.MyMessage;
- *
- * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
- * PCollection<MyMessage> records = input.apply(...).setCoder(coder);
- * }</pre>
- *
- * <h3>Versioning</h3>
- *
- * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However,
- * the Java runtime version of the <code>com.google.protobuf</code> library must exactly match the
- * version of <code>protoc</code> that was used to produce the JAR files containing the compiled
- * <code>.proto</code> messages.
- *
- * <p>For more information, see the
- * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>.
- *
- * <h3>{@link ProtoCoder} and Determinism</h3>
- *
- * <p>In general, Protocol Buffers messages can be encoded deterministically within a single
- * pipeline as long as:
- *
- * <ul>
- * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code>
- * fields.</li>
- * <li>Every Java VM that encodes or decodes the messages uses the same runtime version of the
- * Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li>
- * </ul>
- *
- * <h3>{@link ProtoCoder} and Encoding Stability</h3>
- *
- * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language
- * guides for
- * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a>
- * and
- * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a>
- * syntaxes, depending on your message type. Following these guidelines will ensure that the
- * old encoded data can be read by new versions of the code.
- *
- * <p>Generally, any change to the message type, registered extensions, runtime library, or
- * compiled proto JARs may change the encoding. Thus even if both the original and updated messages
- * can be encoded deterministically within a single job, these deterministic encodings may not be
- * the same across jobs.
- *
- * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
- */
-public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
-
-  /**
-   * Returns a {@link CoderProvider} that supplies a {@link ProtoCoder} with no registered
-   * extension hosts (and therefore an empty {@link ExtensionRegistry}) for any {@link Message}
-   * subtype.
-   */
-  public static CoderProvider coderProvider() {
-    return PROVIDER;
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message} class, with no
-   * extension hosts registered.
-   */
-  public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
-    return new ProtoCoder<T>(protoMessageClass, ImmutableSet.<Class<?>>of());
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given
-   * {@link TypeDescriptor}. Only the raw type of the descriptor is used.
-   */
-  public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) {
-    @SuppressWarnings("unchecked")
-    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
-    return of(protoMessageClass);
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes
-   * registered.
-   *
-   * <p>Each of the extension host classes must be a class automatically generated by the
-   * Protocol Buffers compiler, {@code protoc}, that contains messages. Concretely, each class must
-   * declare a static {@code registerAllExtensions(ExtensionRegistry)} method; this is checked
-   * eagerly here so that misconfiguration is reported at construction time.
-   *
-   * <p>Does not modify this object.
-   *
-   * @throws IllegalArgumentException if an extension host does not declare an accessible, static
-   *     {@code registerAllExtensions(ExtensionRegistry)} method.
-   */
-  public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
-    for (Class<?> extensionHost : moreExtensionHosts) {
-      // Attempt to access the required method, to make sure it's present and static.
-      try {
-        Method registerAllExtensions =
-            extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
-        checkArgument(
-            Modifier.isStatic(registerAllExtensions.getModifiers()),
-            "Method registerAllExtensions() must be static in %s",
-            extensionHost.getCanonicalName());
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new IllegalArgumentException(
-            String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()),
-            e);
-      }
-    }
-
-    return new ProtoCoder<T>(
-        protoMessageClass,
-        new ImmutableSet.Builder<Class<?>>()
-            .addAll(extensionHostClasses)
-            .addAll(moreExtensionHosts)
-            .build());
-  }
-
-  /**
-   * Varargs convenience overload of {@link #withExtensionsFrom(Iterable)}.
-   *
-   * <p>Does not modify this object.
-   */
-  public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
-    return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
-  }
-
-  /**
-   * Encodes {@code value} to {@code outStream}. In a whole-stream context the raw message bytes
-   * are written; otherwise the message is written with {@code writeDelimitedTo} so that it can be
-   * read back from a stream shared with other values.
-   *
-   * @throws CoderException if {@code value} is null.
-   */
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
-    }
-    if (context.isWholeStream) {
-      value.writeTo(outStream);
-    } else {
-      value.writeDelimitedTo(outStream);
-    }
-  }
-
-  /**
-   * Decodes a message from {@code inStream}, using this coder's {@link ExtensionRegistry}. Mirrors
-   * {@link #encode}: whole-stream contexts read the raw message, other contexts read a delimited
-   * message.
-   */
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    if (context.isWholeStream) {
-      return getParser().parseFrom(inStream, getExtensionRegistry());
-    } else {
-      // NOTE(review): Parser.parseDelimitedFrom returns null instead of throwing when the stream
-      // is already at EOF, so this path can yield null even though encode() rejects null values.
-      // Confirm no caller relies on that before turning it into an EOFException.
-      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
-    }
-  }
-
-  /**
-   * Two {@link ProtoCoder}s are equal when they code the same message class and have the same
-   * set of extension host classes (registration order is irrelevant).
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (!(other instanceof ProtoCoder)) {
-      return false;
-    }
-    ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
-    return protoMessageClass.equals(otherCoder.protoMessageClass)
-        && Sets.newHashSet(extensionHostClasses)
-            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(protoMessageClass, extensionHostClasses);
-  }
-
-  /**
-   * The encoding identifier is designed to support evolution as per the design of Protocol
-   * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
-   * Buffers documentation at
-   * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
-   * A Message Type</a>.
-   *
-   * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder}
-   * instances of the same principal message class, with the same registered extension host classes,
-   * and otherwise distinct. Note that the encoding ID does not encode any version of the message
-   * or extensions, nor does it include the message schema.
-   *
-   * <p>When modifying a message class, here are the broadest guidelines; see the above link
-   * for greater detail.
-   *
-   * <ul>
-   * <li>Do not change the numeric tags for any fields.
-   * <li>Never remove a <code>required</code> field.
-   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
-   * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
-   * the new and old types are interchangeable.
-   * </ul>
-   *
-   * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
-   * the class until it is certain that no remaining serialized instances exist.
-   *
-   * <p>If backwards incompatible changes must be made, the best recourse is to change the name
-   * of your Protocol Buffers message class.
-   */
-  @Override
-  public String getEncodingId() {
-    // Extension host names are sorted so the ID does not depend on registration order.
-    return protoMessageClass.getName() + getSortedExtensionClasses().toString();
-  }
-
-  /**
-   * Delegates to {@link ProtobufUtil#verifyDeterministic}, which rejects messages that
-   * transitively reach a {@code map} field.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    ProtobufUtil.verifyDeterministic(this);
-  }
-
-  /**
-   * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports.
-   */
-  public Class<T> getMessageType() {
-    return protoMessageClass;
-  }
-
-  /**
-   * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
-   * to {@code T} registered with this {@link ProtoCoder}.
-   *
-   * <p>The registry is built lazily on first use by invoking each extension host's static
-   * {@code registerAllExtensions} method, then memoized as an unmodifiable registry. The memo
-   * field is {@code transient}, so it is not serialized and is rebuilt on demand.
-   *
-   * @throws IllegalStateException if an extension host's {@code registerAllExtensions} method
-   *     cannot be found or invoked.
-   */
-  public ExtensionRegistry getExtensionRegistry() {
-    if (memoizedExtensionRegistry == null) {
-      ExtensionRegistry registry = ExtensionRegistry.newInstance();
-      for (Class<?> extensionHost : extensionHostClasses) {
-        try {
-          extensionHost
-              .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
-              .invoke(null, registry);
-        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-          throw new IllegalStateException(e);
-        }
-      }
-      memoizedExtensionRegistry = registry.getUnmodifiable();
-    }
-    return memoizedExtensionRegistry;
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // Private implementation details below.
-
-  /** The {@link Message} type to be coded. */
-  private final Class<T> protoMessageClass;
-
-  /**
-   * All extension host classes included in this {@link ProtoCoder}. The extensions from these
-   * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding.
-   */
-  private final Set<Class<?>> extensionHostClasses;
-
-  // Constants used to serialize and deserialize
-  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
-  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
-  // Transient fields that are lazy initialized and then memoized.
-  private transient ExtensionRegistry memoizedExtensionRegistry;
-  private transient Parser<T> memoizedParser;
-
-  /** Private constructor; use the static {@code of} factories instead. */
-  private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) {
-    this.protoMessageClass = protoMessageClass;
-    this.extensionHostClasses = extensionHostClasses;
-  }
-
-  /**
-   * Reconstructs a {@link ProtoCoder} from the class names written by {@link #asCloudObject()}.
-   *
-   * @throws IllegalArgumentException if a class cannot be loaded, if the message class is not a
-   *     {@link Message} subclass, or if an extension host is invalid.
-   * @deprecated For JSON deserialization only.
-   */
-  @JsonCreator
-  @Deprecated
-  public static <T extends Message> ProtoCoder<T> of(
-      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
-      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
-
-    try {
-      Class<?> rawMessageClass = Class.forName(protoMessageClassName);
-      // Validate up front: the cast below is unchecked, so a non-Message class would otherwise
-      // only fail later, during encoding or decoding.
-      checkArgument(
-          Message.class.isAssignableFrom(rawMessageClass),
-          "Class %s is not a subclass of %s",
-          protoMessageClassName,
-          Message.class.getName());
-      @SuppressWarnings("unchecked")
-      Class<T> protoMessageClass = (Class<T>) rawMessageClass;
-      List<Class<?>> extensionHostClasses = Lists.newArrayList();
-      if (extensionHostClassNames != null) {
-        for (String extensionHostClassName : extensionHostClassNames) {
-          extensionHostClasses.add(Class.forName(extensionHostClassName));
-        }
-      }
-      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  /**
-   * Serializes this coder by recording the message class name and the sorted extension host
-   * class names, which the {@link JsonCreator} factory above reads back.
-   */
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
-    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
-    for (String className : getSortedExtensionClasses()) {
-      extensionHostClassNames.add(CloudObject.forString(className));
-    }
-    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
-    return result;
-  }
-
-  /**
-   * Get the memoized {@link Parser}, possibly initializing it lazily from the message class's
-   * static {@code getDefaultInstance()} method.
-   *
-   * @throws IllegalArgumentException if {@code getDefaultInstance()} cannot be found or invoked.
-   */
-  private Parser<T> getParser() {
-    if (memoizedParser == null) {
-      try {
-        @SuppressWarnings("unchecked")
-        T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
-        @SuppressWarnings("unchecked")
-        Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
-        memoizedParser = tParser;
-      } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-        throw new IllegalArgumentException(e);
-      }
-    }
-    return memoizedParser;
-  }
-
-  /**
-   * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
-   * {@link #coderProvider()}.
-   */
-  private static final CoderProvider PROVIDER =
-      new CoderProvider() {
-        @Override
-        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-          if (!type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
-            throw new CannotProvideCoderException(
-                String.format(
-                    "Cannot provide %s because %s is not a subclass of %s",
-                    ProtoCoder.class.getSimpleName(),
-                    type,
-                    Message.class.getName()));
-          }
-
-          @SuppressWarnings("unchecked")
-          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
-          try {
-            @SuppressWarnings("unchecked")
-            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
-            return coder;
-          } catch (IllegalArgumentException e) {
-            throw new CannotProvideCoderException(e);
-          }
-        }
-      };
-
-  /** Returns the extension host class names in natural (lexicographic) order. */
-  private SortedSet<String> getSortedExtensionClasses() {
-    SortedSet<String> ret = new TreeSet<>();
-    for (Class<?> clazz : extensionHostClasses) {
-      ret.add(clazz.getName());
-    }
-    return ret;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
deleted file mode 100644
index 24415ac..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
-import com.google.protobuf.Descriptors.GenericDescriptor;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
-import com.google.protobuf.Message;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Utility functions for reflecting and analyzing Protocol Buffers classes.
- *
- * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation.
- */
-class ProtobufUtil {
- /**
- * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}.
- *
- * @throws IllegalArgumentException if there is an error in Java reflection.
- */
- static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
- try {
- return (Descriptor) clazz.getMethod("getDescriptor").invoke(null);
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- /**
- * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as
- * every class it can include transitively.
- *
- * @throws IllegalArgumentException if there is an error in Java reflection.
- */
- static Set<Descriptor> getRecursiveDescriptorsForClass(
- Class<? extends Message> clazz, ExtensionRegistry registry) {
- Descriptor root = getDescriptorForClass(clazz);
- Set<Descriptor> descriptors = new HashSet<>();
- recursivelyAddDescriptors(root, descriptors, registry);
- return descriptors;
- }
-
- /**
- * Recursively walks the given {@link Message} class and verifies that every field or message
- * linked in uses the Protocol Buffers proto2 syntax.
- */
- static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) {
- for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
- Syntax s = d.getFile().getSyntax();
- checkArgument(
- s == Syntax.PROTO2,
- "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s",
- clazz.getName(),
- d.getFullName(),
- d.getFile().getName());
- }
- }
-
- /**
- * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot
- * be deterministically encoded.
- *
- * @throws NonDeterministicException if the object cannot be encoded deterministically.
- */
- static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException {
- Class<? extends Message> message = coder.getMessageType();
- ExtensionRegistry registry = coder.getExtensionRegistry();
- Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry);
- for (Descriptor d : descriptors) {
- for (FieldDescriptor fd : d.getFields()) {
- // If there is a transitively reachable Protocol Buffers map field, then this object cannot
- // be encoded deterministically.
- if (fd.isMapField()) {
- String reason =
- String.format(
- "Protocol Buffers message %s transitively includes Map field %s (from file %s)."
- + " Maps cannot be deterministically encoded.",
- message.getName(),
- fd.getFullName(),
- fd.getFile().getFullName());
- throw new NonDeterministicException(coder, reason);
- }
- }
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
- // Disable construction of utility class
- private ProtobufUtil() {}
-
- private static void recursivelyAddDescriptors(
- Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) {
- if (descriptors.contains(message)) {
- return;
- }
- descriptors.add(message);
-
- for (FieldDescriptor f : message.getFields()) {
- recursivelyAddDescriptors(f, descriptors, registry);
- }
- for (FieldDescriptor f : message.getExtensions()) {
- recursivelyAddDescriptors(f, descriptors, registry);
- }
- for (ExtensionInfo info :
- registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) {
- recursivelyAddDescriptors(info.descriptor, descriptors, registry);
- }
- for (ExtensionInfo info :
- registry.getAllMutableExtensionsByExtendedType(message.getFullName())) {
- recursivelyAddDescriptors(info.descriptor, descriptors, registry);
- }
- }
-
- private static void recursivelyAddDescriptors(
- FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) {
- switch (field.getType()) {
- case BOOL:
- case BYTES:
- case DOUBLE:
- case ENUM:
- case FIXED32:
- case FIXED64:
- case FLOAT:
- case INT32:
- case INT64:
- case SFIXED32:
- case SFIXED64:
- case SINT32:
- case SINT64:
- case STRING:
- case UINT32:
- case UINT64:
- // Primitive types do not transitively access anything else.
- break;
-
- case GROUP:
- case MESSAGE:
- // Recursively adds all the fields from this nested Message.
- recursivelyAddDescriptors(field.getMessageType(), descriptors, registry);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Unexpected Protocol Buffers field type: " + field.getType());
- }
- }
-}