You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:13 UTC
[49/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/BigEndianIntegerCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/BigEndianIntegerCoder.java
deleted file mode 100644
index 24f6a45..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/BigEndianIntegerCoder.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link BigEndianIntegerCoder} encodes {@link Integer Integers} in 4 bytes, big-endian.
- */
-public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
-
- @JsonCreator
- public static BigEndianIntegerCoder of() {
- return INSTANCE;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static final BigEndianIntegerCoder INSTANCE = new BigEndianIntegerCoder();
-
- private BigEndianIntegerCoder() {}
-
- @Override
- public void encode(Integer value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null Integer");
- }
- new DataOutputStream(outStream).writeInt(value);
- }
-
- @Override
- public Integer decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- try {
- return new DataInputStream(inStream).readInt();
- } catch (EOFException | UTFDataFormatException exn) {
- // These exceptions correspond to decoding problems, so change
- // what kind of exception they're branded as.
- throw new CoderException(exn);
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. This coder is injective.
- */
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code 4}, the size in bytes of an integer's big endian encoding.
- */
- @Override
- protected long getEncodedElementByteSize(Integer value, Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot encode a null Integer");
- }
- return 4;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/BigEndianLongCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/BigEndianLongCoder.java
deleted file mode 100644
index 4196608..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/BigEndianLongCoder.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link BigEndianLongCoder} encodes {@link Long}s in 8 bytes, big-endian.
- */
-public class BigEndianLongCoder extends AtomicCoder<Long> {
-
- @JsonCreator
- public static BigEndianLongCoder of() {
- return INSTANCE;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static final BigEndianLongCoder INSTANCE = new BigEndianLongCoder();
-
- private BigEndianLongCoder() {}
-
- @Override
- public void encode(Long value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null Long");
- }
- new DataOutputStream(outStream).writeLong(value);
- }
-
- @Override
- public Long decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- try {
- return new DataInputStream(inStream).readLong();
- } catch (EOFException | UTFDataFormatException exn) {
- // These exceptions correspond to decoding problems, so change
- // what kind of exception they're branded as.
- throw new CoderException(exn);
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. This coder is injective.
- */
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}, since {@link #getEncodedElementByteSize} returns a constant.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(Long value, Context context) {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code 8}, the byte size of a big-endian encoded {@code Long}.
- */
- @Override
- protected long getEncodedElementByteSize(Long value, Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot encode a null Long");
- }
- return 8;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteArrayCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteArrayCoder.java
deleted file mode 100644
index 1e555c6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteArrayCoder.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.ExposedByteArrayOutputStream;
-import com.google.cloud.dataflow.sdk.util.StreamUtils;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.io.ByteStreams;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@code byte[]}.
- *
- * <p>The encoding format is as follows:
- * <ul>
- * <li>If in a non-nested context (the {@code byte[]} is the only value in the stream), the
- * bytes are read/written directly.</li>
- * <li>If in a nested context, the bytes are prefixed with the length of the array,
- * encoded via a {@link VarIntCoder}.</li>
- * </ul>
- */
-public class ByteArrayCoder extends AtomicCoder<byte[]> {
-
- @JsonCreator
- public static ByteArrayCoder of() {
- return INSTANCE;
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();
-
- private ByteArrayCoder() {}
-
- @Override
- public void encode(byte[] value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null byte[]");
- }
- if (!context.isWholeStream) {
- VarInt.encode(value.length, outStream);
- outStream.write(value);
- } else {
- outStream.write(value);
- }
- }
-
- /**
- * Encodes the provided {@code value} with the identical encoding to {@link #encode}, but with
- * optimizations that take ownership of the value.
- *
- * <p>Once passed to this method, {@code value} should never be observed or mutated again.
- */
- public void encodeAndOwn(byte[] value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (!context.isWholeStream) {
- VarInt.encode(value.length, outStream);
- outStream.write(value);
- } else {
- if (outStream instanceof ExposedByteArrayOutputStream) {
- ((ExposedByteArrayOutputStream) outStream).writeAndOwn(value);
- } else {
- outStream.write(value);
- }
- }
- }
-
- @Override
- public byte[] decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- if (context.isWholeStream) {
- return StreamUtils.getBytes(inStream);
- } else {
- int length = VarInt.decodeInt(inStream);
- if (length < 0) {
- throw new IOException("invalid length " + length);
- }
- byte[] value = new byte[length];
- ByteStreams.readFully(inStream, value);
- return value;
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @return objects that are equal if the two arrays contain the same bytes.
- */
- @Override
- public Object structuralValue(byte[] value) {
- return new StructuralByteArray(value);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} since {@link #getEncodedElementByteSize} runs in
- * constant time using the {@code length} of the provided array.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(byte[] value, Context context) {
- return true;
- }
-
- @Override
- protected long getEncodedElementByteSize(byte[] value, Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot encode a null byte[]");
- }
- long size = 0;
- if (!context.isWholeStream) {
- size += VarInt.getLength(value.length);
- }
- return size + value.length;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java
deleted file mode 100644
index 9f17497..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization.
- */
-public class ByteCoder extends AtomicCoder<Byte> {
-
- @JsonCreator
- public static ByteCoder of() {
- return INSTANCE;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static final ByteCoder INSTANCE = new ByteCoder();
-
- private ByteCoder() {}
-
- @Override
- public void encode(Byte value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null Byte");
- }
- outStream.write(value.byteValue());
- }
-
- @Override
- public Byte decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- try {
- // value will be between 0-255, -1 for EOF
- int value = inStream.read();
- if (value == -1) {
- throw new EOFException("EOF encountered decoding 1 byte from input stream");
- }
- return (byte) value;
- } catch (EOFException | UTFDataFormatException exn) {
- // These exceptions correspond to decoding problems, so change
- // what kind of exception they're branded as.
- throw new CoderException(exn);
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * {@link ByteCoder} will never throw a {@link Coder.NonDeterministicException}; bytes can always
- * be encoded deterministically.
- */
- @Override
- public void verifyDeterministic() {}
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. This coder is injective.
- */
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. {@link ByteCoder#getEncodedElementByteSize} returns a constant.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(Byte value, Context context) {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code 1}, the byte size of a {@link Byte} encoded using Java serialization.
- */
- @Override
- protected long getEncodedElementByteSize(Byte value, Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot estimate size for unsupported null value");
- }
- return 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java
deleted file mode 100644
index b7c1a3c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.io.ByteStreams;
-import com.google.protobuf.ByteString;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@link ByteString} objects based on their encoded Protocol Buffer form.
- *
- * <p>When this code is used in a nested {@link Coder.Context}, the serialized {@link ByteString}
- * objects are first delimited by their size.
- */
-public class ByteStringCoder extends AtomicCoder<ByteString> {
-
- @JsonCreator
- public static ByteStringCoder of() {
- return INSTANCE;
- }
-
- /***************************/
-
- private static final ByteStringCoder INSTANCE = new ByteStringCoder();
-
- private ByteStringCoder() {}
-
- @Override
- public void encode(ByteString value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null ByteString");
- }
-
- if (!context.isWholeStream) {
- // ByteString is not delimited, so write its size before its contents.
- VarInt.encode(value.size(), outStream);
- }
- value.writeTo(outStream);
- }
-
- @Override
- public ByteString decode(InputStream inStream, Context context) throws IOException {
- if (context.isWholeStream) {
- return ByteString.readFrom(inStream);
- }
-
- int size = VarInt.decodeInt(inStream);
- // ByteString reads to the end of the input stream, so give it a limited stream of exactly
- // the right length. Also set its chunk size so that the ByteString will contain exactly
- // one chunk.
- return ByteString.readFrom(ByteStreams.limit(inStream, size), size);
- }
-
- @Override
- protected long getEncodedElementByteSize(ByteString value, Context context) throws Exception {
- int size = value.size();
-
- if (context.isWholeStream) {
- return size;
- }
- return VarInt.getLength(size) + size;
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Returns true; the encoded output of two invocations of {@link ByteStringCoder} in the same
- * {@link Coder.Context} will be identical if and only if the original {@link ByteString} objects
- * are equal according to {@link Object#equals}.
- */
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Returns true. {@link ByteString#size} returns the size of an array and a {@link VarInt}.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(ByteString value, Context context) {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java
deleted file mode 100644
index 97b5e23..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-/**
- * The exception thrown when a {@link CoderProvider} cannot
- * provide a {@link Coder} that has been requested.
- */
-public class CannotProvideCoderException extends Exception {
- private final ReasonCode reason;
-
- public CannotProvideCoderException(String message) {
- this(message, ReasonCode.UNKNOWN);
- }
-
- public CannotProvideCoderException(String message, ReasonCode reason) {
- super(message);
- this.reason = reason;
- }
-
- public CannotProvideCoderException(String message, Throwable cause) {
- this(message, cause, ReasonCode.UNKNOWN);
- }
-
- public CannotProvideCoderException(String message, Throwable cause, ReasonCode reason) {
- super(message, cause);
- this.reason = reason;
- }
-
- public CannotProvideCoderException(Throwable cause) {
- this(cause, ReasonCode.UNKNOWN);
- }
-
- public CannotProvideCoderException(Throwable cause, ReasonCode reason) {
- super(cause);
- this.reason = reason;
- }
-
- /**
- * @return the reason that Coder inference failed.
- */
- public ReasonCode getReason() {
- return reason;
- }
-
- /**
- * Returns the inner-most {@link CannotProvideCoderException} when they are deeply nested.
- *
- * <p>For example, if a coder for {@code List<KV<Integer, Whatsit>>} cannot be provided because
- * there is no known coder for {@code Whatsit}, the root cause of the exception should be a
- * CannotProvideCoderException with details pertinent to {@code Whatsit}, suppressing the
- * intermediate layers.
- */
- public Throwable getRootCause() {
- Throwable cause = getCause();
- if (cause == null) {
- return this;
- } else if (!(cause instanceof CannotProvideCoderException)) {
- return cause;
- } else {
- return ((CannotProvideCoderException) cause).getRootCause();
- }
- }
-
- /**
- * Indicates the reason that {@link Coder} inference failed.
- */
- public static enum ReasonCode {
- /**
- * The reason a coder could not be provided is unknown or does have an established
- * {@link ReasonCode}.
- */
- UNKNOWN,
-
- /**
- * The reason a coder could not be provided is type erasure, for example when requesting
- * coder inference for a {@code List<T>} where {@code T} is unknown.
- */
- TYPE_ERASURE
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java
deleted file mode 100644
index f3a8bec..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Joiner;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Coder Coder<T>} defines how to encode and decode values of type {@code T} into
- * byte streams.
- *
- * <p>{@link Coder} instances are serialized during job creation and deserialized
- * before use, via JSON serialization. See {@link SerializableCoder} for an example of a
- * {@link Coder} that adds a custom field to
- * the {@link Coder} serialization. It provides a constructor annotated with
- * {@link com.fasterxml.jackson.annotation.JsonCreator}, which is a factory method used when
- * deserializing a {@link Coder} instance.
- *
- * <p>{@link Coder} classes for compound types are often composed from coder classes for types
- * contains therein. The composition of {@link Coder} instances into a coder for the compound
- * class is the subject of the {@link CoderFactory} type, which enables automatic generic
- * composition of {@link Coder} classes within the {@link CoderRegistry}. With particular
- * static methods on a compound {@link Coder} class, a {@link CoderFactory} can be automatically
- * inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports
- * automatic composition in the {@link CoderRegistry}.
- *
- * <p>The binary format of a {@link Coder} is identified by {@link #getEncodingId()}; be sure to
- * understand the requirements for evolving coder formats.
- *
- * <p>All methods of a {@link Coder} are required to be thread safe.
- *
- * @param <T> the type of the values being transcoded
- */
-public interface Coder<T> extends Serializable {
- /** The context in which encoding or decoding is being done. */
- public static class Context {
- /**
- * The outer context: the value being encoded or decoded takes
- * up the remainder of the record/stream contents.
- */
- public static final Context OUTER = new Context(true);
-
- /**
- * The nested context: the value being encoded or decoded is
- * (potentially) a part of a larger record/stream contents, and
- * may have other parts encoded or decoded after it.
- */
- public static final Context NESTED = new Context(false);
-
- /**
- * Whether the encoded or decoded value fills the remainder of the
- * output or input (resp.) record/stream contents. If so, then
- * the size of the decoded value can be determined from the
- * remaining size of the record/stream contents, and so explicit
- * lengths aren't required.
- */
- public final boolean isWholeStream;
-
- public Context(boolean isWholeStream) {
- this.isWholeStream = isWholeStream;
- }
-
- public Context nested() {
- return NESTED;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof Context)) {
- return false;
- }
- return Objects.equal(isWholeStream, ((Context) obj).isWholeStream);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(isWholeStream);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Context.class)
- .addValue(isWholeStream ? "OUTER" : "NESTED").toString();
- }
- }
-
- /**
- * Encodes the given value of type {@code T} onto the given output stream
- * in the given context.
- *
- * @throws IOException if writing to the {@code OutputStream} fails
- * for some reason
- * @throws CoderException if the value could not be encoded for some reason
- */
- public void encode(T value, OutputStream outStream, Context context)
- throws CoderException, IOException;
-
- /**
- * Decodes a value of type {@code T} from the given input stream in
- * the given context. Returns the decoded value.
- *
- * @throws IOException if reading from the {@code InputStream} fails
- * for some reason
- * @throws CoderException if the value could not be decoded for some reason
- */
- public T decode(InputStream inStream, Context context)
- throws CoderException, IOException;
-
- /**
- * If this is a {@code Coder} for a parameterized type, returns the
- * list of {@code Coder}s being used for each of the parameters, or
- * returns {@code null} if this cannot be done or this is not a
- * parameterized type.
- */
- public List<? extends Coder<?>> getCoderArguments();
-
- /**
- * Returns the {@link CloudObject} that represents this {@code Coder}.
- */
- public CloudObject asCloudObject();
-
- /**
- * Throw {@link NonDeterministicException} if the coding is not deterministic.
- *
- * <p>In order for a {@code Coder} to be considered deterministic,
- * the following must be true:
- * <ul>
- * <li>two values that compare as equal (via {@code Object.equals()}
- * or {@code Comparable.compareTo()}, if supported) have the same
- * encoding.
- * <li>the {@code Coder} always produces a canonical encoding, which is the
- * same for an instance of an object even if produced on different
- * computers at different times.
- * </ul>
- *
- * @throws Coder.NonDeterministicException if this coder is not deterministic.
- */
- public void verifyDeterministic() throws Coder.NonDeterministicException;
-
- /**
- * Returns {@code true} if this {@link Coder} is injective with respect to {@link Objects#equals}.
- *
- * <p>Whenever the encoded bytes of two values are equal, then the original values are equal
- * according to {@code Objects.equals()}. Note that this is well-defined for {@code null}.
- *
- * <p>This condition is most notably false for arrays. More generally, this condition is false
- * whenever {@code equals()} compares object identity, rather than performing a
- * semantic/structural comparison.
- */
- public boolean consistentWithEquals();
-
- /**
- * Returns an object with an {@code Object.equals()} method that represents structural equality
- * on the argument.
- *
- * <p>For any two values {@code x} and {@code y} of type {@code T}, if their encoded bytes are the
- * same, then it must be the case that {@code structuralValue(x).equals(@code structuralValue(y)}.
- *
- * <p>Most notably:
- * <ul>
- * <li>The structural value for an array coder should perform a structural comparison of the
- * contents of the arrays, rather than the default behavior of comparing according to object
- * identity.
- * <li>The structural value for a coder accepting {@code null} should be a proper object with
- * an {@code equals()} method, even if the input value is {@code null}.
- * </ul>
- *
- * <p>See also {@link #consistentWithEquals()}.
- */
- public Object structuralValue(T value) throws Exception;
-
- /**
- * Returns whether {@link #registerByteSizeObserver} cheap enough to
- * call for every element, that is, if this {@code Coder} can
- * calculate the byte size of the element to be coded in roughly
- * constant time (or lazily).
- *
- * <p>Not intended to be called by user code, but instead by
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
- * implementations.
- */
- public boolean isRegisterByteSizeObserverCheap(T value, Context context);
-
- /**
- * Notifies the {@code ElementByteSizeObserver} about the byte size
- * of the encoded value using this {@code Coder}.
- *
- * <p>Not intended to be called by user code, but instead by
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
- * implementations.
- */
- public void registerByteSizeObserver(
- T value, ElementByteSizeObserver observer, Context context)
- throws Exception;
-
- /**
- * An identifier for the binary format written by {@link #encode}.
- *
- * <p>This value, along with the fully qualified class name, forms an identifier for the
- * binary format of this coder. Whenever this value changes, the new encoding is considered
- * incompatible with the prior format: It is presumed that the prior version of the coder will
- * be unable to correctly read the new format and the new version of the coder will be unable to
- * correctly read the old format.
- *
- * <p>If the format is changed in a backwards-compatible way (the Coder can still accept data from
- * the prior format), such as by adding optional fields to a Protocol Buffer or Avro definition,
- * and you want Dataflow to understand that the new coder is compatible with the prior coder,
- * this value must remain unchanged. It is then the responsibility of {@link #decode} to correctly
- * read data from the prior format.
- */
- @Experimental(Kind.CODER_ENCODING_ID)
- public String getEncodingId();
-
- /**
- * A collection of encodings supported by {@link #decode} in addition to the encoding
- * from {@link #getEncodingId()} (which is assumed supported).
- *
- * <p><i>This information is not currently used for any purpose</i>. It is descriptive only,
- * and this method is subject to change.
- *
- * @see #getEncodingId()
- */
- @Experimental(Kind.CODER_ENCODING_ID)
- public Collection<String> getAllowedEncodings();
-
- /**
- * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is
- * not deterministic, including details of why the encoding is not deterministic.
- */
- public static class NonDeterministicException extends Throwable {
- private Coder<?> coder;
- private List<String> reasons;
-
- public NonDeterministicException(
- Coder<?> coder, String reason, @Nullable NonDeterministicException e) {
- this(coder, Arrays.asList(reason), e);
- }
-
- public NonDeterministicException(Coder<?> coder, String reason) {
- this(coder, Arrays.asList(reason), null);
- }
-
- public NonDeterministicException(Coder<?> coder, List<String> reasons) {
- this(coder, reasons, null);
- }
-
- public NonDeterministicException(
- Coder<?> coder,
- List<String> reasons,
- @Nullable NonDeterministicException cause) {
- super(cause);
- Preconditions.checkArgument(reasons.size() > 0,
- "Reasons must not be empty.");
- this.reasons = reasons;
- this.coder = coder;
- }
-
- public Iterable<String> getReasons() {
- return reasons;
- }
-
- @Override
- public String getMessage() {
- return String.format("%s is not deterministic because:\n %s",
- coder, Joiner.on("\n ").join(reasons));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java
deleted file mode 100644
index 8ff8571..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import java.io.IOException;
-
-/**
- * An {@link Exception} thrown if there is a problem encoding or decoding a value.
- */
-public class CoderException extends IOException {
- public CoderException(String message) {
- super(message);
- }
-
- public CoderException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public CoderException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java
deleted file mode 100644
index 82b40a4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Copyright (C) 2014 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Static utility methods for creating and working with {@link Coder}s.
- */
-public final class CoderFactories {
- private CoderFactories() { } // Static utility class
-
- /**
- * Creates a {@link CoderFactory} built from particular static methods of a class that
- * implements {@link Coder}.
- *
- * <p>The class must have the following static methods:
- *
- * <ul>
- * <li> {@code
- * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
- * }
- * <li> {@code
- * public static List<Object> getInstanceComponents(T exampleValue);
- * }
- * </ul>
- *
- * <p>The {@code of(...)} method will be used to construct a
- * {@code Coder<T>} from component {@link Coder}s.
- * It must accept one {@link Coder} argument for each
- * generic type parameter of {@code T}. If {@code T} takes no generic
- * type parameters, then the {@code of()} factory method should take
- * no arguments.
- *
- * <p>The {@code getInstanceComponents} method will be used to
- * decompose a value during the {@link Coder} inference process,
- * to automatically choose coders for the components.
- *
- * <p>Note that the class {@code T} to be coded may be a
- * not-yet-specialized generic class.
- * For a generic class {@code MyClass<X>} and an actual type parameter
- * {@code Foo}, the {@link CoderFactoryFromStaticMethods} will
- * accept any {@code Coder<Foo>} and produce a {@code Coder<MyClass<Foo>>}.
- *
- * <p>For example, the {@link CoderFactory} returned by
- * {@code fromStaticMethods(ListCoder.class)}
- * will produce a {@code Coder<List<X>>} for any {@code Coder Coder<X>}.
- */
- public static <T> CoderFactory fromStaticMethods(Class<T> clazz) {
- return new CoderFactoryFromStaticMethods(clazz);
- }
-
- /**
- * Creates a {@link CoderFactory} that always returns the
- * given coder.
- *
- * <p>The {@code getInstanceComponents} method of this
- * {@link CoderFactory} always returns an empty list.
- */
- public static <T> CoderFactory forCoder(Coder<T> coder) {
- return new CoderFactoryForCoder<>(coder);
- }
-
- /**
- * See {@link #fromStaticMethods} for a detailed description
- * of the characteristics of this {@link CoderFactory}.
- */
- private static class CoderFactoryFromStaticMethods implements CoderFactory {
-
- @Override
- @SuppressWarnings("rawtypes")
- public Coder<?> create(List<? extends Coder<?>> componentCoders) {
- try {
- return (Coder) factoryMethod.invoke(
- null /* static */, componentCoders.toArray());
- } catch (IllegalAccessException |
- IllegalArgumentException |
- InvocationTargetException |
- NullPointerException |
- ExceptionInInitializerError exn) {
- throw new IllegalStateException(
- "error when invoking Coder factory method " + factoryMethod,
- exn);
- }
- }
-
- @Override
- public List<Object> getInstanceComponents(Object value) {
- try {
- @SuppressWarnings("unchecked")
- List<Object> components = (List<Object>) getComponentsMethod.invoke(
- null /* static */, value);
- return components;
- } catch (IllegalAccessException
- | IllegalArgumentException
- | InvocationTargetException
- | NullPointerException
- | ExceptionInInitializerError exn) {
- throw new IllegalStateException(
- "error when invoking Coder getComponents method " + getComponentsMethod,
- exn);
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////
-
- // Method to create a coder given component coders
- // For a Coder class of kind * -> * -> ... n times ... -> *
- // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
- private Method factoryMethod;
-
- // Method to decompose a value of type T into its parts.
- // For a Coder class of kind * -> * -> ... n times ... -> *
- // this has type T -> List<Object>
- // where the list has n elements.
- private Method getComponentsMethod;
-
- /**
- * Returns a CoderFactory that invokes the given static factory method
- * to create the Coder.
- */
- private CoderFactoryFromStaticMethods(Class<?> coderClazz) {
- this.factoryMethod = getFactoryMethod(coderClazz);
- this.getComponentsMethod = getInstanceComponentsMethod(coderClazz);
- }
-
- /**
- * Returns the static {@code of} constructor method on {@code coderClazz}
- * if it exists. It is assumed to have one {@link Coder} parameter for
- * each type parameter of {@code coderClazz}.
- */
- private Method getFactoryMethod(Class<?> coderClazz) {
- Method factoryMethodCandidate;
-
- // Find the static factory method of coderClazz named 'of' with
- // the appropriate number of type parameters.
- int numTypeParameters = coderClazz.getTypeParameters().length;
- Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
- Arrays.fill(factoryMethodArgTypes, Coder.class);
- try {
- factoryMethodCandidate =
- coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
- } catch (NoSuchMethodException | SecurityException exn) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "does not have an accessible method named 'of' with "
- + numTypeParameters + " arguments of Coder type",
- exn);
- }
- if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type is not static");
- }
- if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type does not return a " + coderClazz);
- }
- try {
- if (!factoryMethodCandidate.isAccessible()) {
- factoryMethodCandidate.setAccessible(true);
- }
- } catch (SecurityException exn) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type is not accessible",
- exn);
- }
-
- return factoryMethodCandidate;
- }
-
- /**
- * Finds the static method on {@code coderType} to use
- * to decompose a value of type {@code T} into components,
- * each corresponding to an argument of the {@code of}
- * method.
- */
- private <T> Method getInstanceComponentsMethod(Class<?> coderClazz) {
- TypeDescriptor<?> coderType = TypeDescriptor.of(coderClazz);
- TypeDescriptor<T> argumentType = getCodedType(coderType);
-
- // getInstanceComponents may be implemented in a superclass,
- // so we search them all for an applicable method. We do not
- // try to be clever about finding the best overload. It may
- // be in a generic superclass, erased to accept an Object.
- // However, subtypes are listed before supertypes (it is a
- // topological ordering) so probably the best one will be chosen
- // if there are more than one (which should be rare)
- for (TypeDescriptor<?> supertype : coderType.getClasses()) {
- for (Method method : supertype.getRawType().getDeclaredMethods()) {
- if (method.getName().equals("getInstanceComponents")) {
- TypeDescriptor<?> formalArgumentType = supertype.getArgumentTypes(method).get(0);
- if (formalArgumentType.getRawType().isAssignableFrom(argumentType.getRawType())) {
- return method;
- }
- }
- }
- }
-
- throw new IllegalArgumentException(
- "cannot create a CoderFactory from " + coderType + ": "
- + "does not have an accessible method "
- + "'getInstanceComponents'");
- }
-
- /**
- * If {@code coderType} is a subclass of {@link Coder} for a specific
- * type {@code T}, returns {@code T.class}. Otherwise, raises IllegalArgumentException.
- */
- private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<?> coderType) {
- for (TypeDescriptor<?> ifaceType : coderType.getInterfaces()) {
- if (ifaceType.getRawType().equals(Coder.class)) {
- ParameterizedType coderIface = (ParameterizedType) ifaceType.getType();
- @SuppressWarnings("unchecked")
- TypeDescriptor<T> token =
- (TypeDescriptor<T>) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]);
- return token;
- }
- }
- throw new IllegalArgumentException(
- "cannot build CoderFactory from class " + coderType
- + ": does not implement Coder<T> for any T.");
- }
- }
-
- /**
- * See {@link #forCoder} for a detailed description of this
- * {@link CoderFactory}.
- */
- private static class CoderFactoryForCoder<T> implements CoderFactory {
- private Coder<T> coder;
-
- public CoderFactoryForCoder(Coder<T> coder) {
- this.coder = coder;
- }
-
- @Override
- public Coder<?> create(List<? extends Coder<?>> componentCoders) {
- return this.coder;
- }
-
- @Override
- public List<Object> getInstanceComponents(Object value) {
- return Collections.emptyList();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java
deleted file mode 100644
index 541256c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (C) 2014 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import java.util.List;
-
-/**
- * A {@link CoderFactory} creates coders and decomposes values.
- * It may operate on a parameterized type, such as {@link List},
- * in which case the {@link #create} method accepts a list of
- * coders to use for the type parameters.
- */
-public interface CoderFactory {
-
- /**
- * Returns a {@code Coder<?>}, given argument coder to use for
- * values of a particular type, given the Coders for each of
- * the type's generic parameter types.
- */
- public Coder<?> create(List<? extends Coder<?>> componentCoders);
-
- /**
- * Returns a list of objects contained in {@code value}, one per
- * type argument, or {@code null} if none can be determined.
- * The list of returned objects should be the same size as the
- * list of coders required by {@link #create}.
- */
- public List<Object> getInstanceComponents(Object value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java
deleted file mode 100644
index a3e6ec4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2014 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-/**
- * A {@link CoderProvider} may create a {@link Coder} for
- * any concrete class.
- */
-public interface CoderProvider {
-
- /**
- * Provides a coder for a given class, if possible.
- *
- * @throws CannotProvideCoderException if no coder can be provided
- */
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java
deleted file mode 100644
index 8b0aedd..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright (C) 2014 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
-
-/**
- * Static utility methods for working with {@link CoderProvider CoderProviders}.
- */
-public final class CoderProviders {
-
- // Static utility class
- private CoderProviders() { }
-
- /**
- * Creates a {@link CoderProvider} built from particular static methods of a class that
- * implements {@link Coder}. The requirements for this method are precisely the requirements
- * for a {@link Coder} class to be usable with {@link DefaultCoder} annotations.
- *
- * <p>The class must have the following static method:
- *
- * <pre>{@code
- * public static Coder<T> of(TypeDescriptor<T> type)
- * }
- * </pre>
- */
- public static <T> CoderProvider fromStaticMethods(Class<T> clazz) {
- return new CoderProviderFromStaticMethods(clazz);
- }
-
-
- /**
- * Returns a {@link CoderProvider} that consults each of the provider {@code coderProviders}
- * and returns the first {@link Coder} provided.
- *
- * <p>Note that the order in which the providers are listed matters: While the set of types
- * handled will be the union of those handled by all of the providers in the list, the actual
- * {@link Coder} provided by the first successful provider may differ, and may have inferior
- * properties. For example, not all {@link Coder Coders} are deterministic, handle {@code null}
- * values, or have comparable performance.
- */
- public static CoderProvider firstOf(CoderProvider... coderProviders) {
- return new FirstOf(ImmutableList.copyOf(coderProviders));
- }
-
- ///////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * @see #firstOf
- */
- private static class FirstOf implements CoderProvider {
-
- private Iterable<CoderProvider> providers;
-
- public FirstOf(Iterable<CoderProvider> providers) {
- this.providers = providers;
- }
-
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- List<String> messages = Lists.newArrayList();
- for (CoderProvider provider : providers) {
- try {
- return provider.getCoder(type);
- } catch (CannotProvideCoderException exc) {
- messages.add(String.format("%s could not provide a Coder for type %s: %s",
- provider, type, exc.getMessage()));
- }
- }
- throw new CannotProvideCoderException(
- String.format("Cannot provide coder for type %s: %s.",
- type, Joiner.on("; ").join(messages)));
- }
- }
-
- private static class CoderProviderFromStaticMethods implements CoderProvider {
-
- /** If true, then clazz has {@code of(TypeDescriptor)}. If false, {@code of(Class)}. */
- private final boolean takesTypeDescriptor;
- private final Class<?> clazz;
-
- public CoderProviderFromStaticMethods(Class<?> clazz) {
- // Note that the second condition supports older classes, which only needed to provide
- // of(Class), not of(TypeDescriptor). Our own classes have updated to accept a
- // TypeDescriptor. Hence the error message points only to the current specification,
- // not both acceptable conditions.
- checkArgument(classTakesTypeDescriptor(clazz) || classTakesClass(clazz),
- "Class " + clazz.getCanonicalName()
- + " is missing required static method of(TypeDescriptor).");
-
- this.takesTypeDescriptor = classTakesTypeDescriptor(clazz);
- this.clazz = clazz;
- }
-
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- try {
- if (takesTypeDescriptor) {
- @SuppressWarnings("unchecked")
- Coder<T> result = InstanceBuilder.ofType(Coder.class)
- .fromClass(clazz)
- .fromFactoryMethod("of")
- .withArg(TypeDescriptor.class, type)
- .build();
- return result;
- } else {
- @SuppressWarnings("unchecked")
- Coder<T> result = InstanceBuilder.ofType(Coder.class)
- .fromClass(clazz)
- .fromFactoryMethod("of")
- .withArg(Class.class, type.getRawType())
- .build();
- return result;
- }
- } catch (RuntimeException exc) {
- if (exc.getCause() instanceof InvocationTargetException) {
- throw new CannotProvideCoderException(exc.getCause().getCause());
- }
- throw exc;
- }
- }
-
- private boolean classTakesTypeDescriptor(Class<?> clazz) {
- try {
- clazz.getDeclaredMethod("of", TypeDescriptor.class);
- return true;
- } catch (NoSuchMethodException | SecurityException exc) {
- return false;
- }
- }
-
- private boolean classTakesClass(Class<?> clazz) {
- try {
- clazz.getDeclaredMethod("of", Class.class);
- return true;
- } catch (NoSuchMethodException | SecurityException exc) {
- return false;
- }
- }
- }
-}