You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2017/03/01 21:03:15 UTC

[1/2] beam git commit: [BEAM-111] Move WritableCoder to hadoop-common

Repository: beam
Updated Branches:
  refs/heads/master b49ec3fa2 -> a81c45781


[BEAM-111] Move WritableCoder to hadoop-common


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44624c38
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44624c38
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44624c38

Branch: refs/heads/master
Commit: 44624c382ac5ff062191d41df1dcd008839352e0
Parents: b49ec3f
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Sat Feb 25 05:20:58 2017 +0100
Committer: Sela <an...@paypal.com>
Committed: Wed Mar 1 22:47:16 2017 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |   6 +
 .../runners/spark/coders/NullWritableCoder.java |  76 ------------
 .../runners/spark/coders/WritableCoder.java     | 122 -------------------
 .../runners/spark/coders/WritableCoderTest.java |  45 -------
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   2 +-
 sdks/java/io/hadoop-common/pom.xml              |  10 ++
 .../beam/sdk/io/hadoop/WritableCoder.java       | 116 ++++++++++++++++++
 .../beam/sdk/io/hadoop/WritableCoderTest.java   |  45 +++++++
 sdks/java/io/hdfs/pom.xml                       |   5 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java |   1 +
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  | 116 ------------------
 .../beam/sdk/io/hdfs/WritableCoderTest.java     |  45 -------
 12 files changed, 179 insertions(+), 410 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 409fc27..8c35178 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -306,6 +306,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
deleted file mode 100644
index ebbab1a..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.hadoop.io.NullWritable;
-
-/**
- * Simple writable coder for Null.
- */
-public final class NullWritableCoder extends WritableCoder<NullWritable> {
-  private static final long serialVersionUID = 1L;
-
-  @JsonCreator
-  public static NullWritableCoder of() {
-    return INSTANCE;
-  }
-
-  private static final NullWritableCoder INSTANCE = new NullWritableCoder();
-
-  private NullWritableCoder() {
-    super(NullWritable.class);
-  }
-
-  @Override
-  public void encode(NullWritable value, OutputStream outStream, Context context) {
-    // nothing to write
-  }
-
-  @Override
-  public NullWritable decode(InputStream inStream, Context context) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * Returns true since registerByteSizeObserver() runs in constant time.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(NullWritable value, Context context) {
-    return true;
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(NullWritable value, Context context) {
-    return 0;
-  }
-
-  @Override
-  public void verifyDeterministic() throws Coder.NonDeterministicException {
-    // NullWritableCoder is deterministic
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
deleted file mode 100644
index 40c2627..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
- *
- * <p>To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- *   PCollection<MyRecord> records =
- *       foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder
- */
-public class WritableCoder<T extends Writable> extends StandardCoder<T> {
-  private static final long serialVersionUID = 0L;
-
-  /**
-   * Returns a {@code WritableCoder} instance for the provided element class.
-   * @param <T> the element type
-   * @param clazz the element class
-   * @return a {@code WritableCoder} instance for the provided element class
-   */
-  public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
-    if (clazz.equals(NullWritable.class)) {
-      @SuppressWarnings("unchecked")
-      WritableCoder<T> result = (WritableCoder<T>) NullWritableCoder.of();
-      return result;
-    }
-    return new WritableCoder<>(clazz);
-  }
-
-  @JsonCreator
-  @SuppressWarnings("unchecked")
-  public static WritableCoder<?> of(@JsonProperty("type") String classType)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(classType);
-    if (!Writable.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + classType + " does not implement Writable");
-    }
-    return of((Class<? extends Writable>) clazz);
-  }
-
-  private final Class<T> type;
-
-  public WritableCoder(Class<T> type) {
-    this.type = type;
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    value.write(new DataOutputStream(outStream));
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    try {
-      T t = type.getConstructor().newInstance();
-      t.readFields(new DataInputStream(inStream));
-      return t;
-    } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) {
-      throw new CoderException("unable to deserialize record", e);
-    } catch (InvocationTargetException ite) {
-      throw new CoderException("unable to deserialize record", ite.getCause());
-    }
-  }
-
-  @Override
-  public List<Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  protected CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    result.put("type", type.getName());
-    return result;
-  }
-
-  @Override
-  public void verifyDeterministic() throws Coder.NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Hadoop Writable may be non-deterministic.");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
deleted file mode 100644
index 538fd97..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.coders;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-/**
- * Tests for WritableCoder.
- */
-public class WritableCoderTest {
-
-  @Test
-  public void testIntWritableEncoding() throws Exception {
-    IntWritable value = new IntWritable(42);
-    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
-
-    CoderProperties.coderDecodeEncodeEqual(coder, value);
-  }
-
-  @Test
-  public void testNullWritableEncoding() throws Exception {
-    WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
-
-    CoderProperties.coderDecodeEncodeEqual(coder, NullWritable.get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index a5072d6..48b5433 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.assertEquals;
 import java.io.File;
 import java.io.IOException;
 import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.runners.spark.coders.WritableCoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index 13e159c..fcd984f 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -32,6 +32,16 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
new file mode 100644
index 0000000..0ba367d
--- /dev/null
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
+ *
+ * <p>To use, specify the coder type on a PCollection:
+ * <pre>
+ * {@code
+ *   PCollection<MyRecord> records =
+ *       foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
+ * }
+ * </pre>
+ *
+ * @param <T> the type of elements handled by this coder.
+ */
+public class WritableCoder<T extends Writable> extends StandardCoder<T> {
+  private static final long serialVersionUID = 0L;
+
+  /**
+   * Returns a {@code WritableCoder} instance for the provided element class.
+   * @param <T> the element type
+   */
+  public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
+    return new WritableCoder<>(clazz);
+  }
+
+  @JsonCreator
+  @SuppressWarnings("unchecked")
+  public static WritableCoder<?> of(@JsonProperty("type") String classType)
+      throws ClassNotFoundException {
+    Class<?> clazz = Class.forName(classType);
+    if (!Writable.class.isAssignableFrom(clazz)) {
+      throw new ClassNotFoundException(
+          "Class " + classType + " does not implement Writable");
+    }
+    return of((Class<? extends Writable>) clazz);
+  }
+
+  private final Class<T> type;
+
+  public WritableCoder(Class<T> type) {
+    this.type = type;
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+    value.write(new DataOutputStream(outStream));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public T decode(InputStream inStream, Context context) throws IOException {
+    try {
+      if (type == NullWritable.class) {
+        // NullWritable has no default constructor
+        return (T) NullWritable.get();
+      }
+      T t = type.newInstance();
+      t.readFields(new DataInputStream(inStream));
+      return t;
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new CoderException("unable to deserialize record", e);
+    }
+  }
+
+  @Override
+  public List<Coder<?>> getCoderArguments() {
+    return null;
+  }
+
+  @Override
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
+    result.put("type", type.getName());
+    return result;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(this,
+        "Hadoop Writable may be non-deterministic.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
new file mode 100644
index 0000000..8127773
--- /dev/null
+++ b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+/**
+ * Tests for WritableCoder.
+ */
+public class WritableCoderTest {
+
+  @Test
+  public void testIntWritableEncoding() throws Exception {
+    IntWritable value = new IntWritable(42);
+    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
+  }
+
+  @Test
+  public void testNullWritableEncoding() throws Exception {
+    NullWritable value = NullWritable.get();
+    WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 1212b0e..f3a1a27 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -94,11 +94,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.auto.service</groupId>
       <artifactId>auto-service</artifactId>
       <optional>true</optional>

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 2a731fb..0e3146f 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.CoderUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
deleted file mode 100644
index d958cda..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
- *
- * <p>To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- *   PCollection<MyRecord> records =
- *       foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder.
- */
-public class WritableCoder<T extends Writable> extends StandardCoder<T> {
-  private static final long serialVersionUID = 0L;
-
-  /**
-   * Returns a {@code WritableCoder} instance for the provided element class.
-   * @param <T> the element type
-   */
-  public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
-    return new WritableCoder<>(clazz);
-  }
-
-  @JsonCreator
-  @SuppressWarnings("unchecked")
-  public static WritableCoder<?> of(@JsonProperty("type") String classType)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(classType);
-    if (!Writable.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + classType + " does not implement Writable");
-    }
-    return of((Class<? extends Writable>) clazz);
-  }
-
-  private final Class<T> type;
-
-  public WritableCoder(Class<T> type) {
-    this.type = type;
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    value.write(new DataOutputStream(outStream));
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    try {
-      if (type == NullWritable.class) {
-        // NullWritable has no default constructor
-        return (T) NullWritable.get();
-      }
-      T t = type.newInstance();
-      t.readFields(new DataInputStream(inStream));
-      return t;
-    } catch (InstantiationException | IllegalAccessException e) {
-      throw new CoderException("unable to deserialize record", e);
-    }
-  }
-
-  @Override
-  public List<Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    result.put("type", type.getName());
-    return result;
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Hadoop Writable may be non-deterministic.");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
deleted file mode 100644
index e78f850..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-/**
- * Tests for WritableCoder.
- */
-public class WritableCoderTest {
-
-  @Test
-  public void testIntWritableEncoding() throws Exception {
-    IntWritable value = new IntWritable(42);
-    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
-
-    CoderProperties.coderDecodeEncodeEqual(coder, value);
-  }
-
-  @Test
-  public void testNullWritableEncoding() throws Exception {
-    NullWritable value = NullWritable.get();
-    WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
-
-    CoderProperties.coderDecodeEncodeEqual(coder, value);
-  }
-}


[2/2] beam git commit: This closes #2118

Posted by am...@apache.org.
This closes #2118


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a81c4578
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a81c4578
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a81c4578

Branch: refs/heads/master
Commit: a81c45781491185569f8ff75f5256d6de84a7150
Parents: b49ec3f 44624c3
Author: Sela <an...@paypal.com>
Authored: Wed Mar 1 22:47:40 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Wed Mar 1 22:47:40 2017 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |   6 +
 .../runners/spark/coders/NullWritableCoder.java |  76 ------------
 .../runners/spark/coders/WritableCoder.java     | 122 -------------------
 .../runners/spark/coders/WritableCoderTest.java |  45 -------
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   2 +-
 sdks/java/io/hadoop-common/pom.xml              |  10 ++
 .../beam/sdk/io/hadoop/WritableCoder.java       | 116 ++++++++++++++++++
 .../beam/sdk/io/hadoop/WritableCoderTest.java   |  45 +++++++
 sdks/java/io/hdfs/pom.xml                       |   5 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java |   1 +
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  | 116 ------------------
 .../beam/sdk/io/hdfs/WritableCoderTest.java     |  45 -------
 12 files changed, 179 insertions(+), 410 deletions(-)
----------------------------------------------------------------------