You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:08 UTC
[21/74] [partial] incubator-beam git commit: Rename
com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CustomCoder.java
deleted file mode 100644
index a43e95e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CustomCoder.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.StringUtils;
-import com.google.common.collect.Lists;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * An abstract base class for writing a {@link Coder} class that encodes itself via Java
- * serialization.
- *
- * <p>To complete an implementation, subclasses must implement {@link Coder#encode}
- * and {@link Coder#decode} methods. Anonymous subclasses must furthermore override
- * {@link #getEncodingId}.
- *
- * <p>Not to be confused with {@link SerializableCoder} that encodes objects that implement the
- * {@link Serializable} interface.
- *
- * @param <T> the type of elements handled by this coder
- */
-public abstract class CustomCoder<T> extends AtomicCoder<T>
- implements Serializable {
- @JsonCreator
- public static CustomCoder<?> of(
- // N.B. typeId is a required parameter here, since a field named "@type"
- // is presented to the deserializer as an input.
- //
- // If this method did not consume the field, Jackson2 would observe an
- // unconsumed field and a returned value of a derived type. So Jackson2
- // would attempt to update the returned value with the unconsumed field
- // data, The standard JsonDeserializer does not implement a mechanism for
- // updating constructed values, so it would throw an exception, causing
- // deserialization to fail.
- @JsonProperty(value = "@type", required = false) String typeId,
- @JsonProperty(value = "encoding_id", required = false) String encodingId,
- @JsonProperty("type") String type,
- @JsonProperty("serialized_coder") String serializedCoder) {
- return (CustomCoder<?>) SerializableUtils.deserializeFromByteArray(
- StringUtils.jsonStringToByteArray(serializedCoder),
- type);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}.
- */
- @Override
- public CloudObject asCloudObject() {
- // N.B. We use the CustomCoder class, not the derived class, since during
- // deserialization we will be using the CustomCoder's static factory method
- // to construct an instance of the derived class.
- CloudObject result = CloudObject.forClass(CustomCoder.class);
- addString(result, "type", getClass().getName());
- addString(result, "serialized_coder",
- StringUtils.byteArrayToJsonString(
- SerializableUtils.serializeToByteArray(this)));
-
- String encodingId = getEncodingId();
- checkNotNull(encodingId, "Coder.getEncodingId() must not return null.");
- if (!encodingId.isEmpty()) {
- addString(result, PropertyNames.ENCODING_ID, encodingId);
- }
-
- Collection<String> allowedEncodings = getAllowedEncodings();
- if (!allowedEncodings.isEmpty()) {
- addStringList(result, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowedEncodings));
- }
-
- return result;
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NonDeterministicException a {@link CustomCoder} is presumed
- * nondeterministic.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "CustomCoder implementations must override verifyDeterministic,"
- + " or they are presumed nondeterministic.");
- }
-
- /**
- * {@inheritDoc}
- *
- * @return The canonical class name for this coder. For stable data formats that are independent
- * of class name, it is recommended to override this method.
- *
- * @throws UnsupportedOperationException when an anonymous class is used, since they do not have
- * a stable canonical class name.
- */
- @Override
- public String getEncodingId() {
- if (getClass().isAnonymousClass()) {
- throw new UnsupportedOperationException(
- String.format("Anonymous CustomCoder subclass %s must override getEncodingId()."
- + " Otherwise, convert to a named class and getEncodingId() will be automatically"
- + " generated from the fully qualified class name.",
- getClass()));
- }
- return getClass().getCanonicalName();
- }
-
- // This coder inherits isRegisterByteSizeObserverCheap,
- // getEncodedElementByteSize and registerByteSizeObserver
- // from StandardCoder. Override if we can do better.
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DefaultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DefaultCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DefaultCoder.java
deleted file mode 100644
index b277166..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DefaultCoder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * The {@link DefaultCoder} annotation
- * specifies a default {@link Coder} class to handle encoding and decoding
- * instances of the annotated class.
- *
- * <p>The specified {@link Coder} must satisfy the requirements of
- * {@link CoderProviders#fromStaticMethods}. Two classes provided by the SDK that
- * are intended for use with this annotation include {@link SerializableCoder}
- * and {@link AvroCoder}.
- *
- * <p>To configure the use of Java serialization as the default
- * for a class, annotate the class to use
- * {@link SerializableCoder} as follows:
- *
- * <pre><code>{@literal @}DefaultCoder(SerializableCoder.class)
- * public class MyCustomDataType implements Serializable {
- * // ...
- * }</code></pre>
- *
- * <p>Similarly, to configure the use of
- * {@link AvroCoder} as the default:
- * <pre><code>{@literal @}DefaultCoder(AvroCoder.class)
- * public class MyCustomDataType {
- * public MyCustomDataType() {} // Avro requires an empty constructor.
- * // ...
- * }</code></pre>
- *
- * <p>Coders specified explicitly via
- * {@link PCollection#setCoder}
- * take precedence, followed by Coders registered at runtime via
- * {@link CoderRegistry#registerCoder}. See {@link CoderRegistry} for a more detailed discussion
- * of the precedence rules.
- */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-@SuppressWarnings("rawtypes")
-public @interface DefaultCoder {
- Class<? extends Coder> value();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DelegateCoder.java
deleted file mode 100644
index d1df083..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DelegateCoder.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@code DelegateCoder<T, IntermediateT>} wraps a {@link Coder} for {@code IntermediateT} and
- * encodes/decodes values of type {@code T} by converting
- * to/from {@code IntermediateT} and then encoding/decoding using the underlying
- * {@code Coder<IntermediateT>}.
- *
- * <p>The conversions from {@code T} to {@code IntermediateT} and vice versa
- * must be supplied as {@link CodingFunction}, a serializable
- * function that may throw any {@code Exception}. If a thrown
- * exception is an instance of {@link CoderException} or
- * {@link IOException}, it will be re-thrown, otherwise it will be wrapped as
- * a {@link CoderException}.
- *
- * @param <T> The type of objects coded by this Coder.
- * @param <IntermediateT> The type of objects a {@code T} will be converted to for coding.
- */
-public class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
- /**
- * A {@link DelegateCoder.CodingFunction CodingFunction<InputT, OutputT>} is a serializable
- * function from {@code InputT} to {@code OutputT} that may throw any {@link Exception}.
- */
- public static interface CodingFunction<InputT, OutputT> extends Serializable {
- public abstract OutputT apply(InputT input) throws Exception;
- }
-
- public static <T, IntermediateT> DelegateCoder<T, IntermediateT> of(Coder<IntermediateT> coder,
- CodingFunction<T, IntermediateT> toFn,
- CodingFunction<IntermediateT, T> fromFn) {
- return new DelegateCoder<T, IntermediateT>(coder, toFn, fromFn);
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- coder.encode(applyAndWrapExceptions(toFn, value), outStream, context);
- }
-
- @Override
- public T decode(InputStream inStream, Context context) throws CoderException, IOException {
- return applyAndWrapExceptions(fromFn, coder.decode(inStream, context));
- }
-
- /**
- * Returns the coder used to encode/decode the intermediate values produced/consumed by the
- * coding functions of this {@code DelegateCoder}.
- */
- public Coder<IntermediateT> getCoder() {
- return coder;
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NonDeterministicException when the underlying coder's {@code verifyDeterministic()}
- * throws a {@link Coder.NonDeterministicException}. For this to be safe, the
- * intermediate {@code CodingFunction<T, IntermediateT>} must also be deterministic.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- coder.verifyDeterministic();
- }
-
- /**
- * {@inheritDoc}
- *
- * @return a structural for a value of type {@code T} obtained by first converting to
- * {@code IntermediateT} and then obtaining a structural value according to the underlying
- * coder.
- */
- @Override
- public Object structuralValue(T value) throws Exception {
- return coder.structuralValue(toFn.apply(value));
- }
-
- @Override
- public String toString() {
- return "DelegateCoder(" + coder + ")";
- }
-
- /**
- * {@inheritDoc}
- *
- * @return a {@link String} composed from the underlying coder class name and its encoding id.
- * Note that this omits any description of the coding functions. These should be modified
- * with care.
- */
- @Override
- public String getEncodingId() {
- return delegateEncodingId(coder.getClass(), coder.getEncodingId());
- }
-
- /**
- * {@inheritDoc}
- *
- * @return allowed encodings which are composed from the underlying coder class and its allowed
- * encoding ids. Note that this omits any description of the coding functions. These
- * should be modified with care.
- */
- @Override
- public Collection<String> getAllowedEncodings() {
- List<String> allowedEncodings = Lists.newArrayList();
- for (String allowedEncoding : coder.getAllowedEncodings()) {
- allowedEncodings.add(delegateEncodingId(coder.getClass(), allowedEncoding));
- }
- return allowedEncodings;
- }
-
- private String delegateEncodingId(Class<?> delegateClass, String encodingId) {
- return String.format("%s:%s", delegateClass.getName(), encodingId);
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private <InputT, OutputT> OutputT applyAndWrapExceptions(
- CodingFunction<InputT, OutputT> fn,
- InputT input) throws CoderException, IOException {
- try {
- return fn.apply(input);
- } catch (IOException exc) {
- throw exc;
- } catch (Exception exc) {
- throw new CoderException(exc);
- }
- }
-
- private final Coder<IntermediateT> coder;
- private final CodingFunction<T, IntermediateT> toFn;
- private final CodingFunction<IntermediateT, T> fromFn;
-
- protected DelegateCoder(Coder<IntermediateT> coder,
- CodingFunction<T, IntermediateT> toFn,
- CodingFunction<IntermediateT, T> fromFn) {
- this.coder = coder;
- this.fromFn = fromFn;
- this.toFn = toFn;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DeterministicStandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DeterministicStandardCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DeterministicStandardCoder.java
deleted file mode 100644
index 6dc2506..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DeterministicStandardCoder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-/**
- * A {@link DeterministicStandardCoder} is a {@link StandardCoder} that is
- * deterministic, in the sense that for objects considered equal
- * according to {@link Object#equals(Object)}, the encoded bytes are
- * also equal.
- *
- * @param <T> the type of the values being transcoded
- */
-public abstract class DeterministicStandardCoder<T> extends StandardCoder<T> {
- protected DeterministicStandardCoder() {}
-
- /**
- * {@inheritDoc}
- *
- * @throws NonDeterministicException never, unless overridden. A
- * {@link DeterministicStandardCoder} is presumed deterministic.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException { }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DoubleCoder.java
deleted file mode 100644
index 44cade6..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DoubleCoder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
- */
-public class DoubleCoder extends AtomicCoder<Double> {
-
- @JsonCreator
- public static DoubleCoder of() {
- return INSTANCE;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static final DoubleCoder INSTANCE = new DoubleCoder();
-
- private DoubleCoder() {}
-
- @Override
- public void encode(Double value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null Double");
- }
- new DataOutputStream(outStream).writeDouble(value);
- }
-
- @Override
- public Double decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- try {
- return new DataInputStream(inStream).readDouble();
- } catch (EOFException | UTFDataFormatException exn) {
- // These exceptions correspond to decoding problems, so change
- // what kind of exception they're branded as.
- throw new CoderException(exn);
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NonDeterministicException always.
- * Floating-point operations are not guaranteed to be deterministic, even
- * if the storage format might be, so floating point representations are not
- * recommended for use in operations that require deterministic inputs.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "Floating point encodings are not guaranteed to be deterministic.");
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. This coder is injective.
- */
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. {@link DoubleCoder#getEncodedElementByteSize} returns a constant.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(Double value, Context context) {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code 8}, the byte size of a {@link Double} encoded using Java serialization.
- */
- @Override
- protected long getEncodedElementByteSize(Double value, Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot encode a null Double");
- }
- return 8;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DurationCoder.java
deleted file mode 100644
index 46fbdde..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/DurationCoder.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import org.joda.time.Duration;
-import org.joda.time.ReadableDuration;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} that encodes a joda {@link Duration} as a {@link Long} using the format of
- * {@link VarLongCoder}.
- */
-public class DurationCoder extends AtomicCoder<ReadableDuration> {
-
- @JsonCreator
- public static DurationCoder of() {
- return INSTANCE;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static final DurationCoder INSTANCE = new DurationCoder();
-
- private final VarLongCoder longCoder = VarLongCoder.of();
-
- private DurationCoder() {}
-
- private Long toLong(ReadableDuration value) {
- return value.getMillis();
- }
-
- private ReadableDuration fromLong(Long decoded) {
- return Duration.millis(decoded);
- }
-
- @Override
- public void encode(ReadableDuration value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- if (value == null) {
- throw new CoderException("cannot encode a null ReadableDuration");
- }
- longCoder.encode(toLong(value), outStream, context);
- }
-
- @Override
- public ReadableDuration decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return fromLong(longCoder.decode(inStream, context));
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. This coder is injective.
- */
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}, because it is cheap to ascertain the byte size of a long.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(ReadableDuration value, Context context) {
- return longCoder.isRegisterByteSizeObserverCheap(toLong(value), context);
- }
-
- @Override
- public void registerByteSizeObserver(
- ReadableDuration value, ElementByteSizeObserver observer, Context context) throws Exception {
- longCoder.registerByteSizeObserver(toLong(value), observer, context);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/EntityCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/EntityCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/EntityCoder.java
deleted file mode 100644
index ffd516a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/EntityCoder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.api.services.datastore.DatastoreV1.Entity;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@link Entity} objects based on their encoded Protocol Buffer form.
- */
-public class EntityCoder extends AtomicCoder<Entity> {
-
- @JsonCreator
- public static EntityCoder of() {
- return INSTANCE;
- }
-
- /***************************/
-
- private static final EntityCoder INSTANCE = new EntityCoder();
-
- private EntityCoder() {}
-
- @Override
- public void encode(Entity value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null Entity");
- }
-
- // Since Entity implements com.google.protobuf.MessageLite,
- // we could directly use writeTo to write to a OutputStream object
- outStream.write(java.nio.ByteBuffer.allocate(4).putInt(value.getSerializedSize()).array());
- value.writeTo(outStream);
- outStream.flush();
- }
-
- @Override
- public Entity decode(InputStream inStream, Context context)
- throws IOException {
- byte[] entitySize = new byte[4];
- inStream.read(entitySize, 0, 4);
- int size = java.nio.ByteBuffer.wrap(entitySize).getInt();
- byte[] data = new byte[size];
- inStream.read(data, 0, size);
- return Entity.parseFrom(data);
- }
-
- @Override
- protected long getEncodedElementByteSize(Entity value, Context context)
- throws Exception {
- return value.getSerializedSize();
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NonDeterministicException always.
- * A datastore kind can hold arbitrary {@link Object} instances, which
- * makes the encoding non-deterministic.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "Datastore encodings can hold arbitrary Object instances");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/InstantCoder.java
deleted file mode 100644
index 65c517f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/InstantCoder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Converter;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for joda {@link Instant} that encodes it as a big endian {@link Long}
- * shifted such that lexicographic ordering of the bytes corresponds to chronological order.
- */
-public class InstantCoder extends AtomicCoder<Instant> {
-
- @JsonCreator
- public static InstantCoder of() {
- return INSTANCE;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static final InstantCoder INSTANCE = new InstantCoder();
-
- private final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
-
- private InstantCoder() {}
-
- /**
- * Converts {@link Instant} to a {@code Long} representing its millis-since-epoch,
- * but shifted so that the byte representation of negative values are lexicographically
- * ordered before the byte representation of positive values.
- *
- * <p>This deliberately utilizes the well-defined overflow for {@code Long} values.
- * See http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2
- */
- private static final Converter<Instant, Long> ORDER_PRESERVING_CONVERTER =
- new Converter<Instant, Long>() {
-
- @Override
- protected Long doForward(Instant instant) {
- return instant.getMillis() - Long.MIN_VALUE;
- }
-
- @Override
- protected Instant doBackward(Long shiftedMillis) {
- return new Instant(shiftedMillis + Long.MIN_VALUE);
- }
- };
-
- @Override
- public void encode(Instant value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- if (value == null) {
- throw new CoderException("cannot encode a null Instant");
- }
- longCoder.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
- }
-
- @Override
- public Instant decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return ORDER_PRESERVING_CONVERTER.reverse().convert(longCoder.decode(inStream, context));
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. This coder is injective.
- */
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. The byte size for a big endian long is a constant.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(Instant value, Context context) {
- return longCoder.isRegisterByteSizeObserverCheap(
- ORDER_PRESERVING_CONVERTER.convert(value), context);
- }
-
- @Override
- public void registerByteSizeObserver(
- Instant value, ElementByteSizeObserver observer, Context context) throws Exception {
- longCoder.registerByteSizeObserver(
- ORDER_PRESERVING_CONVERTER.convert(value), observer, context);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableCoder.java
deleted file mode 100644
index 55843ee..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableCoder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * An {@link IterableCoder} encodes any {@link Iterable} in the format
- * of {@link IterableLikeCoder}.
- *
- * @param <T> the type of the elements of the iterables being transcoded
- */
-public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
-
- public static <T> IterableCoder<T> of(Coder<T> elemCoder) {
- return new IterableCoder<>(elemCoder);
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // Internal operations below here.
-
- @Override
- protected final Iterable<T> decodeToIterable(List<T> decodedElements) {
- return decodedElements;
- }
-
- @JsonCreator
- public static IterableCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() == 1,
- "Expecting 1 component, got " + components.size());
- return of(components.get(0));
- }
-
- /**
- * Returns the first element in this iterable if it is non-empty,
- * otherwise returns {@code null}.
- */
- public static <T> List<Object> getInstanceComponents(
- Iterable<T> exampleValue) {
- return getInstanceComponentsHelper(exampleValue);
- }
-
- protected IterableCoder(Coder<T> elemCoder) {
- super(elemCoder, "Iterable");
- }
-
- @Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
- addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableLikeCoder.java
deleted file mode 100644
index 194e9ef..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/IterableLikeCoder.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.BufferedElementCountingOutputStream;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObservableIterable;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Preconditions;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Observable;
-import java.util.Observer;
-
-/**
- * An abstract base class with functionality for assembling a
- * {@link Coder} for a class that implements {@code Iterable}.
- *
- * <p>To complete a subclass, implement the {@link #decodeToIterable} method. This superclass
- * will decode the elements in the input stream into a {@link List} and then pass them to that
- * method to be converted into the appropriate iterable type. Note that this means the input
- * iterables must fit into memory.
- *
- * <p>The format of this coder is as follows:
- *
- * <ul>
- * <li>If the input {@link Iterable} has a known and finite size, then the size is written to the
- * output stream in big endian format, followed by all of the encoded elements.</li>
- * <li>If the input {@link Iterable} is not known to have a finite size, then each element
- * of the input is preceded by {@code true} encoded as a byte (indicating "more data")
- * followed by the encoded element, and terminated by {@code false} encoded as a byte.</li>
- * </ul>
- *
- * @param <T> the type of the elements of the {@code Iterable}s being transcoded
- * @param <IterableT> the type of the Iterables being transcoded
- */
-public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
- extends StandardCoder<IterableT> {
- public Coder<T> getElemCoder() {
- return elementCoder;
- }
-
- /**
- * Builds an instance of {@code IterableT}, this coder's associated {@link Iterable}-like
- * subtype, from a list of decoded elements.
- */
- protected abstract IterableT decodeToIterable(List<T> decodedElements);
-
- /////////////////////////////////////////////////////////////////////////////
- // Internal operations below here.
-
- private final Coder<T> elementCoder;
- private final String iterableName;
-
- /**
- * Returns the first element in the iterable-like {@code exampleValue} if it is non-empty,
- * otherwise returns {@code null}.
- */
- protected static <T, IterableT extends Iterable<T>>
- List<Object> getInstanceComponentsHelper(IterableT exampleValue) {
- for (T value : exampleValue) {
- return Arrays.<Object>asList(value);
- }
- return null;
- }
-
- protected IterableLikeCoder(Coder<T> elementCoder, String iterableName) {
- Preconditions.checkArgument(elementCoder != null,
- "element Coder for IterableLikeCoder must not be null");
- Preconditions.checkArgument(iterableName != null,
- "iterable name for IterableLikeCoder must not be null");
- this.elementCoder = elementCoder;
- this.iterableName = iterableName;
- }
-
- @Override
- public void encode(
- IterableT iterable, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (iterable == null) {
- throw new CoderException("cannot encode a null " + iterableName);
- }
- Context nestedContext = context.nested();
- DataOutputStream dataOutStream = new DataOutputStream(outStream);
- if (iterable instanceof Collection) {
- // We can know the size of the Iterable. Use an encoding with a
- // leading size field, followed by that many elements.
- Collection<T> collection = (Collection<T>) iterable;
- dataOutStream.writeInt(collection.size());
- for (T elem : collection) {
- elementCoder.encode(elem, dataOutStream, nestedContext);
- }
- } else {
- // We don't know the size without traversing it so use a fixed size buffer
- // and encode as many elements as possible into it before outputting the size followed
- // by the elements.
- dataOutStream.writeInt(-1);
- BufferedElementCountingOutputStream countingOutputStream =
- new BufferedElementCountingOutputStream(dataOutStream);
- for (T elem : iterable) {
- countingOutputStream.markElementStart();
- elementCoder.encode(elem, countingOutputStream, nestedContext);
- }
- countingOutputStream.finish();
- }
- // Make sure all our output gets pushed to the underlying outStream.
- dataOutStream.flush();
- }
-
- @Override
- public IterableT decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- Context nestedContext = context.nested();
- DataInputStream dataInStream = new DataInputStream(inStream);
- int size = dataInStream.readInt();
- if (size >= 0) {
- List<T> elements = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- elements.add(elementCoder.decode(dataInStream, nestedContext));
- }
- return decodeToIterable(elements);
- } else {
- List<T> elements = new ArrayList<>();
- long count;
- // We don't know the size a priori. Check if we're done with
- // each block of elements.
- while ((count = VarInt.decodeLong(dataInStream)) > 0) {
- while (count > 0) {
- elements.add(elementCoder.decode(dataInStream, nestedContext));
- count -= 1;
- }
- }
- return decodeToIterable(elements);
- }
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(elementCoder);
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NonDeterministicException always.
- * Encoding is not deterministic for the general {@link Iterable} case, as it depends
- * upon the type of iterable. This may allow two objects to compare as equal
- * while the encoding differs.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "IterableLikeCoder can not guarantee deterministic ordering.");
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} if the iterable is of a known class that supports lazy counting
- * of byte size, since that requires minimal extra computation.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(
- IterableT iterable, Context context) {
- return iterable instanceof ElementByteSizeObservableIterable;
- }
-
- @Override
- public void registerByteSizeObserver(
- IterableT iterable, ElementByteSizeObserver observer, Context context)
- throws Exception {
- if (iterable == null) {
- throw new CoderException("cannot encode a null Iterable");
- }
- Context nestedContext = context.nested();
-
- if (iterable instanceof ElementByteSizeObservableIterable) {
- observer.setLazy();
- ElementByteSizeObservableIterable<?, ?> observableIterable =
- (ElementByteSizeObservableIterable<?, ?>) iterable;
- observableIterable.addObserver(
- new IteratorObserver(observer, iterable instanceof Collection));
- } else {
- if (iterable instanceof Collection) {
- // We can know the size of the Iterable. Use an encoding with a
- // leading size field, followed by that many elements.
- Collection<T> collection = (Collection<T>) iterable;
- observer.update(4L);
- for (T elem : collection) {
- elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
- }
- } else {
- // TODO: Update to use an accurate count depending on size and count, currently we
- // are under estimating the size by up to 10 bytes per block of data since we are
- // not encoding the count prefix which occurs at most once per 64k of data and is upto
- // 10 bytes long. Since we include the total count we can upper bound the underestimate
- // to be 10 / 65536 ~= 0.0153% of the actual size.
- observer.update(4L);
- long count = 0;
- for (T elem : iterable) {
- count += 1;
- elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
- }
- if (count > 0) {
- // Update the length based upon the number of counted elements, this helps
- // eliminate the case where all the elements are encoded in the first block and
- // it is quite short (e.g. Long.MAX_VALUE nulls encoded with VoidCoder).
- observer.update(VarInt.getLength(count));
- }
- // Update with the terminator byte.
- observer.update(1L);
- }
- }
- }
-
- /**
- * An observer that gets notified when an observable iterator
- * returns a new value. This observer just notifies an outerObserver
- * about this event. Additionally, the outerObserver is notified
- * about additional separators that are transparently added by this
- * coder.
- */
- private class IteratorObserver implements Observer {
- private final ElementByteSizeObserver outerObserver;
- private final boolean countable;
-
- public IteratorObserver(ElementByteSizeObserver outerObserver,
- boolean countable) {
- this.outerObserver = outerObserver;
- this.countable = countable;
-
- if (countable) {
- // Additional 4 bytes are due to size.
- outerObserver.update(4L);
- } else {
- // Additional 5 bytes are due to size = -1 (4 bytes) and
- // hasNext = false (1 byte).
- outerObserver.update(5L);
- }
- }
-
- @Override
- public void update(Observable obs, Object obj) {
- if (!(obj instanceof Long)) {
- throw new AssertionError("unexpected parameter object");
- }
-
- if (countable) {
- outerObserver.update(obs, obj);
- } else {
- // Additional 1 byte is due to hasNext = true flag.
- outerObserver.update(obs, 1 + (long) obj);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
deleted file mode 100644
index 6e2833e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.io.ByteStreams;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-
-/**
- * A coder for JAXB annotated objects. This coder uses JAXB marshalling/unmarshalling mechanisms
- * to encode/decode the objects. Users must provide the {@code Class} of the JAXB annotated object.
- *
- * @param <T> type of JAXB annotated objects that will be serialized.
- */
-public class JAXBCoder<T> extends AtomicCoder<T> {
-
- private final Class<T> jaxbClass;
- private transient Marshaller jaxbMarshaller = null;
- private transient Unmarshaller jaxbUnmarshaller = null;
-
- public Class<T> getJAXBClass() {
- return jaxbClass;
- }
-
- private JAXBCoder(Class<T> jaxbClass) {
- this.jaxbClass = jaxbClass;
- }
-
- /**
- * Create a coder for a given type of JAXB annotated objects.
- *
- * @param jaxbClass the {@code Class} of the JAXB annotated objects.
- */
- public static <T> JAXBCoder<T> of(Class<T> jaxbClass) {
- return new JAXBCoder<>(jaxbClass);
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- try {
- if (jaxbMarshaller == null) {
- JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass);
- jaxbMarshaller = jaxbContext.createMarshaller();
- }
- if (!context.isWholeStream) {
- try {
- long size = getEncodedElementByteSize(value, Context.OUTER);
- // record the number of bytes the XML consists of so when reading we only read the encoded
- // value
- VarInt.encode(size, outStream);
- } catch (Exception e) {
- throw new CoderException(
- "An Exception occured while trying to get the size of an encoded representation", e);
- }
- }
-
- jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream));
- } catch (JAXBException e) {
- throw new CoderException(e);
- }
- }
-
- @Override
- public T decode(InputStream inStream, Context context) throws CoderException, IOException {
- try {
- if (jaxbUnmarshaller == null) {
- JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass);
- jaxbUnmarshaller = jaxbContext.createUnmarshaller();
- }
-
- InputStream stream = inStream;
- if (!context.isWholeStream) {
- long limit = VarInt.decodeLong(inStream);
- stream = ByteStreams.limit(inStream, limit);
- }
- @SuppressWarnings("unchecked")
- T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(stream));
- return obj;
- } catch (JAXBException e) {
- throw new CoderException(e);
- }
- }
-
- @Override
- public String getEncodingId() {
- return getJAXBClass().getName();
- }
-
- private static class CloseIgnoringInputStream extends FilterInputStream {
-
- protected CloseIgnoringInputStream(InputStream in) {
- super(in);
- }
-
- @Override
- public void close() {
- // Do nothing. JAXB closes the underlying stream so we must filter out those calls.
- }
- }
-
- private static class CloseIgnoringOutputStream extends FilterOutputStream {
-
- protected CloseIgnoringOutputStream(OutputStream out) {
- super(out);
- }
-
- @Override
- public void close() throws IOException {
- // JAXB closes the underlying stream so we must filter out those calls.
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////////
- // JSON Serialization details below
-
- private static final String JAXB_CLASS = "jaxb_class";
-
- /**
- * Constructor for JSON deserialization only.
- */
- @JsonCreator
- public static <T> JAXBCoder<T> of(
- @JsonProperty(JAXB_CLASS) String jaxbClassName) {
- try {
- @SuppressWarnings("unchecked")
- Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName);
- return of(jaxbClass);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
- Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
deleted file mode 100644
index 227c546..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * A {@code KvCoder} encodes {@link KV}s.
- *
- * @param <K> the type of the keys of the KVs being transcoded
- * @param <V> the type of the values of the KVs being transcoded
- */
-public class KvCoder<K, V> extends KvCoderBase<KV<K, V>> {
- public static <K, V> KvCoder<K, V> of(Coder<K> keyCoder,
- Coder<V> valueCoder) {
- return new KvCoder<>(keyCoder, valueCoder);
- }
-
- @JsonCreator
- public static KvCoder<?, ?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() == 2,
- "Expecting 2 components, got " + components.size());
- return of(components.get(0), components.get(1));
- }
-
- public static <K, V> List<Object> getInstanceComponents(
- KV<K, V> exampleValue) {
- return Arrays.asList(
- exampleValue.getKey(),
- exampleValue.getValue());
- }
-
- public Coder<K> getKeyCoder() {
- return keyCoder;
- }
-
- public Coder<V> getValueCoder() {
- return valueCoder;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private final Coder<K> keyCoder;
- private final Coder<V> valueCoder;
-
- private KvCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
- this.keyCoder = keyCoder;
- this.valueCoder = valueCoder;
- }
-
- @Override
- public void encode(KV<K, V> kv, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (kv == null) {
- throw new CoderException("cannot encode a null KV");
- }
- Context nestedContext = context.nested();
- keyCoder.encode(kv.getKey(), outStream, nestedContext);
- valueCoder.encode(kv.getValue(), outStream, nestedContext);
- }
-
- @Override
- public KV<K, V> decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- Context nestedContext = context.nested();
- K key = keyCoder.decode(inStream, nestedContext);
- V value = valueCoder.decode(inStream, nestedContext);
- return KV.of(key, value);
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(keyCoder, valueCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic("Key coder must be deterministic", getKeyCoder());
- verifyDeterministic("Value coder must be deterministic", getValueCoder());
- }
-
- @Override
- public boolean consistentWithEquals() {
- return keyCoder.consistentWithEquals() && valueCoder.consistentWithEquals();
- }
-
- @Override
- public Object structuralValue(KV<K, V> kv) throws Exception {
- if (consistentWithEquals()) {
- return kv;
- } else {
- return KV.of(getKeyCoder().structuralValue(kv.getKey()),
- getValueCoder().structuralValue(kv.getValue()));
- }
- }
-
- @Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
- addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
- return result;
- }
-
- /**
- * Returns whether both keyCoder and valueCoder are considered not expensive.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) {
- return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(),
- context.nested())
- && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(),
- context.nested());
- }
-
- /**
- * Notifies ElementByteSizeObserver about the byte size of the
- * encoded value using this coder.
- */
- @Override
- public void registerByteSizeObserver(
- KV<K, V> kv, ElementByteSizeObserver observer, Context context)
- throws Exception {
- if (kv == null) {
- throw new CoderException("cannot encode a null KV");
- }
- keyCoder.registerByteSizeObserver(
- kv.getKey(), observer, context.nested());
- valueCoder.registerByteSizeObserver(
- kv.getValue(), observer, context.nested());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
deleted file mode 100644
index 361ffb9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * A abstract base class for KvCoder. Works around a Jackson2 bug tickled when building
- * {@link KvCoder} directly (as of this writing, Jackson2 walks off the end of
- * an array when it tries to deserialize a class with multiple generic type
- * parameters). This class should be removed when possible.
- *
- * @param <T> the type of values being transcoded
- */
-@Deprecated
-public abstract class KvCoderBase<T> extends StandardCoder<T> {
- /**
- * A constructor used only for decoding from JSON.
- *
- * @param typeId present in the JSON encoding, but unused
- * @param isPairLike present in the JSON encoding, but unused
- */
- @Deprecated
- @JsonCreator
- public static KvCoderBase<?> of(
- // N.B. typeId is a required parameter here, since a field named "@type"
- // is presented to the deserializer as an input.
- //
- // If this method did not consume the field, Jackson2 would observe an
- // unconsumed field and a returned value of a derived type. So Jackson2
- // would attempt to update the returned value with the unconsumed field
- // data. The standard JsonDeserializer does not implement a mechanism for
- // updating constructed values, so it would throw an exception, causing
- // deserialization to fail.
- @JsonProperty(value = "@type", required = false) String typeId,
- @JsonProperty(value = PropertyNames.IS_PAIR_LIKE, required = false) boolean isPairLike,
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
- return KvCoder.of(components);
- }
-
- protected KvCoderBase() {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
deleted file mode 100644
index cfb3ab7..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * A {@link Coder} for {@link List}, using the format of {@link IterableLikeCoder}.
- *
- * @param <T> the type of the elements of the Lists being transcoded
- */
-public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
-
- public static <T> ListCoder<T> of(Coder<T> elemCoder) {
- return new ListCoder<>(elemCoder);
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // Internal operations below here.
-
- @Override
- protected final List<T> decodeToIterable(List<T> decodedElements) {
- return decodedElements;
- }
-
- @JsonCreator
- public static ListCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() == 1,
- "Expecting 1 component, got " + components.size());
- return of((Coder<?>) components.get(0));
- }
-
- /**
- * Returns the first element in this list if it is non-empty,
- * otherwise returns {@code null}.
- */
- public static <T> List<Object> getInstanceComponents(List<T> exampleValue) {
- return getInstanceComponentsHelper(exampleValue);
- }
-
- protected ListCoder(Coder<T> elemCoder) {
- super(elemCoder, "List");
- }
-
- /**
- * List sizes are always known, so ListIterable may be deterministic while
- * the general IterableLikeCoder is not.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic(
- "ListCoder.elemCoder must be deterministic", getElemCoder());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
deleted file mode 100644
index 5a60ab5..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * A {@link Coder} for {@link Map Maps} that encodes them according to provided
- * coders for keys and values.
- *
- * @param <K> the type of the keys of the KVs being transcoded
- * @param <V> the type of the values of the KVs being transcoded
- */
-public class MapCoder<K, V> extends MapCoderBase<Map<K, V>> {
- /**
- * Produces a MapCoder with the given keyCoder and valueCoder.
- */
- public static <K, V> MapCoder<K, V> of(
- Coder<K> keyCoder,
- Coder<V> valueCoder) {
- return new MapCoder<>(keyCoder, valueCoder);
- }
-
- @JsonCreator
- public static MapCoder<?, ?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() == 2,
- "Expecting 2 components, got " + components.size());
- return of((Coder<?>) components.get(0), (Coder<?>) components.get(1));
- }
-
- /**
- * Returns the key and value for an arbitrary element of this map,
- * if it is non-empty, otherwise returns {@code null}.
- */
- public static <K, V> List<Object> getInstanceComponents(
- Map<K, V> exampleValue) {
- for (Map.Entry<K, V> entry : exampleValue.entrySet()) {
- return Arrays.asList(entry.getKey(), entry.getValue());
- }
- return null;
- }
-
- public Coder<K> getKeyCoder() {
- return keyCoder;
- }
-
- public Coder<V> getValueCoder() {
- return valueCoder;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- Coder<K> keyCoder;
- Coder<V> valueCoder;
-
- MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
- this.keyCoder = keyCoder;
- this.valueCoder = valueCoder;
- }
-
- @Override
- public void encode(
- Map<K, V> map,
- OutputStream outStream,
- Context context)
- throws IOException, CoderException {
- if (map == null) {
- throw new CoderException("cannot encode a null Map");
- }
- DataOutputStream dataOutStream = new DataOutputStream(outStream);
- dataOutStream.writeInt(map.size());
- for (Entry<K, V> entry : map.entrySet()) {
- keyCoder.encode(entry.getKey(), outStream, context.nested());
- valueCoder.encode(entry.getValue(), outStream, context.nested());
- }
- dataOutStream.flush();
- }
-
- @Override
- public Map<K, V> decode(InputStream inStream, Context context)
- throws IOException, CoderException {
- DataInputStream dataInStream = new DataInputStream(inStream);
- int size = dataInStream.readInt();
- Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
- for (int i = 0; i < size; ++i) {
- K key = keyCoder.decode(inStream, context.nested());
- V value = valueCoder.decode(inStream, context.nested());
- retval.put(key, value);
- }
- return retval;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return a {@link List} containing the key coder at index 0 at the and value coder at index 1.
- */
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(keyCoder, valueCoder);
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NonDeterministicException always. Not all maps have a deterministic encoding.
- * For example, {@code HashMap} comparison does not depend on element order, so
- * two {@code HashMap} instances may be equal but produce different encodings.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "Ordering of entries in a Map may be non-deterministic.");
- }
-
- @Override
- public void registerByteSizeObserver(
- Map<K, V> map, ElementByteSizeObserver observer, Context context)
- throws Exception {
- observer.update(4L);
- for (Entry<K, V> entry : map.entrySet()) {
- keyCoder.registerByteSizeObserver(
- entry.getKey(), observer, context.nested());
- valueCoder.registerByteSizeObserver(
- entry.getValue(), observer, context.nested());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
deleted file mode 100644
index 1fc3f57..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * A abstract base class for MapCoder. Works around a Jackson2 bug tickled when building
- * {@link MapCoder} directly (as of this writing, Jackson2 walks off the end of
- * an array when it tries to deserialize a class with multiple generic type
- * parameters). This should be removed in favor of a better workaround.
- * @param <T> the type of values being transcoded
- */
-@Deprecated
-public abstract class MapCoderBase<T> extends StandardCoder<T> {
- @Deprecated
- @JsonCreator
- public static MapCoderBase<?> of(
- // N.B. typeId is a required parameter here, since a field named "@type"
- // is presented to the deserializer as an input.
- //
- // If this method did not consume the field, Jackson2 would observe an
- // unconsumed field and a returned value of a derived type. So Jackson2
- // would attempt to update the returned value with the unconsumed field
- // data, The standard JsonDeserializer does not implement a mechanism for
- // updating constructed values, so it would throw an exception, causing
- // deserialization to fail.
- @JsonProperty(value = "@type", required = false) String typeId,
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- return MapCoder.of(components);
- }
-
- protected MapCoderBase() {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
deleted file mode 100644
index fc12a03..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link NullableCoder} encodes nullable values of type {@code T} using a nested
- * {@code Coder<T>} that does not tolerate {@code null} values. {@link NullableCoder} uses
- * exactly 1 byte per entry to indicate whether the value is {@code null}, then adds the encoding
- * of the inner coder for non-null values.
- *
- * @param <T> the type of the values being transcoded
- */
-public class NullableCoder<T> extends StandardCoder<T> {
- public static <T> NullableCoder<T> of(Coder<T> valueCoder) {
- if (valueCoder instanceof NullableCoder) {
- return (NullableCoder<T>) valueCoder;
- }
- return new NullableCoder<>(valueCoder);
- }
-
- @JsonCreator
- public static NullableCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() == 1,
- "Expecting 1 components, got " + components.size());
- return of(components.get(0));
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private final Coder<T> valueCoder;
- private static final int ENCODE_NULL = 0;
- private static final int ENCODE_PRESENT = 1;
-
- private NullableCoder(Coder<T> valueCoder) {
- this.valueCoder = valueCoder;
- }
-
- @Override
- public void encode(@Nullable T value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- outStream.write(ENCODE_NULL);
- } else {
- outStream.write(ENCODE_PRESENT);
- valueCoder.encode(value, outStream, context.nested());
- }
- }
-
- @Override
- @Nullable
- public T decode(InputStream inStream, Context context) throws IOException, CoderException {
- int b = inStream.read();
- if (b == ENCODE_NULL) {
- return null;
- } else if (b != ENCODE_PRESENT) {
- throw new CoderException(String.format(
- "NullableCoder expects either a byte valued %s (null) or %s (present), got %s",
- ENCODE_NULL, ENCODE_PRESENT, b));
- }
- return valueCoder.decode(inStream, context.nested());
- }
-
- @Override
- public List<Coder<T>> getCoderArguments() {
- return ImmutableList.of(valueCoder);
- }
-
- /**
- * {@code NullableCoder} is deterministic if the nested {@code Coder} is.
- *
- * {@inheritDoc}
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic("Value coder must be deterministic", valueCoder);
- }
-
- /**
- * {@code NullableCoder} is consistent with equals if the nested {@code Coder} is.
- *
- * {@inheritDoc}
- */
- @Override
- public boolean consistentWithEquals() {
- return valueCoder.consistentWithEquals();
- }
-
- @Override
- public Object structuralValue(@Nullable T value) throws Exception {
- if (value == null) {
- return Optional.absent();
- }
- return Optional.of(valueCoder.structuralValue(value));
- }
-
- /**
- * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
- * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise
- * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
- *
- * {@inheritDoc}
- */
- @Override
- public void registerByteSizeObserver(
- @Nullable T value, ElementByteSizeObserver observer, Context context) throws Exception {
- observer.update(1);
- if (value != null) {
- valueCoder.registerByteSizeObserver(value, observer, context.nested());
- }
- }
-
- /**
- * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
- * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise
- * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
- *
- * {@inheritDoc}
- */
- @Override
- protected long getEncodedElementByteSize(@Nullable T value, Context context) throws Exception {
- if (value == null) {
- return 1;
- }
-
- if (valueCoder instanceof StandardCoder) {
- // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of
- // the value, adding 1 byte to count the null indicator.
- return 1 + ((StandardCoder<T>) valueCoder)
- .getEncodedElementByteSize(value, context.nested());
- }
-
- // If value is not a StandardCoder then fall back to the default StandardCoder behavior
- // of encoding and counting the bytes. The encoding will include the null indicator byte.
- return super.getEncodedElementByteSize(value, context);
- }
-
- /**
- * {@code NullableCoder} is cheap if {@code valueCoder} is cheap.
- *
- * {@inheritDoc}
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
- return valueCoder.isRegisterByteSizeObserverCheap(value, context.nested());
- }
-}