You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/24 20:14:17 UTC

[8/9] beam git commit: Rename ReadTranslator to ReadTranslation

Rename ReadTranslator to ReadTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b35e91d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b35e91d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b35e91d4

Branch: refs/heads/master
Commit: b35e91d4ed99b74d37a08a1385018b4ca326b3a0
Parents: bc4f44f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 15:30:22 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 15:53:41 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ReadTranslation.java      | 127 +++++++++++++
 .../core/construction/ReadTranslator.java       | 127 -------------
 .../core/construction/ReadTranslationTest.java  | 179 +++++++++++++++++++
 .../core/construction/ReadTranslatorTest.java   | 179 -------------------
 4 files changed, 306 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
new file mode 100644
index 0000000..d6c3400
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.SerializableUtils;
+
+/**
+ * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
+ * {@link PTransform PTransforms} into {@link ReadPayload} protos.
+ */
+public class ReadTranslation {
+  /** URN for a {@link FunctionSpec} carrying a Java-serialized {@link BoundedSource}. */
+  private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
+  /** URN for a {@link FunctionSpec} carrying a Java-serialized {@link UnboundedSource}. */
+  private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "urn:beam:java:unboundedsource:v1";
+
+  /**
+   * Builds the {@link ReadPayload} for a bounded read.
+   *
+   * @param read the bounded read transform whose source is serialized
+   * @return a payload flagged {@link IsBounded#BOUNDED} carrying the serialized source
+   */
+  public static ReadPayload toProto(Read.Bounded<?> read) {
+    return ReadPayload.newBuilder()
+        .setIsBounded(IsBounded.BOUNDED)
+        .setSource(toProto(read.getSource()))
+        .build();
+  }
+
+  /**
+   * Builds the {@link ReadPayload} for an unbounded read.
+   *
+   * @param read the unbounded read transform whose source is serialized
+   * @return a payload flagged {@link IsBounded#UNBOUNDED} carrying the serialized source
+   */
+  public static ReadPayload toProto(Read.Unbounded<?> read) {
+    return ReadPayload.newBuilder()
+        .setIsBounded(IsBounded.UNBOUNDED)
+        .setSource(toProto(read.getSource()))
+        .build();
+  }
+
+  /**
+   * Serializes an arbitrary {@link Source} into an {@link SdkFunctionSpec}, dispatching on whether
+   * it is a {@link BoundedSource} or an {@link UnboundedSource}.
+   *
+   * @param source the source to serialize
+   * @return the spec holding the URN for the source kind and the serialized source bytes
+   * @throws IllegalArgumentException if {@code source} is neither bounded nor unbounded
+   */
+  public static SdkFunctionSpec toProto(Source<?> source) {
+    if (source instanceof BoundedSource) {
+      // Wildcard cast (rather than a raw type) keeps the call free of unchecked warnings.
+      return toProto((BoundedSource<?>) source);
+    } else if (source instanceof UnboundedSource) {
+      return toProto((UnboundedSource<?, ?>) source);
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Unknown %s type %s", Source.class.getSimpleName(), source.getClass()));
+    }
+  }
+
+  /** Serializes {@code source} under the bounded-source URN. */
+  private static SdkFunctionSpec toProto(BoundedSource<?> source) {
+    return serializedSourceSpec(JAVA_SERIALIZED_BOUNDED_SOURCE, source);
+  }
+
+  /**
+   * Recovers the {@link BoundedSource} stored in a payload produced by
+   * {@link #toProto(Read.Bounded)}.
+   *
+   * @param payload a payload whose {@code isBounded} field is {@link IsBounded#BOUNDED}
+   * @return the deserialized source
+   * @throws IllegalArgumentException if the payload is not flagged as bounded
+   * @throws InvalidProtocolBufferException if the spec parameter is not a {@link BytesValue}
+   */
+  public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
+      throws InvalidProtocolBufferException {
+    checkArgument(
+        payload.getIsBounded() == IsBounded.BOUNDED,
+        "Expected a %s payload but isBounded was %s",
+        IsBounded.BOUNDED,
+        payload.getIsBounded());
+    return (BoundedSource<?>) deserializeSource(payload, "BoundedSource");
+  }
+
+  /** Serializes {@code source} under the unbounded-source URN. */
+  private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
+    return serializedSourceSpec(JAVA_SERIALIZED_UNBOUNDED_SOURCE, source);
+  }
+
+  /**
+   * Recovers the {@link UnboundedSource} stored in a payload produced by
+   * {@link #toProto(Read.Unbounded)}.
+   *
+   * @param payload a payload whose {@code isBounded} field is {@link IsBounded#UNBOUNDED}
+   * @return the deserialized source
+   * @throws IllegalArgumentException if the payload is not flagged as unbounded
+   * @throws InvalidProtocolBufferException if the spec parameter is not a {@link BytesValue}
+   */
+  public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload)
+      throws InvalidProtocolBufferException {
+    checkArgument(
+        payload.getIsBounded() == IsBounded.UNBOUNDED,
+        "Expected a %s payload but isBounded was %s",
+        IsBounded.UNBOUNDED,
+        payload.getIsBounded());
+    // The description labels the object being deserialized; it previously said "BoundedSource"
+    // here, a copy-paste slip from the bounded variant.
+    return (UnboundedSource<?, ?>) deserializeSource(payload, "UnboundedSource");
+  }
+
+  /**
+   * Wraps the Java-serialized bytes of {@code source} in a {@link BytesValue}, packs that into
+   * an {@link Any} and attaches it, together with {@code urn}, to a new {@link SdkFunctionSpec}.
+   */
+  private static SdkFunctionSpec serializedSourceSpec(String urn, Source<?> source) {
+    ByteString serialized = ByteString.copyFrom(SerializableUtils.serializeToByteArray(source));
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(urn)
+                .setParameter(Any.pack(BytesValue.newBuilder().setValue(serialized).build())))
+        .build();
+  }
+
+  /**
+   * Unpacks the {@link BytesValue} parameter of the payload's source spec and Java-deserializes
+   * it, passing {@code description} through to {@link SerializableUtils}.
+   */
+  private static Object deserializeSource(ReadPayload payload, String description)
+      throws InvalidProtocolBufferException {
+    byte[] serialized =
+        payload
+            .getSource()
+            .getSpec()
+            .getParameter()
+            .unpack(BytesValue.class)
+            .getValue()
+            .toByteArray();
+    return SerializableUtils.deserializeFromByteArray(serialized, description);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
deleted file mode 100644
index f944938..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.SerializableUtils;
-
-/**
- * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
- * {@link PTransform PTransforms} into {@link ReadPayload} protos.
- */
-public class ReadTranslator {
-  private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
-  private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "urn:beam:java:unboundedsource:v1";
-
-  public static ReadPayload toProto(Read.Bounded<?> read) {
-    return ReadPayload.newBuilder()
-        .setIsBounded(IsBounded.BOUNDED)
-        .setSource(toProto(read.getSource()))
-        .build();
-  }
-
-  public static ReadPayload toProto(Read.Unbounded<?> read) {
-    return ReadPayload.newBuilder()
-        .setIsBounded(IsBounded.UNBOUNDED)
-        .setSource(toProto(read.getSource()))
-        .build();
-  }
-
-  public static SdkFunctionSpec toProto(Source<?> source) {
-    if (source instanceof BoundedSource) {
-      return toProto((BoundedSource) source);
-    } else if (source instanceof UnboundedSource) {
-      return toProto((UnboundedSource<?, ?>) source);
-    } else {
-      throw new IllegalArgumentException(
-          String.format("Unknown %s type %s", Source.class.getSimpleName(), source.getClass()));
-    }
-  }
-
-  private static SdkFunctionSpec toProto(BoundedSource<?> source) {
-    return SdkFunctionSpec.newBuilder()
-        .setSpec(
-            FunctionSpec.newBuilder()
-                .setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
-                            .build())))
-        .build();
-  }
-
-  public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
-      throws InvalidProtocolBufferException {
-    checkArgument(payload.getIsBounded().equals(IsBounded.BOUNDED));
-    return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray(
-        payload
-            .getSource()
-            .getSpec()
-            .getParameter()
-            .unpack(BytesValue.class)
-            .getValue()
-            .toByteArray(),
-        "BoundedSource");
-  }
-
-  private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
-    return SdkFunctionSpec.newBuilder()
-        .setSpec(
-            FunctionSpec.newBuilder()
-                .setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
-                            .build())))
-        .build();
-  }
-
-  public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload)
-      throws InvalidProtocolBufferException {
-    checkArgument(payload.getIsBounded().equals(IsBounded.UNBOUNDED));
-    return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray(
-        payload
-            .getSource()
-            .getSpec()
-            .getParameter()
-            .unpack(BytesValue.class)
-            .getValue()
-            .toByteArray(),
-        "BoundedSource");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
new file mode 100644
index 0000000..740b324
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link ReadTranslation}.
+ */
+@RunWith(Parameterized.class)
+public class ReadTranslationTest {
+
+  /**
+   * Supplies the sources each test is run against: one bounded and one unbounded
+   * {@link CountingSource}, plus the two stub sources declared in this class.
+   */
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<Source<?>> data() {
+    return ImmutableList.<Source<?>>of(
+        CountingSource.unbounded(),
+        CountingSource.upTo(100L),
+        new TestBoundedSource(),
+        new TestUnboundedSource());
+  }
+
+  /** The source under test for the current parameterized run. */
+  @Parameter(0)
+  public Source<?> source;
+
+  /**
+   * Round-trips a bounded source through {@link ReadTranslation} and checks that the payload is
+   * flagged bounded and the deserialized source equals the original. Guarded by an assumption
+   * that the current parameter is a {@link BoundedSource}.
+   */
+  @Test
+  public void testToFromProtoBounded() throws Exception {
+    // TODO: Split into two tests.
+    assumeThat(source, instanceOf(BoundedSource.class));
+    BoundedSource<?> boundedSource = (BoundedSource<?>) this.source;
+    Read.Bounded<?> boundedRead = Read.from(boundedSource);
+    ReadPayload payload = ReadTranslation.toProto(boundedRead);
+    assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED));
+    BoundedSource<?> deserializedSource = ReadTranslation.boundedSourceFromProto(payload);
+    assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
+  }
+
+  /**
+   * Round-trips an unbounded source through {@link ReadTranslation} and checks that the payload
+   * is flagged unbounded and the deserialized source equals the original. Guarded by an
+   * assumption that the current parameter is an {@link UnboundedSource}.
+   */
+  @Test
+  public void testToFromProtoUnbounded() throws Exception {
+    assumeThat(source, instanceOf(UnboundedSource.class));
+    UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source;
+    Read.Unbounded<?> unboundedRead = Read.from(unboundedSource);
+    ReadPayload payload = ReadTranslation.toProto(unboundedRead);
+    assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED));
+    UnboundedSource<?, ?> deserializedSource = ReadTranslation.unboundedSourceFromProto(payload);
+    assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
+  }
+
+  /**
+   * Stub bounded source whose split, size-estimate and reader methods all throw. Any two
+   * instances compare equal, so a deserialized copy matches the original in the assertions
+   * above.
+   */
+  private static class TestBoundedSource extends BoundedSource<String> {
+    @Override
+    public List<? extends BoundedSource<String>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    /** Equal to any other object whose exact class is {@code TestBoundedSource}. */
+    @Override
+    public boolean equals(Object other) {
+      return other != null && other.getClass().equals(TestBoundedSource.class);
+    }
+
+    @Override
+    public int hashCode() {
+      return TestBoundedSource.class.hashCode();
+    }
+  }
+
+  /**
+   * Stub unbounded source whose split and reader methods throw. Any two instances compare
+   * equal, so a deserialized copy matches the original in the assertions above.
+   */
+  private static class TestUnboundedSource extends UnboundedSource<byte[], CheckpointMark> {
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<byte[]> getDefaultOutputCoder() {
+      return ByteArrayCoder.of();
+    }
+
+    @Override
+    public List<? extends UnboundedSource<byte[], CheckpointMark>> split(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public UnboundedReader<byte[]> createReader(
+        PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Coder<CheckpointMark> getCheckpointMarkCoder() {
+      return new TestCheckpointMarkCoder();
+    }
+
+    /** Equal to any other object whose exact class is {@code TestUnboundedSource}. */
+    @Override
+    public boolean equals(Object other) {
+      return other != null && other.getClass().equals(TestUnboundedSource.class);
+    }
+
+    @Override
+    public int hashCode() {
+      return TestUnboundedSource.class.hashCode();
+    }
+
+    /**
+     * Placeholder checkpoint-mark coder whose encode and decode both throw. Declared static
+     * because it uses no state of the enclosing source, so it need not hold a hidden reference
+     * to one.
+     */
+    private static class TestCheckpointMarkCoder extends AtomicCoder<CheckpointMark> {
+      @Override
+      public void encode(CheckpointMark value, OutputStream outStream)
+          throws CoderException, IOException {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public CheckpointMark decode(InputStream inStream) throws CoderException, IOException {
+        throw new UnsupportedOperationException();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
deleted file mode 100644
index a603e34..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assume.assumeThat;
-
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Tests for {@link ReadTranslator}.
- */
-@RunWith(Parameterized.class)
-public class ReadTranslatorTest {
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<Source<?>> data() {
-    return ImmutableList.<Source<?>>of(
-        CountingSource.unbounded(),
-        CountingSource.upTo(100L),
-        new TestBoundedSource(),
-        new TestUnboundedSource());
-  }
-
-  @Parameter(0)
-  public Source<?> source;
-
-  @Test
-  public void testToFromProtoBounded() throws Exception {
-    // TODO: Split into two tests.
-    assumeThat(source, instanceOf(BoundedSource.class));
-    BoundedSource<?> boundedSource = (BoundedSource<?>) this.source;
-    Read.Bounded<?> boundedRead = Read.from(boundedSource);
-    ReadPayload payload = ReadTranslator.toProto(boundedRead);
-    assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED));
-    BoundedSource<?> deserializedSource = ReadTranslator.boundedSourceFromProto(payload);
-    assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
-  }
-
-  @Test
-  public void testToFromProtoUnbounded() throws Exception {
-    assumeThat(source, instanceOf(UnboundedSource.class));
-    UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source;
-    Read.Unbounded<?> unboundedRead = Read.from(unboundedSource);
-    ReadPayload payload = ReadTranslator.toProto(unboundedRead);
-    assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED));
-    UnboundedSource<?, ?> deserializedSource = ReadTranslator.unboundedSourceFromProto(payload);
-    assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
-  }
-
-  private static class TestBoundedSource extends BoundedSource<String> {
-    @Override
-    public List<? extends BoundedSource<String>> split(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<String> getDefaultOutputCoder() {
-      return StringUtf8Coder.of();
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      return other != null && other.getClass().equals(TestBoundedSource.class);
-    }
-
-    @Override
-    public int hashCode() {
-      return TestBoundedSource.class.hashCode();
-    }
-  }
-
-  private static class TestUnboundedSource extends UnboundedSource<byte[], CheckpointMark> {
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<byte[]> getDefaultOutputCoder() {
-      return ByteArrayCoder.of();
-    }
-
-    @Override
-    public List<? extends UnboundedSource<byte[], CheckpointMark>> split(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public UnboundedReader<byte[]> createReader(
-        PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Coder<CheckpointMark> getCheckpointMarkCoder() {
-      return new TestCheckpointMarkCoder();
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      return other != null && other.getClass().equals(TestUnboundedSource.class);
-    }
-
-    @Override
-    public int hashCode() {
-      return TestUnboundedSource.class.hashCode();
-    }
-
-    private class TestCheckpointMarkCoder extends AtomicCoder<CheckpointMark> {
-      @Override
-      public void encode(CheckpointMark value, OutputStream outStream)
-          throws CoderException, IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public CheckpointMark decode(InputStream inStream) throws CoderException, IOException {
-        throw new UnsupportedOperationException();
-      }
-    }
-  }
-}