You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2017/03/01 21:03:15 UTC
[1/2] beam git commit: [BEAM-111] Move WritableCoder to hadoop-common
Repository: beam
Updated Branches:
refs/heads/master b49ec3fa2 -> a81c45781
[BEAM-111] Move WritableCoder to hadoop-common
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44624c38
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44624c38
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44624c38
Branch: refs/heads/master
Commit: 44624c382ac5ff062191d41df1dcd008839352e0
Parents: b49ec3f
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Sat Feb 25 05:20:58 2017 +0100
Committer: Sela <an...@paypal.com>
Committed: Wed Mar 1 22:47:16 2017 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 6 +
.../runners/spark/coders/NullWritableCoder.java | 76 ------------
.../runners/spark/coders/WritableCoder.java | 122 -------------------
.../runners/spark/coders/WritableCoderTest.java | 45 -------
.../io/hadoop/HadoopFileFormatPipelineTest.java | 2 +-
sdks/java/io/hadoop-common/pom.xml | 10 ++
.../beam/sdk/io/hadoop/WritableCoder.java | 116 ++++++++++++++++++
.../beam/sdk/io/hadoop/WritableCoderTest.java | 45 +++++++
sdks/java/io/hdfs/pom.xml | 5 -
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 1 +
.../apache/beam/sdk/io/hdfs/WritableCoder.java | 116 ------------------
.../beam/sdk/io/hdfs/WritableCoderTest.java | 45 -------
12 files changed, 179 insertions(+), 410 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 409fc27..8c35178 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -306,6 +306,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
deleted file mode 100644
index ebbab1a..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.hadoop.io.NullWritable;
-
-/**
- * Simple writable coder for Null.
- */
-public final class NullWritableCoder extends WritableCoder<NullWritable> {
- private static final long serialVersionUID = 1L;
-
- @JsonCreator
- public static NullWritableCoder of() {
- return INSTANCE;
- }
-
- private static final NullWritableCoder INSTANCE = new NullWritableCoder();
-
- private NullWritableCoder() {
- super(NullWritable.class);
- }
-
- @Override
- public void encode(NullWritable value, OutputStream outStream, Context context) {
- // nothing to write
- }
-
- @Override
- public NullWritable decode(InputStream inStream, Context context) {
- return NullWritable.get();
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- /**
- * Returns true since registerByteSizeObserver() runs in constant time.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(NullWritable value, Context context) {
- return true;
- }
-
- @Override
- protected long getEncodedElementByteSize(NullWritable value, Context context) {
- return 0;
- }
-
- @Override
- public void verifyDeterministic() throws Coder.NonDeterministicException {
- // NullWritableCoder is deterministic
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
deleted file mode 100644
index 40c2627..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
- *
- * <p>To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- * PCollection<MyRecord> records =
- * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder
- */
-public class WritableCoder<T extends Writable> extends StandardCoder<T> {
- private static final long serialVersionUID = 0L;
-
- /**
- * Returns a {@code WritableCoder} instance for the provided element class.
- * @param <T> the element type
- * @param clazz the element class
- * @return a {@code WritableCoder} instance for the provided element class
- */
- public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
- if (clazz.equals(NullWritable.class)) {
- @SuppressWarnings("unchecked")
- WritableCoder<T> result = (WritableCoder<T>) NullWritableCoder.of();
- return result;
- }
- return new WritableCoder<>(clazz);
- }
-
- @JsonCreator
- @SuppressWarnings("unchecked")
- public static WritableCoder<?> of(@JsonProperty("type") String classType)
- throws ClassNotFoundException {
- Class<?> clazz = Class.forName(classType);
- if (!Writable.class.isAssignableFrom(clazz)) {
- throw new ClassNotFoundException(
- "Class " + classType + " does not implement Writable");
- }
- return of((Class<? extends Writable>) clazz);
- }
-
- private final Class<T> type;
-
- public WritableCoder(Class<T> type) {
- this.type = type;
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context) throws IOException {
- value.write(new DataOutputStream(outStream));
- }
-
- @Override
- public T decode(InputStream inStream, Context context) throws IOException {
- try {
- T t = type.getConstructor().newInstance();
- t.readFields(new DataInputStream(inStream));
- return t;
- } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) {
- throw new CoderException("unable to deserialize record", e);
- } catch (InvocationTargetException ite) {
- throw new CoderException("unable to deserialize record", ite.getCause());
- }
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return null;
- }
-
- @Override
- protected CloudObject initializeCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
- result.put("type", type.getName());
- return result;
- }
-
- @Override
- public void verifyDeterministic() throws Coder.NonDeterministicException {
- throw new NonDeterministicException(this,
- "Hadoop Writable may be non-deterministic.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
deleted file mode 100644
index 538fd97..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.coders;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-/**
- * Tests for WritableCoder.
- */
-public class WritableCoderTest {
-
- @Test
- public void testIntWritableEncoding() throws Exception {
- IntWritable value = new IntWritable(42);
- WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
-
- CoderProperties.coderDecodeEncodeEqual(coder, value);
- }
-
- @Test
- public void testNullWritableEncoding() throws Exception {
- WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
-
- CoderProperties.coderDecodeEncodeEqual(coder, NullWritable.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index a5072d6..48b5433 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.runners.spark.coders.WritableCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index 13e159c..fcd984f 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -32,6 +32,16 @@
<dependencies>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
new file mode 100644
index 0000000..0ba367d
--- /dev/null
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
+ *
+ * <p>To use, specify the coder type on a PCollection:
+ * <pre>
+ * {@code
+ * PCollection<MyRecord> records =
+ * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
+ * }
+ * </pre>
+ *
+ * @param <T> the type of elements handled by this coder.
+ */
+public class WritableCoder<T extends Writable> extends StandardCoder<T> {
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Returns a {@code WritableCoder} instance for the provided element class.
+ * @param <T> the element type
+ */
+ public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
+ return new WritableCoder<>(clazz);
+ }
+
+ @JsonCreator
+ @SuppressWarnings("unchecked")
+ public static WritableCoder<?> of(@JsonProperty("type") String classType)
+ throws ClassNotFoundException {
+ Class<?> clazz = Class.forName(classType);
+ if (!Writable.class.isAssignableFrom(clazz)) {
+ throw new ClassNotFoundException(
+ "Class " + classType + " does not implement Writable");
+ }
+ return of((Class<? extends Writable>) clazz);
+ }
+
+ private final Class<T> type;
+
+ public WritableCoder(Class<T> type) {
+ this.type = type;
+ }
+
+ @Override
+ public void encode(T value, OutputStream outStream, Context context) throws IOException {
+ value.write(new DataOutputStream(outStream));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T decode(InputStream inStream, Context context) throws IOException {
+ try {
+ if (type == NullWritable.class) {
+ // NullWritable has no default constructor
+ return (T) NullWritable.get();
+ }
+ T t = type.newInstance();
+ t.readFields(new DataInputStream(inStream));
+ return t;
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new CoderException("unable to deserialize record", e);
+ }
+ }
+
+ @Override
+ public List<Coder<?>> getCoderArguments() {
+ return null;
+ }
+
+ @Override
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
+ result.put("type", type.getName());
+ return result;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "Hadoop Writable may be non-deterministic.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
new file mode 100644
index 0000000..8127773
--- /dev/null
+++ b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+/**
+ * Tests for WritableCoder.
+ */
+public class WritableCoderTest {
+
+ @Test
+ public void testIntWritableEncoding() throws Exception {
+ IntWritable value = new IntWritable(42);
+ WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
+
+ CoderProperties.coderDecodeEncodeEqual(coder, value);
+ }
+
+ @Test
+ public void testNullWritableEncoding() throws Exception {
+ NullWritable value = NullWritable.get();
+ WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
+
+ CoderProperties.coderDecodeEncodeEqual(coder, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 1212b0e..f3a1a27 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -94,11 +94,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 2a731fb..0e3146f 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.CoderUtils;
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
deleted file mode 100644
index d958cda..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
- *
- * <p>To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- * PCollection<MyRecord> records =
- * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder.
- */
-public class WritableCoder<T extends Writable> extends StandardCoder<T> {
- private static final long serialVersionUID = 0L;
-
- /**
- * Returns a {@code WritableCoder} instance for the provided element class.
- * @param <T> the element type
- */
- public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
- return new WritableCoder<>(clazz);
- }
-
- @JsonCreator
- @SuppressWarnings("unchecked")
- public static WritableCoder<?> of(@JsonProperty("type") String classType)
- throws ClassNotFoundException {
- Class<?> clazz = Class.forName(classType);
- if (!Writable.class.isAssignableFrom(clazz)) {
- throw new ClassNotFoundException(
- "Class " + classType + " does not implement Writable");
- }
- return of((Class<? extends Writable>) clazz);
- }
-
- private final Class<T> type;
-
- public WritableCoder(Class<T> type) {
- this.type = type;
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context) throws IOException {
- value.write(new DataOutputStream(outStream));
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public T decode(InputStream inStream, Context context) throws IOException {
- try {
- if (type == NullWritable.class) {
- // NullWritable has no default constructor
- return (T) NullWritable.get();
- }
- T t = type.newInstance();
- t.readFields(new DataInputStream(inStream));
- return t;
- } catch (InstantiationException | IllegalAccessException e) {
- throw new CoderException("unable to deserialize record", e);
- }
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return null;
- }
-
- @Override
- public CloudObject initializeCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
- result.put("type", type.getName());
- return result;
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "Hadoop Writable may be non-deterministic.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
deleted file mode 100644
index e78f850..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-/**
- * Tests for WritableCoder.
- */
-public class WritableCoderTest {
-
- @Test
- public void testIntWritableEncoding() throws Exception {
- IntWritable value = new IntWritable(42);
- WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
-
- CoderProperties.coderDecodeEncodeEqual(coder, value);
- }
-
- @Test
- public void testNullWritableEncoding() throws Exception {
- NullWritable value = NullWritable.get();
- WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
-
- CoderProperties.coderDecodeEncodeEqual(coder, value);
- }
-}
[2/2] beam git commit: This closes #2118
Posted by am...@apache.org.
This closes #2118
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a81c4578
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a81c4578
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a81c4578
Branch: refs/heads/master
Commit: a81c45781491185569f8ff75f5256d6de84a7150
Parents: b49ec3f 44624c3
Author: Sela <an...@paypal.com>
Authored: Wed Mar 1 22:47:40 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Wed Mar 1 22:47:40 2017 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 6 +
.../runners/spark/coders/NullWritableCoder.java | 76 ------------
.../runners/spark/coders/WritableCoder.java | 122 -------------------
.../runners/spark/coders/WritableCoderTest.java | 45 -------
.../io/hadoop/HadoopFileFormatPipelineTest.java | 2 +-
sdks/java/io/hadoop-common/pom.xml | 10 ++
.../beam/sdk/io/hadoop/WritableCoder.java | 116 ++++++++++++++++++
.../beam/sdk/io/hadoop/WritableCoderTest.java | 45 +++++++
sdks/java/io/hdfs/pom.xml | 5 -
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 1 +
.../apache/beam/sdk/io/hdfs/WritableCoder.java | 116 ------------------
.../beam/sdk/io/hdfs/WritableCoderTest.java | 45 -------
12 files changed, 179 insertions(+), 410 deletions(-)
----------------------------------------------------------------------