You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/24 20:14:17 UTC
[8/9] beam git commit: Rename ReadTranslator to ReadTranslation
Rename ReadTranslator to ReadTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b35e91d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b35e91d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b35e91d4
Branch: refs/heads/master
Commit: b35e91d4ed99b74d37a08a1385018b4ca326b3a0
Parents: bc4f44f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 15:30:22 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 15:53:41 2017 -0700
----------------------------------------------------------------------
.../core/construction/ReadTranslation.java | 127 +++++++++++++
.../core/construction/ReadTranslator.java | 127 -------------
.../core/construction/ReadTranslationTest.java | 179 +++++++++++++++++++
.../core/construction/ReadTranslatorTest.java | 179 -------------------
4 files changed, 306 insertions(+), 306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
new file mode 100644
index 0000000..d6c3400
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.SerializableUtils;
+
+/**
+ * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
+ * {@link PTransform PTransforms} into {@link ReadPayload} protos.
+ */
+public class ReadTranslation {
+ private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
+ private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "urn:beam:java:unboundedsource:v1";
+
+ public static ReadPayload toProto(Read.Bounded<?> read) {
+ return ReadPayload.newBuilder()
+ .setIsBounded(IsBounded.BOUNDED)
+ .setSource(toProto(read.getSource()))
+ .build();
+ }
+
+ public static ReadPayload toProto(Read.Unbounded<?> read) {
+ return ReadPayload.newBuilder()
+ .setIsBounded(IsBounded.UNBOUNDED)
+ .setSource(toProto(read.getSource()))
+ .build();
+ }
+
+ public static SdkFunctionSpec toProto(Source<?> source) {
+ if (source instanceof BoundedSource) {
+ return toProto((BoundedSource) source);
+ } else if (source instanceof UnboundedSource) {
+ return toProto((UnboundedSource<?, ?>) source);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unknown %s type %s", Source.class.getSimpleName(), source.getClass()));
+ }
+ }
+
+ private static SdkFunctionSpec toProto(BoundedSource<?> source) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
+ .build())))
+ .build();
+ }
+
+ public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
+ throws InvalidProtocolBufferException {
+ checkArgument(payload.getIsBounded().equals(IsBounded.BOUNDED));
+ return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray(
+ payload
+ .getSource()
+ .getSpec()
+ .getParameter()
+ .unpack(BytesValue.class)
+ .getValue()
+ .toByteArray(),
+ "BoundedSource");
+ }
+
+ private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
+ .build())))
+ .build();
+ }
+
+ public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload)
+ throws InvalidProtocolBufferException {
+ checkArgument(payload.getIsBounded().equals(IsBounded.UNBOUNDED));
+ return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray(
+ payload
+ .getSource()
+ .getSpec()
+ .getParameter()
+ .unpack(BytesValue.class)
+ .getValue()
+ .toByteArray(),
+ "BoundedSource");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
deleted file mode 100644
index f944938..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.SerializableUtils;
-
-/**
- * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
- * {@link PTransform PTransforms} into {@link ReadPayload} protos.
- */
-public class ReadTranslator {
- private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
- private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "urn:beam:java:unboundedsource:v1";
-
- public static ReadPayload toProto(Read.Bounded<?> read) {
- return ReadPayload.newBuilder()
- .setIsBounded(IsBounded.BOUNDED)
- .setSource(toProto(read.getSource()))
- .build();
- }
-
- public static ReadPayload toProto(Read.Unbounded<?> read) {
- return ReadPayload.newBuilder()
- .setIsBounded(IsBounded.UNBOUNDED)
- .setSource(toProto(read.getSource()))
- .build();
- }
-
- public static SdkFunctionSpec toProto(Source<?> source) {
- if (source instanceof BoundedSource) {
- return toProto((BoundedSource) source);
- } else if (source instanceof UnboundedSource) {
- return toProto((UnboundedSource<?, ?>) source);
- } else {
- throw new IllegalArgumentException(
- String.format("Unknown %s type %s", Source.class.getSimpleName(), source.getClass()));
- }
- }
-
- private static SdkFunctionSpec toProto(BoundedSource<?> source) {
- return SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
- .build())))
- .build();
- }
-
- public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
- throws InvalidProtocolBufferException {
- checkArgument(payload.getIsBounded().equals(IsBounded.BOUNDED));
- return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray(
- payload
- .getSource()
- .getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
- .toByteArray(),
- "BoundedSource");
- }
-
- private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
- return SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
- .build())))
- .build();
- }
-
- public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload)
- throws InvalidProtocolBufferException {
- checkArgument(payload.getIsBounded().equals(IsBounded.UNBOUNDED));
- return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray(
- payload
- .getSource()
- .getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
- .toByteArray(),
- "BoundedSource");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
new file mode 100644
index 0000000..740b324
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link ReadTranslation}.
+ */
+@RunWith(Parameterized.class)
+public class ReadTranslationTest {
+
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<Source<?>> data() {
+ return ImmutableList.<Source<?>>of(
+ CountingSource.unbounded(),
+ CountingSource.upTo(100L),
+ new TestBoundedSource(),
+ new TestUnboundedSource());
+ }
+
+ @Parameter(0)
+ public Source<?> source;
+
+ @Test
+ public void testToFromProtoBounded() throws Exception {
+ // TODO: Split into two tests.
+ assumeThat(source, instanceOf(BoundedSource.class));
+ BoundedSource<?> boundedSource = (BoundedSource<?>) this.source;
+ Read.Bounded<?> boundedRead = Read.from(boundedSource);
+ ReadPayload payload = ReadTranslation.toProto(boundedRead);
+ assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED));
+ BoundedSource<?> deserializedSource = ReadTranslation.boundedSourceFromProto(payload);
+ assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
+ }
+
+ @Test
+ public void testToFromProtoUnbounded() throws Exception {
+ assumeThat(source, instanceOf(UnboundedSource.class));
+ UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source;
+ Read.Unbounded<?> unboundedRead = Read.from(unboundedSource);
+ ReadPayload payload = ReadTranslation.toProto(unboundedRead);
+ assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED));
+ UnboundedSource<?, ?> deserializedSource = ReadTranslation.unboundedSourceFromProto(payload);
+ assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
+ }
+
+ private static class TestBoundedSource extends BoundedSource<String> {
+ @Override
+ public List<? extends BoundedSource<String>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void validate() {}
+
+ @Override
+ public Coder<String> getDefaultOutputCoder() {
+ return StringUtf8Coder.of();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other != null && other.getClass().equals(TestBoundedSource.class);
+ }
+
+ @Override
+ public int hashCode() {
+ return TestBoundedSource.class.hashCode();
+ }
+ }
+
+ private static class TestUnboundedSource extends UnboundedSource<byte[], CheckpointMark> {
+ @Override
+ public void validate() {}
+
+ @Override
+ public Coder<byte[]> getDefaultOutputCoder() {
+ return ByteArrayCoder.of();
+ }
+
+ @Override
+ public List<? extends UnboundedSource<byte[], CheckpointMark>> split(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UnboundedReader<byte[]> createReader(
+ PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Coder<CheckpointMark> getCheckpointMarkCoder() {
+ return new TestCheckpointMarkCoder();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other != null && other.getClass().equals(TestUnboundedSource.class);
+ }
+
+ @Override
+ public int hashCode() {
+ return TestUnboundedSource.class.hashCode();
+ }
+
+ private class TestCheckpointMarkCoder extends AtomicCoder<CheckpointMark> {
+ @Override
+ public void encode(CheckpointMark value, OutputStream outStream)
+ throws CoderException, IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CheckpointMark decode(InputStream inStream) throws CoderException, IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
deleted file mode 100644
index a603e34..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assume.assumeThat;
-
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Tests for {@link ReadTranslator}.
- */
-@RunWith(Parameterized.class)
-public class ReadTranslatorTest {
-
- @Parameters(name = "{index}: {0}")
- public static Iterable<Source<?>> data() {
- return ImmutableList.<Source<?>>of(
- CountingSource.unbounded(),
- CountingSource.upTo(100L),
- new TestBoundedSource(),
- new TestUnboundedSource());
- }
-
- @Parameter(0)
- public Source<?> source;
-
- @Test
- public void testToFromProtoBounded() throws Exception {
- // TODO: Split into two tests.
- assumeThat(source, instanceOf(BoundedSource.class));
- BoundedSource<?> boundedSource = (BoundedSource<?>) this.source;
- Read.Bounded<?> boundedRead = Read.from(boundedSource);
- ReadPayload payload = ReadTranslator.toProto(boundedRead);
- assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED));
- BoundedSource<?> deserializedSource = ReadTranslator.boundedSourceFromProto(payload);
- assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
- }
-
- @Test
- public void testToFromProtoUnbounded() throws Exception {
- assumeThat(source, instanceOf(UnboundedSource.class));
- UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source;
- Read.Unbounded<?> unboundedRead = Read.from(unboundedSource);
- ReadPayload payload = ReadTranslator.toProto(unboundedRead);
- assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED));
- UnboundedSource<?, ?> deserializedSource = ReadTranslator.unboundedSourceFromProto(payload);
- assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
- }
-
- private static class TestBoundedSource extends BoundedSource<String> {
- @Override
- public List<? extends BoundedSource<String>> split(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void validate() {}
-
- @Override
- public Coder<String> getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
-
- @Override
- public boolean equals(Object other) {
- return other != null && other.getClass().equals(TestBoundedSource.class);
- }
-
- @Override
- public int hashCode() {
- return TestBoundedSource.class.hashCode();
- }
- }
-
- private static class TestUnboundedSource extends UnboundedSource<byte[], CheckpointMark> {
- @Override
- public void validate() {}
-
- @Override
- public Coder<byte[]> getDefaultOutputCoder() {
- return ByteArrayCoder.of();
- }
-
- @Override
- public List<? extends UnboundedSource<byte[], CheckpointMark>> split(
- int desiredNumSplits, PipelineOptions options) throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public UnboundedReader<byte[]> createReader(
- PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Coder<CheckpointMark> getCheckpointMarkCoder() {
- return new TestCheckpointMarkCoder();
- }
-
- @Override
- public boolean equals(Object other) {
- return other != null && other.getClass().equals(TestUnboundedSource.class);
- }
-
- @Override
- public int hashCode() {
- return TestUnboundedSource.class.hashCode();
- }
-
- private class TestCheckpointMarkCoder extends AtomicCoder<CheckpointMark> {
- @Override
- public void encode(CheckpointMark value, OutputStream outStream)
- throws CoderException, IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CheckpointMark decode(InputStream inStream) throws CoderException, IOException {
- throw new UnsupportedOperationException();
- }
- }
- }
-}