Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/26 22:48:18 UTC
[1/2] incubator-beam git commit: [BEAM-589] Fixing IO.Read transformation
Repository: incubator-beam
Updated Branches:
refs/heads/master 9abd0bc8a -> 95ab43809
[BEAM-589] Fixing IO.Read transformation
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/310ea749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/310ea749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/310ea749
Branch: refs/heads/master
Commit: 310ea7497a151d1a9567f3e9a3b18e54ddcdc7f0
Parents: 9abd0bc
Author: gaurav gupta <ga...@cisco.com>
Authored: Thu Aug 25 14:00:06 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 26 15:14:11 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/examples/complete/TfIdf.java | 6 +++---
.../runners/core/UnboundedReadFromBoundedSource.java | 6 +++---
.../org/apache/beam/runners/flink/examples/TFIDF.java | 6 +++---
.../apache/beam/runners/dataflow/DataflowRunner.java | 6 +++---
.../DataflowUnboundedReadFromBoundedSource.java | 6 +++---
.../beam/runners/dataflow/DataflowRunnerTest.java | 4 ++--
.../org/apache/beam/runners/spark/io/CreateStream.java | 7 +++----
.../java/org/apache/beam/runners/spark/io/KafkaIO.java | 6 +++---
.../apache/beam/runners/spark/io/hadoop/HadoopIO.java | 6 +++---
.../src/main/java/org/apache/beam/sdk/io/AvroIO.java | 6 +++---
.../beam/sdk/io/BoundedReadFromUnboundedSource.java | 6 +++---
.../src/main/java/org/apache/beam/sdk/io/PubsubIO.java | 6 +++---
.../core/src/main/java/org/apache/beam/sdk/io/Read.java | 10 +++++-----
.../src/main/java/org/apache/beam/sdk/io/TextIO.java | 6 +++---
.../java/org/apache/beam/sdk/transforms/Create.java | 12 ++++++------
.../test/java/org/apache/beam/sdk/io/AvroIOTest.java | 2 +-
.../test/java/org/apache/beam/sdk/io/PubsubIOTest.java | 2 +-
.../test/java/org/apache/beam/sdk/io/TextIOTest.java | 2 +-
.../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 ++++----
.../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 ++--
.../src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 3 +--
.../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 3 +--
22 files changed, 60 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
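The pattern applied across all of these files is the same: source-style transforms that previously declared the generic PInput as their input type now declare PBegin, so they can only be rooted at the start of a pipeline. A minimal sketch of the resulting shape, assuming the incubator-beam SDK as of this commit (the ReadLines class name and the file pattern below are illustrative, not part of the change):

    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    // Illustrative sketch only: a source-style composite transform is rooted at
    // PBegin rather than PInput, matching the signatures changed in this commit.
    public class ReadLines extends PTransform<PBegin, PCollection<String>> {
      private final String filepattern;

      public ReadLines(String filepattern) {
        this.filepattern = filepattern;
      }

      @Override
      public PCollection<String> apply(PBegin input) {
        // Delegate to TextIO.Read, which after this change is itself a
        // PTransform<PBegin, PCollection<String>>.
        return input.getPipeline().apply(TextIO.Read.from(filepattern));
      }
    }

A caller would apply it at the pipeline root, e.g. p.apply(new ReadLines("gs://bucket/input-*.txt")), where the path is a placeholder.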
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 87023ed..6684553 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -51,11 +51,11 @@ import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,7 +152,7 @@ public class TfIdf {
* from the documents tagged with which document they are from.
*/
public static class ReadDocuments
- extends PTransform<PInput, PCollection<KV<URI, String>>> {
+ extends PTransform<PBegin, PCollection<KV<URI, String>>> {
private Iterable<URI> uris;
public ReadDocuments(Iterable<URI> uris) {
@@ -165,7 +165,7 @@ public class TfIdf {
}
@Override
- public PCollection<KV<URI, String>> apply(PInput input) {
+ public PCollection<KV<URI, String>> apply(PBegin input) {
Pipeline pipeline = input.getPipeline();
// Create one TextIO.Read transform for each document
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 73688d4..91a1715 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
* <p>This transform is intended to be used by a runner during pipeline translation to convert
* a Read.Bounded into a Read.Unbounded.
*/
-public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
@@ -88,7 +88,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
return input.getPipeline().apply(
Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 0ca94a1..a92d339 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -53,11 +53,11 @@ import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,7 +154,7 @@ public class TFIDF {
* from the documents tagged with which document they are from.
*/
public static class ReadDocuments
- extends PTransform<PInput, PCollection<KV<URI, String>>> {
+ extends PTransform<PBegin, PCollection<KV<URI, String>>> {
private static final long serialVersionUID = 0;
private Iterable<URI> uris;
@@ -169,7 +169,7 @@ public class TFIDF {
}
@Override
- public PCollection<KV<URI, String>> apply(PInput input) {
+ public PCollection<KV<URI, String>> apply(PBegin input) {
Pipeline pipeline = input.getPipeline();
// Create one TextIO.Read transform for each document
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a0e24b1..0ce4b58 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -400,7 +400,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
return windowed;
} else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
&& ((PCollectionList<?>) input).size() == 0) {
- return (OutputT) Pipeline.applyTransform(input, Create.of());
+ return (OutputT) Pipeline.applyTransform((PBegin) input, Create.of());
} else if (overrides.containsKey(transform.getClass())) {
// It is the responsibility of whoever constructs overrides to ensure this is type safe.
@SuppressWarnings("unchecked")
@@ -2318,7 +2318,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the
* Dataflow runner in streaming mode.
*/
- private static class StreamingBoundedRead<T> extends PTransform<PInput, PCollection<T>> {
+ private static class StreamingBoundedRead<T> extends PTransform<PBegin, PCollection<T>> {
private final BoundedSource<T> source;
/** Builds an instance of this class from the overridden transform. */
@@ -2333,7 +2333,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public final PCollection<T> apply(PInput input) {
+ public final PCollection<T> apply(PBegin input) {
source.validate();
return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index 85f5e73..866da13 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -78,7 +78,7 @@ import org.slf4j.LoggerFactory;
* time dependency. It should be replaced in the dataflow worker as an execution time dependency.
*/
@Deprecated
-public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
private static final Logger LOG =
LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
@@ -93,7 +93,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
return input.getPipeline().apply(
Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 208e84c..58a01aa 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -101,8 +101,8 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.hamcrest.Description;
@@ -970,7 +970,7 @@ public class DataflowRunnerTest {
return options;
}
- private void testUnsupportedSource(PTransform<PInput, ?> source, String name, boolean streaming)
+ private void testUnsupportedSource(PTransform<PBegin, ?> source, String name, boolean streaming)
throws Exception {
String mode = streaming ? "streaming" : "batch";
thrown.expect(UnsupportedOperationException.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index b3beae6..a08c54e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-
/**
* Create an input stream from Queue.
@@ -49,7 +48,7 @@ public final class CreateStream<T> {
/**
* {@link PTransform} for queueing values.
*/
- public static final class QueuedValues<T> extends PTransform<PInput, PCollection<T>> {
+ public static final class QueuedValues<T> extends PTransform<PBegin, PCollection<T>> {
private final Iterable<Iterable<T>> queuedValues;
@@ -64,7 +63,7 @@ public final class CreateStream<T> {
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
// Spark streaming micro batches are bounded by default
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
index f57c114..8cf2083 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
@@ -25,8 +25,8 @@ import kafka.serializer.Decoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
/**
* Read stream from Kafka.
@@ -68,7 +68,7 @@ public final class KafkaIO {
/**
* A {@link PTransform} reading from Kafka topics and providing {@link PCollection}.
*/
- public static class Unbound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
+ public static class Unbound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
private final Class<? extends Decoder<K>> keyDecoderClass;
private final Class<? extends Decoder<V>> valueDecoderClass;
@@ -120,7 +120,7 @@ public final class KafkaIO {
}
@Override
- public PCollection<KV<K, V>> apply(PInput input) {
+ public PCollection<KV<K, V>> apply(PBegin input) {
// Spark streaming micro batches are bounded by default
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 70bec78..042c316 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -26,9 +26,9 @@ import org.apache.beam.sdk.io.ShardNameTemplate;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -58,7 +58,7 @@ public final class HadoopIO {
* @param <K> the type of the keys
* @param <V> the type of the values
*/
- public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
+ public static class Bound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
private final String filepattern;
private final Class<? extends FileInputFormat<K, V>> formatClass;
@@ -94,7 +94,7 @@ public final class HadoopIO {
}
@Override
- public PCollection<KV<K, V>> apply(PInput input) {
+ public PCollection<KV<K, V>> apply(PBegin input) {
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index e7c302b..267265d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -40,9 +40,9 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
/**
* {@link PTransform}s for reading and writing Avro files.
@@ -184,7 +184,7 @@ public class AvroIO {
* @param <T> the type of each of the elements of the resulting
* PCollection
*/
- public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+ public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
/** The filepattern to read from. */
@Nullable
final String filepattern;
@@ -270,7 +270,7 @@ public class AvroIO {
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
if (filepattern == null) {
throw new IllegalStateException(
"need to set the filepattern of an AvroIO.Read transform");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index ede65a9..28d7746 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
import org.apache.beam.sdk.util.ValueWithRecordId;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -48,7 +48,7 @@ import org.joda.time.Instant;
*
* <p>Created by {@link Read}.
*/
-class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T>> {
+class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
private final UnboundedSource<T, ?> source;
private final long maxNumRecords;
private final Duration maxReadTime;
@@ -82,7 +82,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
if (source.requiresDeduping()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index b137f15..d113457 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -46,9 +46,9 @@ import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.util.PubsubClient.TopicPath;
import org.apache.beam.sdk.util.PubsubJsonClient;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -481,7 +481,7 @@ public class PubsubIO {
* A {@link PTransform} that reads from a Cloud Pub/Sub source and returns
* a unbounded {@link PCollection} containing the items from the stream.
*/
- public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+ public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
/** The Cloud Pub/Sub topic to read from. */
@Nullable private final PubsubTopic topic;
@@ -610,7 +610,7 @@ public class PubsubIO {
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
if (topic == null && subscription == null) {
throw new IllegalStateException("Need to set either the topic or the subscription for "
+ "a PubsubIO.Read transform");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index f99877d..29c4e47 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -25,9 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
import org.joda.time.Duration;
/**
@@ -87,7 +87,7 @@ public class Read {
/**
* {@link PTransform} that reads from a {@link BoundedSource}.
*/
- public static class Bounded<T> extends PTransform<PInput, PCollection<T>> {
+ public static class Bounded<T> extends PTransform<PBegin, PCollection<T>> {
private final BoundedSource<T> source;
private Bounded(@Nullable String name, BoundedSource<T> source) {
@@ -101,7 +101,7 @@ public class Read {
}
@Override
- public final PCollection<T> apply(PInput input) {
+ public final PCollection<T> apply(PBegin input) {
source.validate();
return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
@@ -134,7 +134,7 @@ public class Read {
/**
* {@link PTransform} that reads from a {@link UnboundedSource}.
*/
- public static class Unbounded<T> extends PTransform<PInput, PCollection<T>> {
+ public static class Unbounded<T> extends PTransform<PBegin, PCollection<T>> {
private final UnboundedSource<T, ?> source;
private Unbounded(@Nullable String name, UnboundedSource<T, ?> source) {
@@ -169,7 +169,7 @@ public class Read {
}
@Override
- public final PCollection<T> apply(PInput input) {
+ public final PCollection<T> apply(PBegin input) {
source.validate();
return PCollection.<T>createPrimitiveOutputInternal(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index ed9a627..242470b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -43,9 +43,9 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
/**
* {@link PTransform}s for reading and writing text files.
@@ -189,7 +189,7 @@ public class TextIO {
* may use {@link #withCoder(Coder)} to supply a {@code Coder<T>} to produce a
* {@code PCollection<T>} instead.
*/
- public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+ public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
/** The filepattern to read from. */
@Nullable private final String filepattern;
@@ -269,7 +269,7 @@ public class TextIO {
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
if (filepattern == null) {
throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index e261db2..7cd4711 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -46,8 +46,8 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -218,7 +218,7 @@ public class Create<T> {
/**
* A {@code PTransform} that creates a {@code PCollection} from a set of in-memory objects.
*/
- public static class Values<T> extends PTransform<PInput, PCollection<T>> {
+ public static class Values<T> extends PTransform<PBegin, PCollection<T>> {
/**
* Returns a {@link Create.Values} PTransform like this one that uses the given
* {@code Coder<T>} to decode each of the objects into a
@@ -240,7 +240,7 @@ public class Create<T> {
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
try {
Coder<T> coder = getDefaultOutputCoder(input);
try {
@@ -257,7 +257,7 @@ public class Create<T> {
}
@Override
- public Coder<T> getDefaultOutputCoder(PInput input) throws CannotProvideCoderException {
+ public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException {
if (coder.isPresent()) {
return coder.get();
} else {
@@ -421,7 +421,7 @@ public class Create<T> {
* A {@code PTransform} that creates a {@code PCollection} whose elements have
* associated timestamps.
*/
- public static class TimestampedValues<T> extends PTransform<PInput, PCollection<T>>{
+ public static class TimestampedValues<T> extends PTransform<PBegin, PCollection<T>>{
/**
* Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
* {@code Coder<T>} to decode each of the objects into a
@@ -440,7 +440,7 @@ public class Create<T> {
}
@Override
- public PCollection<T> apply(PInput input) {
+ public PCollection<T> apply(PBegin input) {
try {
Iterable<T> rawElements =
Iterables.transform(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index a8a7746..81f05d7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -291,7 +291,7 @@ public class AvroIOTest {
.withSchema(Schema.create(Schema.Type.STRING))
.withoutValidation();
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("AvroIO.Read should include the file pattern in its primitive transform",
displayData, hasItem(hasDisplayItem("filePattern")));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 4067055..086b726 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -111,7 +111,7 @@ public class PubsubIOTest {
PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
.maxNumRecords(1);
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("PubsubIO.Read should include the subscription in its primitive display data",
displayData, hasItem(hasDisplayItem("subscription")));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 358a30f..8f94766 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -210,7 +210,7 @@ public class TextIOTest {
.from("foobar")
.withoutValidation();
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("TextIO.Read should include the file prefix in its primitive display data",
displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 01a8a1c..304dc82 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -122,11 +122,11 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Instant;
@@ -377,7 +377,7 @@ public class BigQueryIO {
* A {@link PTransform} that reads from a BigQuery table and returns a bounded
* {@link PCollection} of {@link TableRow TableRows}.
*/
- public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
+ public static class Bound extends PTransform<PBegin, PCollection<TableRow>> {
@Nullable final String jsonTableRef;
@Nullable final String query;
@@ -480,7 +480,7 @@ public class BigQueryIO {
}
@Override
- public void validate(PInput input) {
+ public void validate(PBegin input) {
// Even if existence validation is disabled, we need to make sure that the BigQueryIO
// read is properly specified.
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
@@ -524,7 +524,7 @@ public class BigQueryIO {
}
@Override
- public PCollection<TableRow> apply(PInput input) {
+ public PCollection<TableRow> apply(PBegin input) {
String uuid = randomUUIDString();
final String jobIdToken = "beam_job_" + uuid;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 7a7575b..57eb4ff 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -671,7 +671,7 @@ public class BigQueryIOTest implements Serializable {
.withJobService(mockJobService))
.withoutValidation();
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("BigQueryIO.Read should include the table spec in its primitive display data",
displayData, hasItem(hasDisplayItem("table")));
}
@@ -688,7 +688,7 @@ public class BigQueryIOTest implements Serializable {
.withJobService(mockJobService))
.withoutValidation();
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("BigQueryIO.Read should include the query in its primitive display data",
displayData, hasItem(hasDisplayItem("query")));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index f92dbd4..29d0c5f 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -140,7 +139,7 @@ public class JmsIO {
// handles unbounded source to bounded conversion if maxNumRecords is set.
Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource());
- PTransform<PInput, PCollection<JmsRecord>> transform = unbounded;
+ PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded;
if (maxNumRecords != Long.MAX_VALUE) {
transform = unbounded.withMaxNumRecords(maxNumRecords);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 885d5d1..f639422 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -450,7 +449,7 @@ public class KafkaIO {
Unbounded<KafkaRecord<K, V>> unbounded =
org.apache.beam.sdk.io.Read.from(makeSource());
- PTransform<PInput, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+ PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
if (maxNumRecords < Long.MAX_VALUE) {
transform = unbounded.withMaxNumRecords(maxNumRecords);
[2/2] incubator-beam git commit: [BEAM-589] Fixing IO.Read transformation
Posted by lc...@apache.org.
[BEAM-589] Fixing IO.Read transformation
This closes #889
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95ab4380
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95ab4380
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95ab4380
Branch: refs/heads/master
Commit: 95ab438095b04a3defbc1a21fcd65ac2f3910715
Parents: 9abd0bc 310ea74
Author: Luke Cwik <lc...@google.com>
Authored: Fri Aug 26 15:14:45 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 26 15:14:45 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/examples/complete/TfIdf.java | 6 +++---
.../runners/core/UnboundedReadFromBoundedSource.java | 6 +++---
.../org/apache/beam/runners/flink/examples/TFIDF.java | 6 +++---
.../apache/beam/runners/dataflow/DataflowRunner.java | 6 +++---
.../DataflowUnboundedReadFromBoundedSource.java | 6 +++---
.../beam/runners/dataflow/DataflowRunnerTest.java | 4 ++--
.../org/apache/beam/runners/spark/io/CreateStream.java | 7 +++----
.../java/org/apache/beam/runners/spark/io/KafkaIO.java | 6 +++---
.../apache/beam/runners/spark/io/hadoop/HadoopIO.java | 6 +++---
.../src/main/java/org/apache/beam/sdk/io/AvroIO.java | 6 +++---
.../beam/sdk/io/BoundedReadFromUnboundedSource.java | 6 +++---
.../src/main/java/org/apache/beam/sdk/io/PubsubIO.java | 6 +++---
.../core/src/main/java/org/apache/beam/sdk/io/Read.java | 10 +++++-----
.../src/main/java/org/apache/beam/sdk/io/TextIO.java | 6 +++---
.../java/org/apache/beam/sdk/transforms/Create.java | 12 ++++++------
.../test/java/org/apache/beam/sdk/io/AvroIOTest.java | 2 +-
.../test/java/org/apache/beam/sdk/io/PubsubIOTest.java | 2 +-
.../test/java/org/apache/beam/sdk/io/TextIOTest.java | 2 +-
.../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 ++++----
.../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 ++--
.../src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 3 +--
.../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 3 +--
22 files changed, 60 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
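For the Create and Read transforms touched above, the practical effect is that they are applied directly to a Pipeline, which supplies the PBegin. A minimal usage sketch, again assuming the incubator-beam SDK of this commit (class name and element values are placeholders):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    public class CreateExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Create.Values<T> is now a PTransform<PBegin, PCollection<T>>, so it is
        // applied at the root of the pipeline; p.apply(...) supplies the PBegin.
        PCollection<String> words = p.apply(Create.of("hello", "beam"));

        p.run();
      }
    }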