You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/25 01:03:25 UTC

[2/3] beam git commit: Stop Extending AtomicCoder

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 94f324a..0004d03 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -21,9 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -32,7 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
  * A {@link Coder} that serializes and deserializes the {@link Result} objects using {@link
  * ProtobufUtil}.
  */
-class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
+class HBaseResultCoder extends CustomCoder<Result> implements Serializable {
   private static final HBaseResultCoder INSTANCE = new HBaseResultCoder();
 
   private HBaseResultCoder() {}

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 785699c..2ae940d 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -83,11 +83,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index fbd96eb..68efb9a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -53,11 +53,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -1321,7 +1321,7 @@ public class KafkaIO {
     }
   }
 
-  private static class NullOnlyCoder<T> extends AtomicCoder<T> {
+  private static class NullOnlyCoder<T> extends CustomCoder<T> {
     @Override
     public void encode(T value, OutputStream outStream, Context context) {
       checkArgument(value == null, "Can only encode nulls");

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 25ef7df..160e8ce 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -17,28 +17,23 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.KV;
 
 /**
  * {@link Coder} for {@link KafkaRecord}.
  */
-public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
+public class KafkaRecordCoder<K, V> extends CustomCoder<KafkaRecord<K, V>> {
 
   private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
   private static final VarLongCoder longCoder = VarLongCoder.of();
@@ -46,13 +41,6 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
 
   private final KvCoder<K, V> kvCoder;
 
-  @JsonCreator
-  public static KafkaRecordCoder<?, ?> of(@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-                                          List<Coder<?>> components) {
-    KvCoder<?, ?> kvCoder = KvCoder.of(components);
-    return of(kvCoder.getKeyCoder(), kvCoder.getValueCoder());
-  }
-
   public static <K, V> KafkaRecordCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
     return new KafkaRecordCoder<K, V>(keyCoder, valueCoder);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 77fe127..4da2b05 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -32,7 +32,7 @@ import org.joda.time.Instant;
 /**
  * A {@link Coder} for {@link KinesisRecord}.
  */
-class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
+class KinesisRecordCoder extends CustomCoder<KinesisRecord> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
     private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
     private static final InstantCoder INSTANT_CODER = InstantCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/xml/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
index 49ce239..51f1c6c 100644
--- a/sdks/java/io/xml/pom.xml
+++ b/sdks/java/io/xml/pom.xml
@@ -57,11 +57,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index b8b1b79..1e2e07c 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.io.xml;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.io.ByteStreams;
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
@@ -29,11 +27,9 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
-import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -43,10 +39,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> type of JAXB annotated objects that will be serialized.
  */
-public class JAXBCoder<T> extends AtomicCoder<T> {
+public class JAXBCoder<T> extends CustomCoder<T> {
 
   private final Class<T> jaxbClass;
-  private final TypeDescriptor<T> typeDescriptor;
   private transient volatile JAXBContext jaxbContext;
   private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller;
   private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller;
@@ -57,7 +52,6 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
 
   private JAXBCoder(Class<T> jaxbClass) {
     this.jaxbClass = jaxbClass;
-    this.typeDescriptor = TypeDescriptor.of(jaxbClass);
     this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() {
       @Override
       protected Marshaller initialValue() {
@@ -147,7 +141,7 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
 
   @Override
   public TypeDescriptor<T> getEncodedTypeDescriptor() {
-    return typeDescriptor;
+    return TypeDescriptor.of(jaxbClass);
   }
 
   private static class CloseIgnoringInputStream extends FilterInputStream {
@@ -173,31 +167,4 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
       // JAXB closes the underlying stream so we must filter out those calls.
     }
   }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // JSON Serialization details below
-
-  private static final String JAXB_CLASS = "jaxb_class";
-
-  /**
-   * Constructor for JSON deserialization only.
-   */
-  @JsonCreator
-  public static <T> JAXBCoder<T> of(
-      @JsonProperty(JAXB_CLASS) String jaxbClassName) {
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName);
-      return of(jaxbClass);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  @Override
-  protected CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
-    return result;
-  }
 }