You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:08 UTC

[21/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CustomCoder.java
deleted file mode 100644
index a43e95e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CustomCoder.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.StringUtils;
-import com.google.common.collect.Lists;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * An abstract base class for writing a {@link Coder} class that encodes itself via Java
- * serialization.
- *
- * <p>To complete an implementation, subclasses must implement {@link Coder#encode}
- * and {@link Coder#decode} methods. Anonymous subclasses must furthermore override
- * {@link #getEncodingId}.
- *
- * <p>Not to be confused with {@link SerializableCoder} that encodes objects that implement the
- * {@link Serializable} interface.
- *
- * @param <T> the type of elements handled by this coder
- */
-public abstract class CustomCoder<T> extends AtomicCoder<T>
-    implements Serializable {
-  @JsonCreator
-  public static CustomCoder<?> of(
-      // N.B. typeId is a required parameter here, since a field named "@type"
-      // is presented to the deserializer as an input.
-      //
-      // If this method did not consume the field, Jackson2 would observe an
-      // unconsumed field and a returned value of a derived type.  So Jackson2
-      // would attempt to update the returned value with the unconsumed field
-      // data, The standard JsonDeserializer does not implement a mechanism for
-      // updating constructed values, so it would throw an exception, causing
-      // deserialization to fail.
-      @JsonProperty(value = "@type", required = false) String typeId,
-      @JsonProperty(value = "encoding_id", required = false) String encodingId,
-      @JsonProperty("type") String type,
-      @JsonProperty("serialized_coder") String serializedCoder) {
-    return (CustomCoder<?>) SerializableUtils.deserializeFromByteArray(
-        StringUtils.jsonStringToByteArray(serializedCoder),
-        type);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}.
-   */
-  @Override
-  public CloudObject asCloudObject() {
-    // N.B. We use the CustomCoder class, not the derived class, since during
-    // deserialization we will be using the CustomCoder's static factory method
-    // to construct an instance of the derived class.
-    CloudObject result = CloudObject.forClass(CustomCoder.class);
-    addString(result, "type", getClass().getName());
-    addString(result, "serialized_coder",
-        StringUtils.byteArrayToJsonString(
-            SerializableUtils.serializeToByteArray(this)));
-
-    String encodingId = getEncodingId();
-    checkNotNull(encodingId, "Coder.getEncodingId() must not return null.");
-    if (!encodingId.isEmpty()) {
-      addString(result, PropertyNames.ENCODING_ID, encodingId);
-    }
-
-    Collection<String> allowedEncodings = getAllowedEncodings();
-    if (!allowedEncodings.isEmpty()) {
-      addStringList(result, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowedEncodings));
-    }
-
-    return result;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException a {@link CustomCoder} is presumed
-   * nondeterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "CustomCoder implementations must override verifyDeterministic,"
-        + " or they are presumed nondeterministic.");
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return The canonical class name for this coder. For stable data formats that are independent
-   *         of class name, it is recommended to override this method.
-   *
-   * @throws UnsupportedOperationException when an anonymous class is used, since they do not have
-   *         a stable canonical class name.
-   */
-  @Override
-  public String getEncodingId() {
-    if (getClass().isAnonymousClass()) {
-      throw new UnsupportedOperationException(
-          String.format("Anonymous CustomCoder subclass %s must override getEncodingId()."
-              + " Otherwise, convert to a named class and getEncodingId() will be automatically"
-              + " generated from the fully qualified class name.",
-              getClass()));
-    }
-    return getClass().getCanonicalName();
-  }
-
-  // This coder inherits isRegisterByteSizeObserverCheap,
-  // getEncodedElementByteSize and registerByteSizeObserver
-  // from StandardCoder. Override if we can do better.
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DefaultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DefaultCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DefaultCoder.java
deleted file mode 100644
index b277166..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DefaultCoder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * The {@link DefaultCoder} annotation
- * specifies a default {@link Coder} class to handle encoding and decoding
- * instances of the annotated class.
- *
- * <p>The specified {@link Coder} must satisfy the requirements of
- * {@link CoderProviders#fromStaticMethods}. Two classes provided by the SDK that
- * are intended for use with this annotation include {@link SerializableCoder}
- * and {@link AvroCoder}.
- *
- * <p>To configure the use of Java serialization as the default
- * for a class, annotate the class to use
- * {@link SerializableCoder} as follows:
- *
- * <pre><code>{@literal @}DefaultCoder(SerializableCoder.class)
- * public class MyCustomDataType implements Serializable {
- *   // ...
- * }</code></pre>
- *
- * <p>Similarly, to configure the use of
- * {@link AvroCoder} as the default:
- * <pre><code>{@literal @}DefaultCoder(AvroCoder.class)
- * public class MyCustomDataType {
- *   public MyCustomDataType() {}  // Avro requires an empty constructor.
- *   // ...
- * }</code></pre>
- *
- * <p>Coders specified explicitly via
- * {@link PCollection#setCoder}
- * take precedence, followed by Coders registered at runtime via
- * {@link CoderRegistry#registerCoder}. See {@link CoderRegistry} for a more detailed discussion
- * of the precedence rules.
- */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-@SuppressWarnings("rawtypes")
-public @interface DefaultCoder {
-  Class<? extends Coder> value();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DelegateCoder.java
deleted file mode 100644
index d1df083..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DelegateCoder.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@code DelegateCoder<T, IntermediateT>} wraps a {@link Coder} for {@code IntermediateT} and
- * encodes/decodes values of type {@code T} by converting
- * to/from {@code IntermediateT} and then encoding/decoding using the underlying
- * {@code Coder<IntermediateT>}.
- *
- * <p>The conversions from {@code T} to {@code IntermediateT} and vice versa
- * must be supplied as {@link CodingFunction}, a serializable
- * function that may throw any {@code Exception}. If a thrown
- * exception is an instance of {@link CoderException} or
- * {@link IOException}, it will be re-thrown, otherwise it will be wrapped as
- * a {@link CoderException}.
- *
- * @param <T> The type of objects coded by this Coder.
- * @param <IntermediateT> The type of objects a {@code T} will be converted to for coding.
- */
-public class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
-  /**
-   * A {@link DelegateCoder.CodingFunction CodingFunction&lt;InputT, OutputT&gt;} is a serializable
-   * function from {@code InputT} to {@code OutputT} that may throw any {@link Exception}.
-   */
-  public static interface CodingFunction<InputT, OutputT> extends Serializable {
-     public abstract OutputT apply(InputT input) throws Exception;
-  }
-
-  public static <T, IntermediateT> DelegateCoder<T, IntermediateT> of(Coder<IntermediateT> coder,
-      CodingFunction<T, IntermediateT> toFn,
-      CodingFunction<IntermediateT, T> fromFn) {
-    return new DelegateCoder<T, IntermediateT>(coder, toFn, fromFn);
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
-    coder.encode(applyAndWrapExceptions(toFn, value), outStream, context);
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
-    return applyAndWrapExceptions(fromFn, coder.decode(inStream, context));
-  }
-
-  /**
-   * Returns the coder used to encode/decode the intermediate values produced/consumed by the
-   * coding functions of this {@code DelegateCoder}.
-   */
-  public Coder<IntermediateT> getCoder() {
-    return coder;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException when the underlying coder's {@code verifyDeterministic()}
-   *         throws a {@link Coder.NonDeterministicException}. For this to be safe, the
-   *         intermediate {@code CodingFunction<T, IntermediateT>} must also be deterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    coder.verifyDeterministic();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return a structural for a value of type {@code T} obtained by first converting to
-   *         {@code IntermediateT} and then obtaining a structural value according to the underlying
-   *         coder.
-   */
-  @Override
-  public Object structuralValue(T value) throws Exception {
-    return coder.structuralValue(toFn.apply(value));
-  }
-
-  @Override
-  public String toString() {
-    return "DelegateCoder(" + coder + ")";
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return a {@link String} composed from the underlying coder class name and its encoding id.
-   *         Note that this omits any description of the coding functions. These should be modified
-   *         with care.
-   */
-  @Override
-  public String getEncodingId() {
-    return delegateEncodingId(coder.getClass(), coder.getEncodingId());
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return allowed encodings which are composed from the underlying coder class and its allowed
-   *         encoding ids. Note that this omits any description of the coding functions. These
-   *         should be modified with care.
-   */
-  @Override
-  public Collection<String> getAllowedEncodings() {
-    List<String> allowedEncodings = Lists.newArrayList();
-    for (String allowedEncoding : coder.getAllowedEncodings()) {
-      allowedEncodings.add(delegateEncodingId(coder.getClass(), allowedEncoding));
-    }
-    return allowedEncodings;
-  }
-
-  private String delegateEncodingId(Class<?> delegateClass, String encodingId) {
-    return String.format("%s:%s", delegateClass.getName(), encodingId);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private <InputT, OutputT> OutputT applyAndWrapExceptions(
-      CodingFunction<InputT, OutputT> fn,
-      InputT input) throws CoderException, IOException {
-    try {
-      return fn.apply(input);
-    } catch (IOException exc) {
-      throw exc;
-    } catch (Exception exc) {
-      throw new CoderException(exc);
-    }
-  }
-
-  private final Coder<IntermediateT> coder;
-  private final CodingFunction<T, IntermediateT> toFn;
-  private final CodingFunction<IntermediateT, T> fromFn;
-
-  protected DelegateCoder(Coder<IntermediateT> coder,
-      CodingFunction<T, IntermediateT> toFn,
-      CodingFunction<IntermediateT, T> fromFn) {
-    this.coder = coder;
-    this.fromFn = fromFn;
-    this.toFn = toFn;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DeterministicStandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DeterministicStandardCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DeterministicStandardCoder.java
deleted file mode 100644
index 6dc2506..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DeterministicStandardCoder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-/**
- * A {@link DeterministicStandardCoder} is a {@link StandardCoder} that is
- * deterministic, in the sense that for objects considered equal
- * according to {@link Object#equals(Object)}, the encoded bytes are
- * also equal.
- *
- * @param <T> the type of the values being transcoded
- */
-public abstract class DeterministicStandardCoder<T> extends StandardCoder<T> {
-  protected DeterministicStandardCoder() {}
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException never, unless overridden. A
-   * {@link DeterministicStandardCoder} is presumed deterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException { }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DoubleCoder.java
deleted file mode 100644
index 44cade6..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DoubleCoder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
- */
-public class DoubleCoder extends AtomicCoder<Double> {
-
-  @JsonCreator
-  public static DoubleCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final DoubleCoder INSTANCE = new DoubleCoder();
-
-  private DoubleCoder() {}
-
-  @Override
-  public void encode(Double value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Double");
-    }
-    new DataOutputStream(outStream).writeDouble(value);
-  }
-
-  @Override
-  public Double decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      return new DataInputStream(inStream).readDouble();
-    } catch (EOFException | UTFDataFormatException exn) {
-      // These exceptions correspond to decoding problems, so change
-      // what kind of exception they're branded as.
-      throw new CoderException(exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always.
-   *         Floating-point operations are not guaranteed to be deterministic, even
-   *         if the storage format might be, so floating point representations are not
-   *         recommended for use in operations that require deterministic inputs.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Floating point encodings are not guaranteed to be deterministic.");
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. This coder is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link DoubleCoder#getEncodedElementByteSize} returns a constant.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Double value, Context context) {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code 8}, the byte size of a {@link Double} encoded using Java serialization.
-   */
-  @Override
-  protected long getEncodedElementByteSize(Double value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Double");
-    }
-    return 8;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DurationCoder.java
deleted file mode 100644
index 46fbdde..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DurationCoder.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import org.joda.time.Duration;
-import org.joda.time.ReadableDuration;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} that encodes a joda {@link Duration} as a {@link Long} using the format of
- * {@link VarLongCoder}.
- */
-public class DurationCoder extends AtomicCoder<ReadableDuration> {
-
-  @JsonCreator
-  public static DurationCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final DurationCoder INSTANCE = new DurationCoder();
-
-  private final VarLongCoder longCoder = VarLongCoder.of();
-
-  private DurationCoder() {}
-
-  private Long toLong(ReadableDuration value) {
-    return value.getMillis();
-  }
-
-  private ReadableDuration fromLong(Long decoded) {
-    return Duration.millis(decoded);
-  }
-
-  @Override
-  public void encode(ReadableDuration value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null ReadableDuration");
-    }
-    longCoder.encode(toLong(value), outStream, context);
-  }
-
-  @Override
-  public ReadableDuration decode(InputStream inStream, Context context)
-      throws CoderException, IOException {
-      return fromLong(longCoder.decode(inStream, context));
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. This coder is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}, because it is cheap to ascertain the byte size of a long.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(ReadableDuration value, Context context) {
-    return longCoder.isRegisterByteSizeObserverCheap(toLong(value), context);
-  }
-
-  @Override
-  public void registerByteSizeObserver(
-      ReadableDuration value, ElementByteSizeObserver observer, Context context) throws Exception {
-    longCoder.registerByteSizeObserver(toLong(value), observer, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/EntityCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/EntityCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/EntityCoder.java
deleted file mode 100644
index ffd516a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/EntityCoder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.api.services.datastore.DatastoreV1.Entity;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@link Entity} objects based on their encoded Protocol Buffer form.
- */
-public class EntityCoder extends AtomicCoder<Entity> {
-
-  @JsonCreator
-  public static EntityCoder of() {
-    return INSTANCE;
-  }
-
-  /***************************/
-
-  private static final EntityCoder INSTANCE = new EntityCoder();
-
-  private EntityCoder() {}
-
-  @Override
-  public void encode(Entity value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Entity");
-    }
-
-    // Since Entity implements com.google.protobuf.MessageLite,
-    // we could directly use writeTo to write to a OutputStream object
-    outStream.write(java.nio.ByteBuffer.allocate(4).putInt(value.getSerializedSize()).array());
-    value.writeTo(outStream);
-    outStream.flush();
-  }
-
-  @Override
-  public Entity decode(InputStream inStream, Context context)
-      throws IOException {
-    byte[] entitySize = new byte[4];
-    inStream.read(entitySize, 0, 4);
-    int size = java.nio.ByteBuffer.wrap(entitySize).getInt();
-    byte[] data = new byte[size];
-    inStream.read(data, 0, size);
-    return Entity.parseFrom(data);
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Entity value, Context context)
-      throws Exception {
-    return value.getSerializedSize();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always.
-   *         A datastore kind can hold arbitrary {@link Object} instances, which
-   *         makes the encoding non-deterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Datastore encodings can hold arbitrary Object instances");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/InstantCoder.java
deleted file mode 100644
index 65c517f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/InstantCoder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Converter;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for joda {@link Instant} that encodes it as a big endian {@link Long}
- * shifted such that lexicographic ordering of the bytes corresponds to chronological order.
- */
-public class InstantCoder extends AtomicCoder<Instant> {
-
-  @JsonCreator
-  public static InstantCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final InstantCoder INSTANCE = new InstantCoder();
-
-  private final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
-
-  private InstantCoder() {}
-
-  /**
-   * Converts {@link Instant} to a {@code Long} representing its millis-since-epoch,
-   * but shifted so that the byte representation of negative values are lexicographically
-   * ordered before the byte representation of positive values.
-   *
-   * <p>This deliberately utilizes the well-defined overflow for {@code Long} values.
-   * See http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2
-   */
-  private static final Converter<Instant, Long> ORDER_PRESERVING_CONVERTER =
-      new Converter<Instant, Long>() {
-
-        @Override
-        protected Long doForward(Instant instant) {
-          return instant.getMillis() - Long.MIN_VALUE;
-        }
-
-        @Override
-        protected Instant doBackward(Long shiftedMillis) {
-          return new Instant(shiftedMillis + Long.MIN_VALUE);
-        }
-  };
-
-  @Override
-  public void encode(Instant value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Instant");
-    }
-    longCoder.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
-  }
-
-  @Override
-  public Instant decode(InputStream inStream, Context context)
-      throws CoderException, IOException {
-    return ORDER_PRESERVING_CONVERTER.reverse().convert(longCoder.decode(inStream, context));
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. This coder is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. The byte size for a big endian long is a constant.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Instant value, Context context) {
-    return longCoder.isRegisterByteSizeObserverCheap(
-        ORDER_PRESERVING_CONVERTER.convert(value), context);
-  }
-
-  @Override
-  public void registerByteSizeObserver(
-      Instant value, ElementByteSizeObserver observer, Context context) throws Exception {
-    longCoder.registerByteSizeObserver(
-        ORDER_PRESERVING_CONVERTER.convert(value), observer, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableCoder.java
deleted file mode 100644
index 55843ee..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableCoder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * An {@link IterableCoder} encodes any {@link Iterable} in the format
- * of {@link IterableLikeCoder}.
- *
- * @param <T> the type of the elements of the iterables being transcoded
- */
-public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
-
-  public static <T> IterableCoder<T> of(Coder<T> elemCoder) {
-    return new IterableCoder<>(elemCoder);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal operations below here.
-
-  @Override
-  protected final Iterable<T> decodeToIterable(List<T> decodedElements) {
-    return decodedElements;
-  }
-
-  @JsonCreator
-  public static IterableCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 1,
-        "Expecting 1 component, got " + components.size());
-    return of(components.get(0));
-  }
-
-  /**
-   * Returns the first element in this iterable if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Iterable<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
-  protected IterableCoder(Coder<T> elemCoder) {
-    super(elemCoder, "Iterable");
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableLikeCoder.java
deleted file mode 100644
index 194e9ef..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableLikeCoder.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.BufferedElementCountingOutputStream;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObservableIterable;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Preconditions;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Observable;
-import java.util.Observer;
-
-/**
- * An abstract base class with functionality for assembling a
- * {@link Coder} for a class that implements {@code Iterable}.
- *
- * <p>To complete a subclass, implement the {@link #decodeToIterable} method. This superclass
- * will decode the elements in the input stream into a {@link List} and then pass them to that
- * method to be converted into the appropriate iterable type. Note that this means the input
- * iterables must fit into memory.
- *
- * <p>The format of this coder is as follows:
- *
- * <ul>
- *   <li>If the input {@link Iterable} has a known and finite size, then the size is written to the
- *       output stream in big endian format, followed by all of the encoded elements.</li>
- *   <li>If the input {@link Iterable} is not known to have a finite size, then each element
- *       of the input is preceded by {@code true} encoded as a byte (indicating "more data")
- *       followed by the encoded element, and terminated by {@code false} encoded as a byte.</li>
- * </ul>
- *
- * @param <T> the type of the elements of the {@code Iterable}s being transcoded
- * @param <IterableT> the type of the Iterables being transcoded
- */
-public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
-    extends StandardCoder<IterableT> {
-  public Coder<T> getElemCoder() {
-    return elementCoder;
-  }
-
-  /**
-   * Builds an instance of {@code IterableT}, this coder's associated {@link Iterable}-like
-   * subtype, from a list of decoded elements.
-   */
-  protected abstract IterableT decodeToIterable(List<T> decodedElements);
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal operations below here.
-
-  private final Coder<T> elementCoder;
-  private final String iterableName;
-
-  /**
-   * Returns the first element in the iterable-like {@code exampleValue} if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  protected static <T, IterableT extends Iterable<T>>
-      List<Object> getInstanceComponentsHelper(IterableT exampleValue) {
-    for (T value : exampleValue) {
-      return Arrays.<Object>asList(value);
-    }
-    return null;
-  }
-
-  protected IterableLikeCoder(Coder<T> elementCoder, String  iterableName) {
-    Preconditions.checkArgument(elementCoder != null,
-        "element Coder for IterableLikeCoder must not be null");
-    Preconditions.checkArgument(iterableName != null,
-        "iterable name for IterableLikeCoder must not be null");
-    this.elementCoder = elementCoder;
-    this.iterableName = iterableName;
-  }
-
-  @Override
-  public void encode(
-      IterableT iterable, OutputStream outStream, Context context)
-      throws IOException, CoderException  {
-    if (iterable == null) {
-      throw new CoderException("cannot encode a null " + iterableName);
-    }
-    Context nestedContext = context.nested();
-    DataOutputStream dataOutStream = new DataOutputStream(outStream);
-    if (iterable instanceof Collection) {
-      // We can know the size of the Iterable.  Use an encoding with a
-      // leading size field, followed by that many elements.
-      Collection<T> collection = (Collection<T>) iterable;
-      dataOutStream.writeInt(collection.size());
-      for (T elem : collection) {
-        elementCoder.encode(elem, dataOutStream, nestedContext);
-      }
-    } else {
-      // We don't know the size without traversing it so use a fixed size buffer
-      // and encode as many elements as possible into it before outputting the size followed
-      // by the elements.
-      dataOutStream.writeInt(-1);
-      BufferedElementCountingOutputStream countingOutputStream =
-          new BufferedElementCountingOutputStream(dataOutStream);
-      for (T elem : iterable) {
-        countingOutputStream.markElementStart();
-        elementCoder.encode(elem, countingOutputStream, nestedContext);
-      }
-      countingOutputStream.finish();
-    }
-    // Make sure all our output gets pushed to the underlying outStream.
-    dataOutStream.flush();
-  }
-
-  @Override
-  public IterableT decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    Context nestedContext = context.nested();
-    DataInputStream dataInStream = new DataInputStream(inStream);
-    int size = dataInStream.readInt();
-    if (size >= 0) {
-      List<T> elements = new ArrayList<>(size);
-      for (int i = 0; i < size; i++) {
-        elements.add(elementCoder.decode(dataInStream, nestedContext));
-      }
-      return decodeToIterable(elements);
-    } else {
-      List<T> elements = new ArrayList<>();
-      long count;
-      // We don't know the size a priori.  Check if we're done with
-      // each block of elements.
-      while ((count = VarInt.decodeLong(dataInStream)) > 0) {
-        while (count > 0) {
-          elements.add(elementCoder.decode(dataInStream, nestedContext));
-          count -= 1;
-        }
-      }
-      return decodeToIterable(elements);
-    }
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(elementCoder);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always.
-   * Encoding is not deterministic for the general {@link Iterable} case, as it depends
-   * upon the type of iterable. This may allow two objects to compare as equal
-   * while the encoding differs.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "IterableLikeCoder can not guarantee deterministic ordering.");
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true} if the iterable is of a known class that supports lazy counting
-   * of byte size, since that requires minimal extra computation.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(
-      IterableT iterable, Context context) {
-    return iterable instanceof ElementByteSizeObservableIterable;
-  }
-
-  @Override
-  public void registerByteSizeObserver(
-      IterableT iterable, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    if (iterable == null) {
-      throw new CoderException("cannot encode a null Iterable");
-    }
-    Context nestedContext = context.nested();
-
-    if (iterable instanceof ElementByteSizeObservableIterable) {
-      observer.setLazy();
-      ElementByteSizeObservableIterable<?, ?> observableIterable =
-          (ElementByteSizeObservableIterable<?, ?>) iterable;
-      observableIterable.addObserver(
-          new IteratorObserver(observer, iterable instanceof Collection));
-    } else {
-      if (iterable instanceof Collection) {
-        // We can know the size of the Iterable.  Use an encoding with a
-        // leading size field, followed by that many elements.
-        Collection<T> collection = (Collection<T>) iterable;
-        observer.update(4L);
-        for (T elem : collection) {
-          elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
-        }
-      } else {
-        // TODO: Update to use an accurate count depending on size and count, currently we
-        // are under estimating the size by up to 10 bytes per block of data since we are
-        // not encoding the count prefix which occurs at most once per 64k of data and is upto
-        // 10 bytes long. Since we include the total count we can upper bound the underestimate
-        // to be 10 / 65536 ~= 0.0153% of the actual size.
-        observer.update(4L);
-        long count = 0;
-        for (T elem : iterable) {
-          count += 1;
-          elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
-        }
-        if (count > 0) {
-          // Update the length based upon the number of counted elements, this helps
-          // eliminate the case where all the elements are encoded in the first block and
-          // it is quite short (e.g. Long.MAX_VALUE nulls encoded with VoidCoder).
-          observer.update(VarInt.getLength(count));
-        }
-        // Update with the terminator byte.
-        observer.update(1L);
-      }
-    }
-  }
-
-  /**
-   * An observer that gets notified when an observable iterator
-   * returns a new value. This observer just notifies an outerObserver
-   * about this event. Additionally, the outerObserver is notified
-   * about additional separators that are transparently added by this
-   * coder.
-   */
-  private class IteratorObserver implements Observer {
-    private final ElementByteSizeObserver outerObserver;
-    private final boolean countable;
-
-    public IteratorObserver(ElementByteSizeObserver outerObserver,
-                            boolean countable) {
-      this.outerObserver = outerObserver;
-      this.countable = countable;
-
-      if (countable) {
-        // Additional 4 bytes are due to size.
-        outerObserver.update(4L);
-      } else {
-        // Additional 5 bytes are due to size = -1 (4 bytes) and
-        // hasNext = false (1 byte).
-        outerObserver.update(5L);
-      }
-    }
-
-    @Override
-    public void update(Observable obs, Object obj) {
-      if (!(obj instanceof Long)) {
-        throw new AssertionError("unexpected parameter object");
-      }
-
-      if (countable) {
-        outerObserver.update(obs, obj);
-      } else {
-        // Additional 1 byte is due to hasNext = true flag.
-        outerObserver.update(obs, 1 + (long) obj);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
deleted file mode 100644
index 6e2833e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.io.ByteStreams;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-
-/**
- * A coder for JAXB annotated objects. This coder uses JAXB marshalling/unmarshalling mechanisms
- * to encode/decode the objects. Users must provide the {@code Class} of the JAXB annotated object.
- *
- * @param <T> type of JAXB annotated objects that will be serialized.
- */
-public class JAXBCoder<T> extends AtomicCoder<T> {
-
-  private final Class<T> jaxbClass;
-  private transient Marshaller jaxbMarshaller = null;
-  private transient Unmarshaller jaxbUnmarshaller = null;
-
-  public Class<T> getJAXBClass() {
-    return jaxbClass;
-  }
-
-  private JAXBCoder(Class<T> jaxbClass) {
-    this.jaxbClass = jaxbClass;
-  }
-
-  /**
-   * Create a coder for a given type of JAXB annotated objects.
-   *
-   * @param jaxbClass the {@code Class} of the JAXB annotated objects.
-   */
-  public static <T> JAXBCoder<T> of(Class<T> jaxbClass) {
-    return new JAXBCoder<>(jaxbClass);
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
-    try {
-      if (jaxbMarshaller == null) {
-        JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass);
-        jaxbMarshaller = jaxbContext.createMarshaller();
-      }
-      if (!context.isWholeStream) {
-        try {
-          long size = getEncodedElementByteSize(value, Context.OUTER);
-          // record the number of bytes the XML consists of so when reading we only read the encoded
-          // value
-          VarInt.encode(size, outStream);
-        } catch (Exception e) {
-          throw new CoderException(
-              "An Exception occured while trying to get the size of an encoded representation", e);
-        }
-      }
-
-      jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream));
-    } catch (JAXBException e) {
-      throw new CoderException(e);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
-    try {
-      if (jaxbUnmarshaller == null) {
-        JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass);
-        jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-      }
-
-      InputStream stream = inStream;
-      if (!context.isWholeStream) {
-        long limit = VarInt.decodeLong(inStream);
-        stream = ByteStreams.limit(inStream, limit);
-      }
-      @SuppressWarnings("unchecked")
-      T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(stream));
-      return obj;
-    } catch (JAXBException e) {
-      throw new CoderException(e);
-    }
-  }
-
-  @Override
-  public String getEncodingId() {
-    return getJAXBClass().getName();
-  }
-
-  private static class CloseIgnoringInputStream extends FilterInputStream {
-
-    protected CloseIgnoringInputStream(InputStream in) {
-      super(in);
-    }
-
-    @Override
-    public void close() {
-      // Do nothing. JAXB closes the underlying stream so we must filter out those calls.
-    }
-  }
-
-  private static class CloseIgnoringOutputStream extends FilterOutputStream {
-
-    protected CloseIgnoringOutputStream(OutputStream out) {
-      super(out);
-    }
-
-    @Override
-    public void close() throws IOException {
-      // JAXB closes the underlying stream so we must filter out those calls.
-    }
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // JSON Serialization details below
-
-  private static final String JAXB_CLASS = "jaxb_class";
-
-  /**
-   * Constructor for JSON deserialization only.
-   */
-  @JsonCreator
-  public static <T> JAXBCoder<T> of(
-      @JsonProperty(JAXB_CLASS) String jaxbClassName) {
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName);
-      return of(jaxbClass);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
deleted file mode 100644
index 227c546..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * A {@code KvCoder} encodes {@link KV}s.
- *
- * @param <K> the type of the keys of the KVs being transcoded
- * @param <V> the type of the values of the KVs being transcoded
- */
-public class KvCoder<K, V> extends KvCoderBase<KV<K, V>> {
-  public static <K, V> KvCoder<K, V> of(Coder<K> keyCoder,
-                                        Coder<V> valueCoder) {
-    return new KvCoder<>(keyCoder, valueCoder);
-  }
-
-  @JsonCreator
-  public static KvCoder<?, ?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 2,
-        "Expecting 2 components, got " + components.size());
-    return of(components.get(0), components.get(1));
-  }
-
-  public static <K, V> List<Object> getInstanceComponents(
-      KV<K, V> exampleValue) {
-    return Arrays.asList(
-        exampleValue.getKey(),
-        exampleValue.getValue());
-  }
-
-  public Coder<K> getKeyCoder() {
-    return keyCoder;
-  }
-
-  public Coder<V> getValueCoder() {
-    return valueCoder;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final Coder<K> keyCoder;
-  private final Coder<V> valueCoder;
-
-  private KvCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
-    this.keyCoder = keyCoder;
-    this.valueCoder = valueCoder;
-  }
-
-  @Override
-  public void encode(KV<K, V> kv, OutputStream outStream, Context context)
-      throws IOException, CoderException  {
-    if (kv == null) {
-      throw new CoderException("cannot encode a null KV");
-    }
-    Context nestedContext = context.nested();
-    keyCoder.encode(kv.getKey(), outStream, nestedContext);
-    valueCoder.encode(kv.getValue(), outStream, nestedContext);
-  }
-
-  @Override
-  public KV<K, V> decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    Context nestedContext = context.nested();
-    K key = keyCoder.decode(inStream, nestedContext);
-    V value = valueCoder.decode(inStream, nestedContext);
-    return KV.of(key, value);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(keyCoder, valueCoder);
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Key coder must be deterministic", getKeyCoder());
-    verifyDeterministic("Value coder must be deterministic", getValueCoder());
-  }
-
-  @Override
-  public boolean consistentWithEquals() {
-    return keyCoder.consistentWithEquals() && valueCoder.consistentWithEquals();
-  }
-
-  @Override
-  public Object structuralValue(KV<K, V> kv) throws Exception {
-    if (consistentWithEquals()) {
-      return kv;
-    } else {
-      return KV.of(getKeyCoder().structuralValue(kv.getKey()),
-                   getValueCoder().structuralValue(kv.getValue()));
-    }
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
-    return result;
-  }
-
-  /**
-   * Returns whether both keyCoder and valueCoder are considered not expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) {
-    return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(),
-                                                    context.nested())
-        && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(),
-                                                      context.nested());
-  }
-
-  /**
-   * Notifies ElementByteSizeObserver about the byte size of the
-   * encoded value using this coder.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      KV<K, V> kv, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    if (kv == null) {
-      throw new CoderException("cannot encode a null KV");
-    }
-    keyCoder.registerByteSizeObserver(
-        kv.getKey(), observer, context.nested());
-    valueCoder.registerByteSizeObserver(
-        kv.getValue(), observer, context.nested());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
deleted file mode 100644
index 361ffb9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * A abstract base class for KvCoder. Works around a Jackson2 bug tickled when building
- * {@link KvCoder} directly (as of this writing, Jackson2 walks off the end of
- * an array when it tries to deserialize a class with multiple generic type
- * parameters).  This class should be removed when possible.
- *
- * @param <T> the type of values being transcoded
- */
-@Deprecated
-public abstract class KvCoderBase<T> extends StandardCoder<T> {
-  /**
-   * A constructor used only for decoding from JSON.
-   *
-   * @param typeId present in the JSON encoding, but unused
-   * @param isPairLike present in the JSON encoding, but unused
-   */
-  @Deprecated
-  @JsonCreator
-  public static KvCoderBase<?> of(
-      // N.B. typeId is a required parameter here, since a field named "@type"
-      // is presented to the deserializer as an input.
-      //
-      // If this method did not consume the field, Jackson2 would observe an
-      // unconsumed field and a returned value of a derived type.  So Jackson2
-      // would attempt to update the returned value with the unconsumed field
-      // data.  The standard JsonDeserializer does not implement a mechanism for
-      // updating constructed values, so it would throw an exception, causing
-      // deserialization to fail.
-      @JsonProperty(value = "@type", required = false) String typeId,
-      @JsonProperty(value = PropertyNames.IS_PAIR_LIKE, required = false) boolean isPairLike,
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-    return KvCoder.of(components);
-  }
-
-  protected KvCoderBase() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
deleted file mode 100644
index cfb3ab7..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * A {@link Coder} for {@link List}, using the format of {@link IterableLikeCoder}.
- *
- * @param <T> the type of the elements of the Lists being transcoded
- */
-public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
-
-  public static <T> ListCoder<T> of(Coder<T> elemCoder) {
-    return new ListCoder<>(elemCoder);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal operations below here.
-
-  @Override
-  protected final List<T> decodeToIterable(List<T> decodedElements) {
-    return decodedElements;
-  }
-
-  @JsonCreator
-  public static ListCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 1,
-        "Expecting 1 component, got " + components.size());
-    return of((Coder<?>) components.get(0));
-  }
-
-  /**
-   * Returns the first element in this list if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(List<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
-  protected ListCoder(Coder<T> elemCoder) {
-    super(elemCoder, "List");
-  }
-
-  /**
-   * List sizes are always known, so ListIterable may be deterministic while
-   * the general IterableLikeCoder is not.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "ListCoder.elemCoder must be deterministic", getElemCoder());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
deleted file mode 100644
index 5a60ab5..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * A {@link Coder} for {@link Map Maps} that encodes them according to provided
- * coders for keys and values.
- *
- * @param <K> the type of the keys of the KVs being transcoded
- * @param <V> the type of the values of the KVs being transcoded
- */
-public class MapCoder<K, V> extends MapCoderBase<Map<K, V>> {
-  /**
-   * Produces a MapCoder with the given keyCoder and valueCoder.
-   */
-  public static <K, V> MapCoder<K, V> of(
-      Coder<K> keyCoder,
-      Coder<V> valueCoder) {
-    return new MapCoder<>(keyCoder, valueCoder);
-  }
-
-  @JsonCreator
-  public static MapCoder<?, ?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 2,
-        "Expecting 2 components, got " + components.size());
-    return of((Coder<?>) components.get(0), (Coder<?>) components.get(1));
-  }
-
-  /**
-   * Returns the key and value for an arbitrary element of this map,
-   * if it is non-empty, otherwise returns {@code null}.
-   */
-   public static <K, V> List<Object> getInstanceComponents(
-       Map<K, V> exampleValue) {
-     for (Map.Entry<K, V> entry : exampleValue.entrySet()) {
-       return Arrays.asList(entry.getKey(), entry.getValue());
-     }
-     return null;
-   }
-
-  public Coder<K> getKeyCoder() {
-    return keyCoder;
-  }
-
-  public Coder<V> getValueCoder() {
-    return valueCoder;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  Coder<K> keyCoder;
-  Coder<V> valueCoder;
-
-  MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
-    this.keyCoder = keyCoder;
-    this.valueCoder = valueCoder;
-  }
-
-  @Override
-  public void encode(
-      Map<K, V> map,
-      OutputStream outStream,
-      Context context)
-      throws IOException, CoderException  {
-    if (map == null) {
-      throw new CoderException("cannot encode a null Map");
-    }
-    DataOutputStream dataOutStream = new DataOutputStream(outStream);
-    dataOutStream.writeInt(map.size());
-    for (Entry<K, V> entry : map.entrySet()) {
-      keyCoder.encode(entry.getKey(), outStream, context.nested());
-      valueCoder.encode(entry.getValue(), outStream, context.nested());
-    }
-    dataOutStream.flush();
-  }
-
-  @Override
-  public Map<K, V> decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    DataInputStream dataInStream = new DataInputStream(inStream);
-    int size = dataInStream.readInt();
-    Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
-    for (int i = 0; i < size; ++i) {
-      K key = keyCoder.decode(inStream, context.nested());
-      V value = valueCoder.decode(inStream, context.nested());
-      retval.put(key, value);
-    }
-    return retval;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return a {@link List} containing the key coder at index 0 at the and value coder at index 1.
-   */
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(keyCoder, valueCoder);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. Not all maps have a deterministic encoding.
-   * For example, {@code HashMap} comparison does not depend on element order, so
-   * two {@code HashMap} instances may be equal but produce different encodings.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Ordering of entries in a Map may be non-deterministic.");
-  }
-
-  @Override
-  public void registerByteSizeObserver(
-      Map<K, V> map, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    observer.update(4L);
-    for (Entry<K, V> entry : map.entrySet()) {
-      keyCoder.registerByteSizeObserver(
-          entry.getKey(), observer, context.nested());
-      valueCoder.registerByteSizeObserver(
-          entry.getValue(), observer, context.nested());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
deleted file mode 100644
index 1fc3f57..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * A abstract base class for MapCoder. Works around a Jackson2 bug tickled when building
- * {@link MapCoder} directly (as of this writing, Jackson2 walks off the end of
- * an array when it tries to deserialize a class with multiple generic type
- * parameters).  This should be removed in favor of a better workaround.
- * @param <T> the type of values being transcoded
- */
-@Deprecated
-public abstract class MapCoderBase<T> extends StandardCoder<T> {
-  @Deprecated
-  @JsonCreator
-  public static MapCoderBase<?> of(
-      // N.B. typeId is a required parameter here, since a field named "@type"
-      // is presented to the deserializer as an input.
-      //
-      // If this method did not consume the field, Jackson2 would observe an
-      // unconsumed field and a returned value of a derived type.  So Jackson2
-      // would attempt to update the returned value with the unconsumed field
-      // data, The standard JsonDeserializer does not implement a mechanism for
-      // updating constructed values, so it would throw an exception, causing
-      // deserialization to fail.
-      @JsonProperty(value = "@type", required = false) String typeId,
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    return MapCoder.of(components);
-  }
-
-  protected MapCoderBase() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
deleted file mode 100644
index fc12a03..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link NullableCoder} encodes nullable values of type {@code T} using a nested
- * {@code Coder<T>} that does not tolerate {@code null} values. {@link NullableCoder} uses
- * exactly 1 byte per entry to indicate whether the value is {@code null}, then adds the encoding
- * of the inner coder for non-null values.
- *
- * @param <T> the type of the values being transcoded
- */
-public class NullableCoder<T> extends StandardCoder<T> {
-  public static <T> NullableCoder<T> of(Coder<T> valueCoder) {
-    if (valueCoder instanceof NullableCoder) {
-      return (NullableCoder<T>) valueCoder;
-    }
-    return new NullableCoder<>(valueCoder);
-  }
-
-  @JsonCreator
-  public static NullableCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 1,
-        "Expecting 1 components, got " + components.size());
-    return of(components.get(0));
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final Coder<T> valueCoder;
-  private static final int ENCODE_NULL = 0;
-  private static final int ENCODE_PRESENT = 1;
-
-  private NullableCoder(Coder<T> valueCoder) {
-    this.valueCoder = valueCoder;
-  }
-
-  @Override
-  public void encode(@Nullable T value, OutputStream outStream, Context context)
-      throws IOException, CoderException  {
-    if (value == null) {
-      outStream.write(ENCODE_NULL);
-    } else {
-      outStream.write(ENCODE_PRESENT);
-      valueCoder.encode(value, outStream, context.nested());
-    }
-  }
-
-  @Override
-  @Nullable
-  public T decode(InputStream inStream, Context context) throws IOException, CoderException {
-    int b = inStream.read();
-    if (b == ENCODE_NULL) {
-      return null;
-    } else if (b != ENCODE_PRESENT) {
-        throw new CoderException(String.format(
-            "NullableCoder expects either a byte valued %s (null) or %s (present), got %s",
-            ENCODE_NULL, ENCODE_PRESENT, b));
-    }
-    return valueCoder.decode(inStream, context.nested());
-  }
-
-  @Override
-  public List<Coder<T>> getCoderArguments() {
-    return ImmutableList.of(valueCoder);
-  }
-
-  /**
-   * {@code NullableCoder} is deterministic if the nested {@code Coder} is.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Value coder must be deterministic", valueCoder);
-  }
-
-  /**
-   * {@code NullableCoder} is consistent with equals if the nested {@code Coder} is.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return valueCoder.consistentWithEquals();
-  }
-
-  @Override
-  public Object structuralValue(@Nullable T value) throws Exception {
-    if (value == null) {
-      return Optional.absent();
-    }
-    return Optional.of(valueCoder.structuralValue(value));
-  }
-
-  /**
-   * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
-   * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise
-   * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  public void registerByteSizeObserver(
-      @Nullable T value, ElementByteSizeObserver observer, Context context) throws Exception {
-    observer.update(1);
-    if (value != null) {
-      valueCoder.registerByteSizeObserver(value, observer, context.nested());
-    }
-  }
-
-  /**
-   * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
-   * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise
-   * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  protected long getEncodedElementByteSize(@Nullable T value, Context context) throws Exception {
-    if (value == null) {
-      return 1;
-    }
-
-    if (valueCoder instanceof StandardCoder) {
-      // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of
-      // the value, adding 1 byte to count the null indicator.
-      return 1  + ((StandardCoder<T>) valueCoder)
-          .getEncodedElementByteSize(value, context.nested());
-    }
-
-    // If value is not a StandardCoder then fall back to the default StandardCoder behavior
-    // of encoding and counting the bytes. The encoding will include the null indicator byte.
-    return super.getEncodedElementByteSize(value, context);
-  }
-
-  /**
-   * {@code NullableCoder} is cheap if {@code valueCoder} is cheap.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
-    return valueCoder.isRegisterByteSizeObserverCheap(value, context.nested());
-  }
-}