You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/03/01 04:10:48 UTC
[1/9] beam git commit: BEAM-1421 Latest should comply with PTransform style guide
Repository: beam
Updated Branches:
refs/heads/master 63b63f0b5 -> d66029caf
BEAM-1421 Latest should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b9afe8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b9afe8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b9afe8b
Branch: refs/heads/master
Commit: 0b9afe8bf59245d437a33ee11fdcaa8e57ad4ffe
Parents: a7c60cc
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:18:26 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:36 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Latest.java | 80 ++++++++++++--------
1 file changed, 48 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0b9afe8b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index 9c2d715..e6892c5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Iterator;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
@@ -58,12 +59,43 @@ import org.apache.beam.sdk.values.TimestampedValue;
* }
* }</pre>
*
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
+ *
* <p>For elements with the same timestamp, the element chosen for output is arbitrary.
*/
public class Latest {
// Do not instantiate
private Latest() {}
+ /** Returns a {@link Combine.CombineFn} that selects the latest element among its inputs. */
+ public static <T> Combine.CombineFn<TimestampedValue<T>, ?, T> combineFn() {
+ return new LatestFn<>();
+ }
+
+ /**
+ * Returns a {@link PTransform} that takes as input a {@code PCollection<T>} and returns a
+ * {@code PCollection<T>} whose contents is the latest element according to its event time, or
+ * {@literal null} if there are no elements.
+ *
+ * @param <T> The type of the elements being combined.
+ */
+ public static <T> PTransform<PCollection<T>, PCollection<T>> globally() {
+ return new Globally<>();
+ }
+
+ /**
+ * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>} and returns a
+ * {@code PCollection<KV<K, V>>} whose contents is the latest element per-key according to its
+ * event time.
+ *
+ * @param <K> The key type of the elements being combined.
+ * @param <V> The value type of the elements being combined.
+ */
+ public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> perKey() {
+ return new PerKey<>();
+ }
+
/**
* A {@link Combine.CombineFn} that computes the latest element from a set of inputs. This is
* particularly useful as an {@link Aggregator}.
@@ -71,7 +103,8 @@ public class Latest {
* @param <T> Type of input element.
* @see Latest
*/
- public static class LatestFn<T>
+ @VisibleForTesting
+ static class LatestFn<T>
extends Combine.CombineFn<TimestampedValue<T>, TimestampedValue<T>, T> {
/** Construct a new {@link LatestFn} instance. */
public LatestFn() {}
@@ -82,8 +115,8 @@ public class Latest {
}
@Override
- public TimestampedValue<T> addInput(TimestampedValue<T> accumulator,
- TimestampedValue<T> input) {
+ public TimestampedValue<T> addInput(
+ TimestampedValue<T> accumulator, TimestampedValue<T> input) {
checkNotNull(accumulator, "accumulator must be non-null");
checkNotNull(input, "input must be non-null");
@@ -95,16 +128,20 @@ public class Latest {
}
@Override
- public Coder<TimestampedValue<T>> getAccumulatorCoder(CoderRegistry registry,
- Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException {
+ public Coder<TimestampedValue<T>> getAccumulatorCoder(
+ CoderRegistry registry, Coder<TimestampedValue<T>> inputCoder)
+ throws CannotProvideCoderException {
return NullableCoder.of(inputCoder);
}
@Override
- public Coder<T> getDefaultOutputCoder(CoderRegistry registry,
- Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException {
- checkState(inputCoder instanceof TimestampedValue.TimestampedValueCoder,
- "inputCoder must be a TimestampedValueCoder, but was %s", inputCoder);
+ public Coder<T> getDefaultOutputCoder(
+ CoderRegistry registry, Coder<TimestampedValue<T>> inputCoder)
+ throws CannotProvideCoderException {
+ checkState(
+ inputCoder instanceof TimestampedValue.TimestampedValueCoder,
+ "inputCoder must be a TimestampedValueCoder, but was %s",
+ inputCoder);
TimestampedValue.TimestampedValueCoder<T> inputTVCoder =
(TimestampedValue.TimestampedValueCoder<T>) inputCoder;
@@ -134,29 +171,7 @@ public class Latest {
}
}
- /**
- * Returns a {@link PTransform} that takes as input a {@code PCollection<T>} and returns a
- * {@code PCollection<T>} whose contents is the latest element according to its event time, or
- * {@literal null} if there are no elements.
- *
- * @param <T> The type of the elements being combined.
- */
- public static <T> PTransform<PCollection<T>, PCollection<T>> globally() {
- return new Globally<>();
- }
-
- /**
- * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>} and returns a
- * {@code PCollection<KV<K, V>>} whose contents is the latest element per-key according to its
- * event time.
- *
- * @param <K> The key type of the elements being combined.
- * @param <V> The value type of the elements being combined.
- */
- public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> perKey() {
- return new PerKey<>();
- }
-
+ /** Implementation of {@link #globally()}. */
private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> expand(PCollection<T> input) {
@@ -175,6 +190,7 @@ public class Latest {
}
}
+ /** Implementation of {@link #perKey()}. */
private static class PerKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
@Override
[7/9] beam git commit: BEAM-1417 Count should comply with PTransform style guide
Posted by dh...@apache.org.
BEAM-1417 Count should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9957c895
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9957c895
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9957c895
Branch: refs/heads/master
Commit: 9957c895b1b1c3e491f288d17e70445c9864742a
Parents: f5056ef
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Feb 8 16:27:06 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800
----------------------------------------------------------------------
.../beam/examples/complete/AutoComplete.java | 2 +-
.../flink/examples/streaming/AutoComplete.java | 2 +-
.../translation/streaming/CreateStreamTest.java | 4 +++-
.../org/apache/beam/sdk/transforms/Count.java | 24 +++++++++++++-------
.../apache/beam/sdk/testing/TestStreamTest.java | 4 +++-
5 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c815f27..861a292 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -116,7 +116,7 @@ public class AutoComplete {
public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
PCollection<CompletionCandidate> candidates = input
// First count how often each token appears.
- .apply(new Count.PerElement<String>())
+ .apply(Count.<String>perElement())
// Map the KV outputs of Count into our own CompletionCandiate class.
.apply("CreateCompletionCandidates", ParDo.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index f33e616..d07df29 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -85,7 +85,7 @@ public class AutoComplete {
public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
PCollection<CompletionCandidate> candidates = input
// First count how often each token appears.
- .apply(new Count.PerElement<String>())
+ .apply(Count.<String>perElement())
// Map the KV outputs of Count into our own CompletionCandiate class.
.apply("CreateCompletionCandidates", ParDo.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index ff77535..b32f5f3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -114,7 +115,8 @@ public class CreateStreamTest implements Serializable {
.apply(GroupByKey.<Integer, Integer>create())
.apply(Values.<Iterable<Integer>>create())
.apply(Flatten.<Integer>iterables());
- PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults());
+ PCollection<Long> count =
+ windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());
IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index d164978..fd91430 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -34,31 +34,39 @@ import org.apache.beam.sdk.values.PCollection;
/**
- * {@code PTransorm}s to count the elements in a {@link PCollection}.
+ * {@link PTransform PTransforms} to count the elements in a {@link PCollection}.
*
* <p>{@link Count#perElement()} can be used to count the number of occurrences of each
* distinct element in the PCollection, {@link Count#perKey()} can be used to count the
* number of values per key, and {@link Count#globally()} can be used to count the total
* number of elements in a PCollection.
+ *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
*/
public class Count {
private Count() {
// do not instantiate
}
+ /** Returns a {@link CombineFn} that counts the number of its inputs. */
+ public static <T> CombineFn<T, ?, Long> combineFn() {
+ return new CountFn<T>();
+ }
+
/**
- * Returns a {@link Combine.Globally} {@link PTransform} that counts the number of elements in
+ * Returns a {@link PTransform} that counts the number of elements in
* its input {@link PCollection}.
*/
- public static <T> Combine.Globally<T, Long> globally() {
+ public static <T> PTransform<PCollection<T>, PCollection<Long>> globally() {
return Combine.globally(new CountFn<T>());
}
/**
- * Returns a {@link Combine.PerKey} {@link PTransform} that counts the number of elements
+ * Returns a {@link PTransform} that counts the number of elements
* associated with each key of its input {@link PCollection}.
*/
- public static <K, V> Combine.PerKey<K, V, Long> perKey() {
+ public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> perKey() {
return Combine.<K, V, Long>perKey(new CountFn<V>());
}
@@ -68,7 +76,7 @@ public class Count {
*
* <p>See {@link PerElement Count.PerElement} for more details.
*/
- public static <T> PerElement<T> perElement() {
+ public static <T> PTransform<PCollection<T>, PCollection<KV<T, Long>>> perElement() {
return new PerElement<>();
}
@@ -97,10 +105,10 @@ public class Count {
* @param <T> the type of the elements of the input {@code PCollection}, and the type of the keys
* of the output {@code PCollection}
*/
- public static class PerElement<T>
+ private static class PerElement<T>
extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
- public PerElement() { }
+ private PerElement() { }
@Override
public PCollection<KV<T, Long>> expand(PCollection<T> input) {
http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 1514601..614831d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.TestStream.Builder;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -96,7 +97,8 @@ public class TestStreamTest implements Serializable {
.apply(GroupByKey.<Integer, Integer>create())
.apply(Values.<Iterable<Integer>>create())
.apply(Flatten.<Integer>iterables());
- PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults());
+ PCollection<Long> count =
+ windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());
IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
[6/9] beam git commit: BEAM-1426 SortValues should comply with PTransform style guide
Posted by dh...@apache.org.
BEAM-1426 SortValues should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fffd2c55
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fffd2c55
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fffd2c55
Branch: refs/heads/master
Commit: fffd2c55895e4eb03ae0796d3ce9c4f75b9b34c3
Parents: 4f34caa
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:39:33 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800
----------------------------------------------------------------------
.../sorter/BufferedExternalSorter.java | 23 ++++++----
.../sorter/BufferedExternalSorterTest.java | 46 ++++++++++----------
.../sdk/extensions/sorter/SortValuesTest.java | 2 +-
3 files changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fffd2c55/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
index 1a16511..efa251a 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
@@ -29,19 +29,27 @@ import org.apache.beam.sdk.values.KV;
* then fall back to external sorting.
*/
public class BufferedExternalSorter implements Sorter {
+ public static Options options() {
+ return new Options("/tmp", 100);
+ }
+
/** Contains configuration for the sorter. */
public static class Options implements Serializable {
- private String tempLocation = "/tmp";
- private int memoryMB = 100;
+ private final String tempLocation;
+ private final int memoryMB;
+
+ private Options(String tempLocation, int memoryMB) {
+ this.tempLocation = tempLocation;
+ this.memoryMB = memoryMB;
+ }
/** Sets the path to a temporary location where the sorter writes intermediate files. */
- public Options setTempLocation(String tempLocation) {
+ public Options withTempLocation(String tempLocation) {
checkArgument(
!tempLocation.startsWith("gs://"),
"BufferedExternalSorter does not support GCS temporary location");
- this.tempLocation = tempLocation;
- return this;
+ return new Options(tempLocation, memoryMB);
}
/** Returns the configured temporary location. */
@@ -54,13 +62,12 @@ public class BufferedExternalSorter implements Sorter {
* memory sorting and the buffer used when external sorting. Must be greater than zero and less
* than 2048.
*/
- public Options setMemoryMB(int memoryMB) {
+ public Options withMemoryMB(int memoryMB) {
checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
// Hadoop's external sort stores the number of available memory bytes in an int, this prevents
// overflow
checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
- this.memoryMB = memoryMB;
- return this;
+ return new Options(tempLocation, memoryMB);
}
/** Returns the configured size of the memory buffer. */
http://git-wip-us.apache.org/repos/asf/beam/blob/fffd2c55/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
index 3d63b1a..f2be1f5 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
@@ -137,29 +137,29 @@ public class BufferedExternalSorterTest {
@Test
public void testEmpty() throws Exception {
- SorterTestUtils.testEmpty(BufferedExternalSorter.create(new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString())));
+ SorterTestUtils.testEmpty(BufferedExternalSorter.create(BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString())));
}
@Test
public void testSingleElement() throws Exception {
SorterTestUtils.testSingleElement(
- BufferedExternalSorter.create(new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString())));
+ BufferedExternalSorter.create(BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString())));
}
@Test
public void testEmptyKeyValueElement() throws Exception {
SorterTestUtils.testEmptyKeyValueElement(
- BufferedExternalSorter.create(new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString())));
+ BufferedExternalSorter.create(BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString())));
}
@Test
public void testMultipleIterations() throws Exception {
SorterTestUtils.testMultipleIterations(
- BufferedExternalSorter.create(new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString())));
+ BufferedExternalSorter.create(BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString())));
}
@Test
@@ -168,8 +168,8 @@ public class BufferedExternalSorterTest {
new SorterGenerator() {
@Override
public Sorter generateSorter() throws Exception {
- return BufferedExternalSorter.create(new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString()));
+ return BufferedExternalSorter.create(BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString()));
}
},
1000000,
@@ -182,8 +182,8 @@ public class BufferedExternalSorterTest {
new SorterGenerator() {
@Override
public Sorter generateSorter() throws Exception {
- return BufferedExternalSorter.create(new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString()));
+ return BufferedExternalSorter.create(BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString()));
}
},
1,
@@ -193,16 +193,16 @@ public class BufferedExternalSorterTest {
@Test
public void testAddAfterSort() throws Exception {
SorterTestUtils.testAddAfterSort(
- BufferedExternalSorter.create(new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString())), thrown);
+ BufferedExternalSorter.create(BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString())), thrown);
fail();
}
@Test
public void testSortTwice() throws Exception {
SorterTestUtils.testSortTwice(
- BufferedExternalSorter.create(new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString())), thrown);
+ BufferedExternalSorter.create(BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString())), thrown);
fail();
}
@@ -210,24 +210,24 @@ public class BufferedExternalSorterTest {
public void testNegativeMemory() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be greater than zero");
- BufferedExternalSorter.Options options = new BufferedExternalSorter.Options()
- .setTempLocation(tmpLocation.toString());
- options.setMemoryMB(-1);
+ BufferedExternalSorter.Options options = BufferedExternalSorter.options()
+ .withTempLocation(tmpLocation.toString());
+ options.withMemoryMB(-1);
}
@Test
public void testZeroMemory() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be greater than zero");
- BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
- options.setMemoryMB(0);
+ BufferedExternalSorter.Options options = BufferedExternalSorter.options();
+ options.withMemoryMB(0);
}
@Test
public void testMemoryTooLarge() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("memoryMB must be less than 2048");
- BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
- options.setMemoryMB(2048);
+ BufferedExternalSorter.Options options = BufferedExternalSorter.options();
+ options.withMemoryMB(2048);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fffd2c55/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
index 4f77100..96d6b6a 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
@@ -66,7 +66,7 @@ public class SortValuesTest {
// For every Key, sort the iterable of <SecondaryKey, Value> pairs by SecondaryKey.
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
grouped.apply(
- SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
+ SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));
PAssert.that(groupedAndSorted)
.satisfies(new AssertThatHasExpectedContentsForTestSecondaryKeySorting());
[9/9] beam git commit: This closes #1943
Posted by dh...@apache.org.
This closes #1943
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d66029ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d66029ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d66029ca
Branch: refs/heads/master
Commit: d66029cafde152c0a46ebd276ddfa4c3e7fd3433
Parents: 63b63f0 9957c89
Author: Dan Halperin <dh...@google.com>
Authored: Tue Feb 28 20:10:39 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:39 2017 -0800
----------------------------------------------------------------------
.../beam/examples/complete/AutoComplete.java | 2 +-
.../translation/ApexPipelineTranslator.java | 2 +-
.../FlattenPCollectionTranslator.java | 6 +-
.../operators/ApexFlattenOperator.java | 4 +-
.../EmptyFlattenAsCreateFactory.java | 7 +-
.../core/construction/PTransformMatchers.java | 8 +-
.../construction/PTransformMatchersTest.java | 16 +-
.../beam/runners/direct/EmptyInputProvider.java | 4 +-
.../runners/direct/FlattenEvaluatorFactory.java | 4 +-
.../runners/direct/RootProviderRegistry.java | 4 +-
.../direct/TransformEvaluatorRegistry.java | 4 +-
.../direct/WriteWithShardingFactory.java | 9 +-
.../runners/direct/DirectGraphVisitorTest.java | 4 +-
.../direct/WriteWithShardingFactoryTest.java | 2 +-
.../flink/examples/streaming/AutoComplete.java | 2 +-
.../flink/FlinkBatchTransformTranslators.java | 6 +-
.../FlinkStreamingTransformTranslators.java | 12 +-
.../beam/runners/flink/WriteSinkITCase.java | 2 +-
.../dataflow/DataflowPipelineTranslator.java | 8 +-
.../beam/runners/dataflow/DataflowRunner.java | 13 +-
.../spark/translation/TransformTranslator.java | 8 +-
.../streaming/StreamingTransformTranslator.java | 8 +-
.../translation/streaming/CreateStreamTest.java | 4 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 2 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 727 +++++++++----------
.../org/apache/beam/sdk/transforms/Combine.java | 9 +-
.../org/apache/beam/sdk/transforms/Count.java | 24 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 15 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 8 +-
.../org/apache/beam/sdk/transforms/Latest.java | 80 +-
.../org/apache/beam/sdk/transforms/Sample.java | 121 ++-
.../apache/beam/sdk/transforms/ToString.java | 109 ++-
.../java/org/apache/beam/sdk/io/WriteTest.java | 16 +-
.../beam/sdk/runners/TransformTreeTest.java | 23 +-
.../apache/beam/sdk/testing/TestStreamTest.java | 4 +-
.../apache/beam/sdk/transforms/FlattenTest.java | 12 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 2 +-
.../apache/beam/sdk/transforms/SampleTest.java | 2 +-
.../beam/sdk/transforms/ToStringTest.java | 8 +-
.../sorter/BufferedExternalSorter.java | 23 +-
.../sorter/BufferedExternalSorterTest.java | 46 +-
.../sdk/extensions/sorter/SortValuesTest.java | 2 +-
43 files changed, 716 insertions(+), 658 deletions(-)
----------------------------------------------------------------------
[3/9] beam git commit: BEAM-1419 Flatten should comply with PTransform style guide
Posted by dh...@apache.org.
BEAM-1419 Flatten should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5056efc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5056efc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5056efc
Branch: refs/heads/master
Commit: f5056efc80bf4f240ec1eeea4e2b50bf567a2d6c
Parents: b87621e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:55:19 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800
----------------------------------------------------------------------
.../apex/translation/ApexPipelineTranslator.java | 2 +-
.../translation/FlattenPCollectionTranslator.java | 6 +++---
.../translation/operators/ApexFlattenOperator.java | 4 ++--
.../construction/EmptyFlattenAsCreateFactory.java | 7 +++----
.../core/construction/PTransformMatchers.java | 4 ++--
.../core/construction/PTransformMatchersTest.java | 6 +++---
.../beam/runners/direct/EmptyInputProvider.java | 4 ++--
.../beam/runners/direct/FlattenEvaluatorFactory.java | 4 ++--
.../beam/runners/direct/RootProviderRegistry.java | 4 ++--
.../runners/direct/TransformEvaluatorRegistry.java | 4 ++--
.../beam/runners/direct/DirectGraphVisitorTest.java | 4 ++--
.../flink/FlinkBatchTransformTranslators.java | 6 +++---
.../flink/FlinkStreamingTransformTranslators.java | 6 +++---
.../runners/dataflow/DataflowPipelineTranslator.java | 8 ++++----
.../spark/translation/TransformTranslator.java | 8 ++++----
.../streaming/StreamingTransformTranslator.java | 8 ++++----
.../java/org/apache/beam/sdk/transforms/Flatten.java | 15 ++++++++-------
.../org/apache/beam/sdk/transforms/FlattenTest.java | 12 ++++++------
18 files changed, 56 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 36d679a..e9d6571 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -64,7 +64,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
- registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+ registerTransformTranslator(Flatten.PCollections.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(PrimitiveCreate.class, new CreateValuesTranslator());
registerTransformTranslator(CreateApexPCollectionView.class,
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 2e31dfc..080c5e9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -35,14 +35,14 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TaggedPValue;
/**
- * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
+ * {@link Flatten.PCollections} translation to Apex operator.
*/
class FlattenPCollectionTranslator<T> implements
- TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+ TransformTranslator<Flatten.PCollections<T>> {
private static final long serialVersionUID = 1L;
@Override
- public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+ public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
List<PCollection<T>> inputCollections = extractPCollections(context.getInputs());
if (inputCollections.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
index 3d9db51..4594765 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
@@ -21,15 +21,15 @@ import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
-
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.WatermarkTuple;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Apex operator for Beam {@link org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList}.
+ * Apex operator for Beam {@link PCollections}.
*/
public class ApexFlattenOperator<InputT> extends BaseOperator {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
index 3b29c0a..0168039 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -36,11 +35,11 @@ import org.apache.beam.sdk.values.TaggedPValue;
/**
* A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
- * Flatten.FlattenPCollectionList} that takes no input {@link PCollection PCollections}.
+ * Flatten.PCollections} that takes no input {@link PCollection PCollections}.
*/
public class EmptyFlattenAsCreateFactory<T>
implements PTransformOverrideFactory<
- PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>> {
+ PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>> {
private static final EmptyFlattenAsCreateFactory<Object> INSTANCE =
new EmptyFlattenAsCreateFactory<>();
@@ -52,7 +51,7 @@ public class EmptyFlattenAsCreateFactory<T>
@Override
public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
- FlattenPCollectionList<T> transform) {
+ Flatten.PCollections<T> transform) {
return (PTransform) Create.empty(VoidCoder.of());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 7f8d467..efcc455 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -165,14 +165,14 @@ public class PTransformMatchers {
}
/**
- * A {@link PTransformMatcher} which matches a {@link Flatten.FlattenPCollectionList} which
+ * A {@link PTransformMatcher} which matches a {@link Flatten.PCollections} which
* consumes no input {@link PCollection PCollections}.
*/
public static PTransformMatcher emptyFlatten() {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
- return (application.getTransform() instanceof Flatten.FlattenPCollectionList)
+ return (application.getTransform() instanceof Flatten.PCollections)
&& application.getInputs().isEmpty();
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index be3ed6b..491c14f 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -327,7 +327,7 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+ .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
of(
"EmptyFlatten",
Collections.<TaggedPValue>emptyList(),
@@ -346,7 +346,7 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+ .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
of(
"Flatten",
Collections.singletonList(
@@ -369,7 +369,7 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.FlattenIterables<Object>>
+ .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
of(
"EmptyFlatten",
Collections.<TaggedPValue>emptyList(),
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
index 1185130..98d4a64 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollectionList;
/** A {@link RootInputProvider} that provides a singleton empty bundle. */
class EmptyInputProvider<T>
- implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.FlattenPCollectionList<T>> {
+ implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.PCollections<T>> {
EmptyInputProvider() {}
/**
@@ -37,7 +37,7 @@ class EmptyInputProvider<T>
*/
@Override
public Collection<CommittedBundle<Void>> getInitialInputs(
- AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>>
+ AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>>
transform,
int targetParallelism) {
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 66862ea..8528905 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -22,7 +22,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
@@ -53,7 +53,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
final AppliedPTransform<
- PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
+ PCollectionList<InputT>, PCollection<InputT>, PCollections<InputT>>
application) {
final UncommittedBundle<InputT> outputBundle =
evaluationContext.createBundle(
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
index e8a7665..eb9492c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -25,7 +25,7 @@ import java.util.Map;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
/**
@@ -42,7 +42,7 @@ class RootProviderRegistry {
.put(
TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
new TestStreamEvaluatorFactory.InputProvider(context))
- .put(FlattenPCollectionList.class, new EmptyInputProvider());
+ .put(PCollections.class, new EmptyInputProvider());
return new RootProviderRegistry(defaultProviders.build());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 1ddf9f4..9fdefc3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -53,7 +53,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
.put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
.put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt))
.put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
- .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
+ .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
.put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
.put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
// Runner-specific primitives used in expansion of GroupByKey
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index df49796..8b4573f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -110,7 +110,7 @@ public class DirectGraphVisitorTest implements Serializable {
@Test
public void getRootTransformsContainsEmptyFlatten() {
- FlattenPCollectionList<String> flatten = Flatten.pCollections();
+ PCollections<String> flatten = Flatten.pCollections();
PCollectionList<String> emptyList = PCollectionList.empty(p);
PCollection<String> empty = emptyList.apply(flatten);
empty.setCoder(StringUtf8Coder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 30e9d68..acc204d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -108,7 +108,7 @@ class FlinkBatchTransformTranslators {
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
- TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
+ TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
@@ -706,12 +706,12 @@ class FlinkBatchTransformTranslators {
private static class FlattenPCollectionTranslatorBatch<T>
implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
- Flatten.FlattenPCollectionList<T>> {
+ Flatten.PCollections<T>> {
@Override
@SuppressWarnings("unchecked")
public void translateNode(
- Flatten.FlattenPCollectionList<T> transform,
+ Flatten.PCollections<T> transform,
FlinkBatchTranslationContext context) {
List<TaggedPValue> allInputs = context.getInputs(transform);
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index a3cceb2..03f567d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -125,7 +125,7 @@ class FlinkStreamingTransformTranslators {
TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
- TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+ TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
TRANSLATORS.put(
FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
new CreateViewStreamingTranslator());
@@ -999,11 +999,11 @@ class FlinkStreamingTransformTranslators {
private static class FlattenPCollectionTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- Flatten.FlattenPCollectionList<T>> {
+ Flatten.PCollections<T>> {
@Override
public void translateNode(
- Flatten.FlattenPCollectionList<T> transform,
+ Flatten.PCollections<T> transform,
FlinkStreamingTranslationContext context) {
List<TaggedPValue> allInputs = context.getInputs(transform);
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index c672e99..fe5db5a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -744,16 +744,16 @@ public class DataflowPipelineTranslator {
});
registerTransformTranslator(
- Flatten.FlattenPCollectionList.class,
- new TransformTranslator<Flatten.FlattenPCollectionList>() {
+ Flatten.PCollections.class,
+ new TransformTranslator<Flatten.PCollections>() {
@Override
public void translate(
- Flatten.FlattenPCollectionList transform, TranslationContext context) {
+ Flatten.PCollections transform, TranslationContext context) {
flattenHelper(transform, context);
}
private <T> void flattenHelper(
- Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+ Flatten.PCollections<T> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "Flatten");
List<OutputReference> inputs = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index a643651..7fc09ad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -97,11 +97,11 @@ public final class TransformTranslator {
private TransformTranslator() {
}
- private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
- return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+ private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
+ return new TransformEvaluator<Flatten.PCollections<T>>() {
@SuppressWarnings("unchecked")
@Override
- public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+ public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
List<TaggedPValue> pcs = context.getInputs(transform);
JavaRDD<WindowedValue<T>> unionRDD;
if (pcs.size() == 0) {
@@ -729,7 +729,7 @@ public final class TransformTranslator {
EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
EVALUATORS.put(Combine.Globally.class, combineGlobally());
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
- EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+ EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
EVALUATORS.put(Create.Values.class, create());
EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
EVALUATORS.put(View.AsIterable.class, viewAsIter());
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 4a07741..a856897 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -170,11 +170,11 @@ final class StreamingTransformTranslator {
};
}
- private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
- return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+ private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
+ return new TransformEvaluator<Flatten.PCollections<T>>() {
@SuppressWarnings("unchecked")
@Override
- public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+ public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
List<TaggedPValue> pcs = context.getInputs(transform);
// since this is a streaming pipeline, at least one of the PCollections to "flatten" are
// unbounded, meaning it represents a DStream.
@@ -445,7 +445,7 @@ final class StreamingTransformTranslator {
EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
EVALUATORS.put(CreateStream.class, createFromQueue());
EVALUATORS.put(Window.Bound.class, window());
- EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+ EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 3ef2e55..7b282b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -64,8 +64,8 @@ public class Flatten {
* @param <T> the type of the elements in the input and output
* {@code PCollection}s.
*/
- public static <T> FlattenPCollectionList<T> pCollections() {
- return new FlattenPCollectionList<>();
+ public static <T> PCollections<T> pCollections() {
+ return new PCollections<>();
}
/**
@@ -86,8 +86,8 @@ public class Flatten {
* @param <T> the type of the elements of the input {@code Iterable} and
* the output {@code PCollection}
*/
- public static <T> FlattenIterables<T> iterables() {
- return new FlattenIterables<>();
+ public static <T> Iterables<T> iterables() {
+ return new Iterables<>();
}
/**
@@ -99,10 +99,10 @@ public class Flatten {
* @param <T> the type of the elements in the input and output
* {@code PCollection}s.
*/
- public static class FlattenPCollectionList<T>
+ public static class PCollections<T>
extends PTransform<PCollectionList<T>, PCollection<T>> {
- private FlattenPCollectionList() { }
+ private PCollections() { }
@Override
public PCollection<T> expand(PCollectionList<T> inputs) {
@@ -159,8 +159,9 @@ public class Flatten {
* @param <T> the type of the elements of the input {@code Iterable}s and
* the output {@code PCollection}
*/
- public static class FlattenIterables<T>
+ public static class Iterables<T>
extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
+ private Iterables() {}
@Override
public PCollection<T> expand(PCollection<? extends Iterable<T>> in) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 3b5011b..bc3e322 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -81,7 +81,7 @@ public class FlattenTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testFlattenPCollectionList() {
+ public void testFlattenPCollections() {
List<List<String>> inputs = Arrays.asList(
LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
@@ -95,7 +95,7 @@ public class FlattenTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testFlattenPCollectionListThenParDo() {
+ public void testFlattenPCollectionsThenParDo() {
List<List<String>> inputs = Arrays.asList(
LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
@@ -110,7 +110,7 @@ public class FlattenTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testFlattenPCollectionListEmpty() {
+ public void testFlattenPCollectionsEmpty() {
PCollection<String> output =
PCollectionList.<String>empty(p)
.apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of());
@@ -198,7 +198,7 @@ public class FlattenTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testFlattenPCollectionListEmptyThenParDo() {
+ public void testFlattenPCollectionsEmptyThenParDo() {
PCollection<String> output =
PCollectionList.<String>empty(p)
.apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
@@ -366,8 +366,8 @@ public class FlattenTest implements Serializable {
@Test
public void testFlattenGetName() {
- Assert.assertEquals("Flatten.FlattenIterables", Flatten.<String>iterables().getName());
- Assert.assertEquals("Flatten.FlattenPCollectionList", Flatten.<String>pCollections().getName());
+ Assert.assertEquals("Flatten.Iterables", Flatten.<String>iterables().getName());
+ Assert.assertEquals("Flatten.PCollections", Flatten.<String>pCollections().getName());
}
/////////////////////////////////////////////////////////////////////////////
[2/9] beam git commit: BEAM-1420 GroupByKey should comply with PTransform style guide
Posted by dh...@apache.org.
BEAM-1420 GroupByKey should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7c60cc0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7c60cc0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7c60cc0
Branch: refs/heads/master
Commit: a7c60cc0b9d59d19398f788b84a17d4686ea3f82
Parents: 63b63f0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:17:21 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:36 2017 -0800
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/transforms/Combine.java | 9 +++++----
.../java/org/apache/beam/sdk/transforms/GroupByKey.java | 8 ++++----
.../java/org/apache/beam/sdk/transforms/GroupByKeyTest.java | 2 +-
3 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a7c60cc0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 51c5e71..b4626e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -43,7 +43,6 @@ import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -1877,9 +1876,11 @@ public class Combine {
@Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
return input
- .apply(GroupByKey.<K, InputT>create(fewKeys))
- .apply(Combine.<K, InputT, OutputT>groupedValues(fn, fnDisplayData)
- .withSideInputs(sideInputs));
+ .apply(
+ fewKeys ? GroupByKey.<K, InputT>createWithFewKeys() : GroupByKey.<K, InputT>create())
+ .apply(
+ Combine.<K, InputT, OutputT>groupedValues(fn, fnDisplayData)
+ .withSideInputs(sideInputs));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/a7c60cc0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 1541059..adf189b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -142,17 +142,17 @@ public class GroupByKey<K, V>
}
/**
- * Returns a {@code GroupByKey<K, V>} {@code PTransform}.
+ * Returns a {@code GroupByKey<K, V>} {@code PTransform} that assumes it will be grouping
+ * a small number of keys.
*
* @param <K> the type of the keys of the input and output
* {@code PCollection}s
* @param <V> the type of the values of the input {@code PCollection}
* and the elements of the {@code Iterable}s in the output
* {@code PCollection}
- * @param fewKeys whether it groups just few keys.
*/
- static <K, V> GroupByKey<K, V> create(boolean fewKeys) {
- return new GroupByKey<>(fewKeys);
+ static <K, V> GroupByKey<K, V> createWithFewKeys() {
+ return new GroupByKey<>(true);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a7c60cc0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index f4bec3a..73cedfd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -374,7 +374,7 @@ public class GroupByKeyTest {
@Test
public void testDisplayData() {
GroupByKey<String, String> groupByKey = GroupByKey.create();
- GroupByKey<String, String> groupByFewKeys = GroupByKey.create(true);
+ GroupByKey<String, String> groupByFewKeys = GroupByKey.createWithFewKeys();
DisplayData gbkDisplayData = DisplayData.from(groupByKey);
DisplayData fewKeysDisplayData = DisplayData.from(groupByFewKeys);
[4/9] beam git commit: BEAM-1423 Sample should comply with PTransform
style guide
Posted by dh...@apache.org.
BEAM-1423 Sample should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8e2387d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8e2387d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8e2387d
Branch: refs/heads/master
Commit: a8e2387d37ac2df5c4627ba8f1a9b4ce1ef417b5
Parents: 0b9afe8
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:24:08 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Sample.java | 121 +++++++++++++------
.../beam/sdk/runners/TransformTreeTest.java | 19 +--
.../apache/beam/sdk/transforms/SampleTest.java | 2 +-
3 files changed, 95 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 7d4e630..3734f7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -38,9 +38,17 @@ import org.apache.beam.sdk.values.PCollectionView;
* {@code PTransform}s for taking samples of the elements in a
* {@code PCollection}, or samples of the values associated with each
* key in a {@code PCollection} of {@code KV}s.
+ *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
*/
public class Sample {
+ /** Returns a {@link CombineFn} that computes a fixed-sized sample of its inputs. */
+ public static <T> CombineFn<T, ?, Iterable<T>> combineFn(int sampleSize) {
+ return new FixedSizedSampleFn<>(sampleSize);
+ }
+
/**
* {@code Sample#any(long)} takes a {@code PCollection<T>} and a limit, and
* produces a new {@code PCollection<T>} containing up to limit
@@ -64,72 +72,65 @@ public class Sample {
* @param limit the number of elements to take from the input
*/
public static <T> PTransform<PCollection<T>, PCollection<T>> any(long limit) {
- return new SampleAny<>(limit);
+ return new Any<>(limit);
}
/**
- * Returns a {@code PTransform} that takes a {@code PCollection<T>},
- * selects {@code sampleSize} elements, uniformly at random, and returns a
- * {@code PCollection<Iterable<T>>} containing the selected elements.
- * If the input {@code PCollection} has fewer than
- * {@code sampleSize} elements, then the output {@code Iterable<T>}
- * will be all the input's elements.
+ * Returns a {@code PTransform} that takes a {@code PCollection<T>}, selects {@code sampleSize}
+ * elements, uniformly at random, and returns a {@code PCollection<Iterable<T>>} containing the
+ * selected elements. If the input {@code PCollection} has fewer than {@code sampleSize} elements,
+ * then the output {@code Iterable<T>} will be all the input's elements.
*
* <p>Example of use:
- * <pre> {@code
+ *
+ * <pre>{@code
* PCollection<String> pc = ...;
* PCollection<Iterable<String>> sampleOfSize10 =
* pc.apply(Sample.fixedSizeGlobally(10));
- * } </pre>
+ * }
+ * </pre>
*
* @param sampleSize the number of elements to select; must be {@code >= 0}
* @param <T> the type of the elements
*/
- public static <T> PTransform<PCollection<T>, PCollection<Iterable<T>>>
- fixedSizeGlobally(int sampleSize) {
- return Combine.globally(new FixedSizedSampleFn<T>(sampleSize));
+ public static <T> PTransform<PCollection<T>, PCollection<Iterable<T>>> fixedSizeGlobally(
+ int sampleSize) {
+ return new FixedSizeGlobally<>(sampleSize);
}
/**
- * Returns a {@code PTransform} that takes an input
- * {@code PCollection<KV<K, V>>} and returns a
- * {@code PCollection<KV<K, Iterable<V>>>} that contains an output
- * element mapping each distinct key in the input
- * {@code PCollection} to a sample of {@code sampleSize} values
- * associated with that key in the input {@code PCollection}, taken
- * uniformly at random. If a key in the input {@code PCollection}
- * has fewer than {@code sampleSize} values associated with it, then
- * the output {@code Iterable<V>} associated with that key will be
- * all the values associated with that key in the input
- * {@code PCollection}.
+ * Returns a {@code PTransform} that takes an input {@code PCollection<KV<K, V>>} and returns a
+ * {@code PCollection<KV<K, Iterable<V>>>} that contains an output element mapping each distinct
+ * key in the input {@code PCollection} to a sample of {@code sampleSize} values associated with
+ * that key in the input {@code PCollection}, taken uniformly at random. If a key in the input
+ * {@code PCollection} has fewer than {@code sampleSize} values associated with it, then the
+ * output {@code Iterable<V>} associated with that key will be all the values associated with that
+ * key in the input {@code PCollection}.
*
* <p>Example of use:
- * <pre> {@code
+ *
+ * <pre>{@code
* PCollection<KV<String, Integer>> pc = ...;
* PCollection<KV<String, Iterable<Integer>>> sampleOfSize10PerKey =
* pc.apply(Sample.<String, Integer>fixedSizePerKey());
- * } </pre>
+ * }
+ * </pre>
*
- * @param sampleSize the number of values to select for each
- * distinct key; must be {@code >= 0}
+ * @param sampleSize the number of values to select for each distinct key; must be {@code >= 0}
* @param <K> the type of the keys
* @param <V> the type of the values
*/
- public static <K, V> PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, Iterable<V>>>>
- fixedSizePerKey(int sampleSize) {
- return Combine.perKey(new FixedSizedSampleFn<V>(sampleSize));
+ public static <K, V>
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> fixedSizePerKey(
+ int sampleSize) {
+ return new FixedSizePerKey<>(sampleSize);
}
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@link PTransform} that takes a {@code PCollection<T>} and a limit, and
- * produces a new {@code PCollection<T>} containing up to limit
- * elements of the input {@code PCollection}.
- */
- public static class SampleAny<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ /** Implementation of {@link #any(long)}. */
+ private static class Any<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final long limit;
/**
@@ -137,7 +138,7 @@ public class Sample {
* produces a new PCollection containing up to {@code limit}
* elements of its input {@code PCollection}.
*/
- private SampleAny(long limit) {
+ private Any(long limit) {
checkArgument(limit >= 0, "Expected non-negative limit, received %s.", limit);
this.limit = limit;
}
@@ -162,6 +163,50 @@ public class Sample {
}
}
+ /** Implementation of {@link #fixedSizeGlobally(int)}. */
+ private static class FixedSizeGlobally<T>
+ extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {
+ private final int sampleSize;
+
+ private FixedSizeGlobally(int sampleSize) {
+ this.sampleSize = sampleSize;
+ }
+
+ @Override
+ public PCollection<Iterable<T>> expand(PCollection<T> input) {
+ return input.apply(Combine.globally(new FixedSizedSampleFn<T>(sampleSize)));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("sampleSize", sampleSize)
+ .withLabel("Sample Size"));
+ }
+ }
+
+ /** Implementation of {@link #fixedSizePerKey(int)}. */
+ private static class FixedSizePerKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+ private final int sampleSize;
+
+ private FixedSizePerKey(int sampleSize) {
+ this.sampleSize = sampleSize;
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ return input.apply(Combine.<K, V, Iterable<V>>perKey(new FixedSizedSampleFn<V>(sampleSize)));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("sampleSize", sampleSize)
+ .withLabel("Sample Size"));
+ }
+ }
+
/**
* A {@link DoFn} that returns up to limit elements from the side input PCollection.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 6a6e0fc..1f9beb3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -34,8 +34,10 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -62,7 +64,7 @@ public class TransformTreeTest {
enum TransformsSeen {
READ,
WRITE,
- SAMPLE_ANY
+ COMBINE_GLOBALLY
}
/**
@@ -120,7 +122,8 @@ public class TransformTreeTest {
File outputFile = tmpFolder.newFile();
p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
- .apply(Sample.<String>any(10))
+ .apply(Combine.globally(Sample.<String>combineFn(10)))
+ .apply(Flatten.<String>iterables())
.apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
final EnumSet<TransformsSeen> visited =
@@ -132,8 +135,8 @@ public class TransformTreeTest {
@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
- if (transform instanceof Sample.SampleAny) {
- assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
+ if (transform instanceof Combine.Globally) {
+ assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
assertNotNull(node.getEnclosingNode());
assertTrue(node.isCompositeNode());
} else if (transform instanceof Write.Bound) {
@@ -148,8 +151,8 @@ public class TransformTreeTest {
@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
- if (transform instanceof Sample.SampleAny) {
- assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
+ if (transform instanceof Combine.Globally) {
+ assertTrue(left.add(TransformsSeen.COMBINE_GLOBALLY));
}
}
@@ -157,7 +160,7 @@ public class TransformTreeTest {
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
PTransform<?, ?> transform = node.getTransform();
// Pick is a composite, should not be visited here.
- assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
+ assertThat(transform, not(instanceOf(Combine.Globally.class)));
assertThat(transform, not(instanceOf(Write.Bound.class)));
if (transform instanceof Read.Bounded
&& node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
@@ -167,7 +170,7 @@ public class TransformTreeTest {
});
assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
- assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));
+ assertTrue(left.equals(EnumSet.of(TransformsSeen.COMBINE_GLOBALLY)));
}
@Test(expected = IllegalArgumentException.class)
http://git-wip-us.apache.org/repos/asf/beam/blob/a8e2387d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 88b0145..8e426c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -318,7 +318,7 @@ public class SampleTest {
@Test
public void testSampleGetName() {
- assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
+ assertEquals("Sample.Any", Sample.<String>any(1).getName());
}
@Test
[8/9] beam git commit: BEAM-1416 Write transform should comply with
PTransform style guide
Posted by dh...@apache.org.
BEAM-1416 Write transform should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b87621e9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b87621e9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b87621e9
Branch: refs/heads/master
Commit: b87621e9533e772f92a3cf4325299350eb615b62
Parents: fffd2c5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:44:40 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800
----------------------------------------------------------------------
.../core/construction/PTransformMatchers.java | 4 +-
.../construction/PTransformMatchersTest.java | 10 +-
.../direct/WriteWithShardingFactory.java | 9 +-
.../direct/WriteWithShardingFactoryTest.java | 2 +-
.../FlinkStreamingTransformTranslators.java | 6 +-
.../beam/runners/flink/WriteSinkITCase.java | 2 +-
.../beam/runners/dataflow/DataflowRunner.java | 13 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 2 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 727 +++++++++----------
.../java/org/apache/beam/sdk/io/WriteTest.java | 16 +-
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
12 files changed, 394 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 05b632b..7f8d467 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -182,8 +182,8 @@ public class PTransformMatchers {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
- if (application.getTransform() instanceof Write.Bound) {
- return ((Write.Bound) application.getTransform()).getSharding() == null;
+ if (application.getTransform() instanceof Write) {
+ return ((Write) application.getTransform()).getSharding() == null;
}
return false;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index cace033..be3ed6b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -387,7 +387,7 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void writeWithRunnerDeterminedSharding() {
- Write.Bound<Integer> write =
+ Write<Integer> write =
Write.to(
new FileBasedSink<Integer>("foo", "bar") {
@Override
@@ -400,13 +400,13 @@ public class PTransformMatchersTest implements Serializable {
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
is(true));
- Write.Bound<Integer> withStaticSharding = write.withNumShards(3);
+ Write<Integer> withStaticSharding = write.withNumShards(3);
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withStaticSharding)),
is(false));
- Write.Bound<Integer> withCustomSharding =
+ Write<Integer> withCustomSharding =
write.withSharding(Sum.integersGlobally().asSingletonView());
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
@@ -414,8 +414,8 @@ public class PTransformMatchersTest implements Serializable {
is(false));
}
- private AppliedPTransform<?, ?, ?> appliedWrite(Write.Bound<Integer> write) {
- return AppliedPTransform.<PCollection<Integer>, PDone, Write.Bound<Integer>>of(
+ private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
+ return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
"Write",
Collections.<TaggedPValue>emptyList(),
Collections.<TaggedPValue>emptyList(),
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index f206fb0..63122fe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.io.Write.Bound;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
@@ -48,13 +47,15 @@ import org.apache.beam.sdk.values.TaggedPValue;
* of shards is the log base 10 of the number of input records, with up to 2 additional shards.
*/
class WriteWithShardingFactory<InputT>
- implements PTransformOverrideFactory<PCollection<InputT>, PDone, Bound<InputT>> {
+ implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write<InputT>> {
static final int MAX_RANDOM_EXTRA_SHARDS = 3;
@VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
@Override
- public PTransform<PCollection<InputT>, PDone> getReplacementTransform(Bound<InputT> transform) {
- return transform.withSharding(new LogElementShardsWithDrift<InputT>());
+ public PTransform<PCollection<InputT>, PDone> getReplacementTransform(
+ Write<InputT> transform) {
+
+ return transform.withSharding(new LogElementShardsWithDrift<InputT>());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 51f3a87..16b6312 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -117,7 +117,7 @@ public class WriteWithShardingFactoryTest {
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
- Write.Bound<Object> original = Write.to(new TestSink());
+ Write<Object> original = Write.to(new TestSink());
assertThat(factory.getReplacementTransform(original), not(equalTo((Object) original)));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index eaab3d1..a3cceb2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -118,7 +118,7 @@ class FlinkStreamingTransformTranslators {
static {
TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
- TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+ TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator());
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
@@ -194,10 +194,10 @@ class FlinkStreamingTransformTranslators {
}
private static class WriteSinkStreamingTranslator<T>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write<T>> {
@Override
- public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+ public void translateNode(Write<T> transform, FlinkStreamingTranslationContext context) {
String name = transform.getName();
PValue input = context.getInput(transform);
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 6986663..572c291 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
/**
- * Tests the translation of custom Write.Bound sinks.
+ * Tests the translation of custom Write sinks.
*/
public class WriteSinkITCase extends JavaProgramTestBase {
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0fe3a89..dbf1958 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -86,7 +86,6 @@ import org.apache.beam.sdk.io.PubsubUnboundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.io.Write.Bound;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -367,7 +366,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
"The DataflowRunner in batch mode does not support Read.Unbounded"));
ptoverrides
// Write uses views internally
- .put(PTransformMatchers.classEqualTo(Write.Bound.class), new BatchWriteFactory(this))
+ .put(PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this))
.put(
PTransformMatchers.classEqualTo(View.AsMap.class),
new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsMap.class, this))
@@ -772,14 +771,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
private class BatchWriteFactory<T>
- implements PTransformOverrideFactory<PCollection<T>, PDone, Write.Bound<T>> {
+ implements PTransformOverrideFactory<PCollection<T>, PDone, Write<T>> {
private final DataflowRunner runner;
private BatchWriteFactory(DataflowRunner dataflowRunner) {
this.runner = dataflowRunner;
}
@Override
- public PTransform<PCollection<T>, PDone> getReplacementTransform(Bound<T> transform) {
+ public PTransform<PCollection<T>, PDone> getReplacementTransform(Write<T> transform) {
return new BatchWrite<>(runner, transform);
}
@@ -797,17 +796,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/**
* Specialized implementation which overrides
- * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound} to provide Google
+ * {@link org.apache.beam.sdk.io.Write Write} to provide Google
* Cloud Dataflow specific path validation of {@link FileBasedSink}s.
*/
private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> {
private final DataflowRunner runner;
- private final Write.Bound<T> transform;
+ private final Write<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public BatchWrite(DataflowRunner runner, Write.Bound<T> transform) {
+ public BatchWrite(DataflowRunner runner, Write<T> transform) {
this.runner = runner;
this.transform = transform;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 388d9f0..67a4381 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -804,7 +804,7 @@ public class AvroIO {
throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
}
- org.apache.beam.sdk.io.Write.Bound<T> write =
+ org.apache.beam.sdk.io.Write<T> write =
org.apache.beam.sdk.io.Write.to(
new AvroSink<>(
filenamePrefix,
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 6e23a28..fe8d0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -662,7 +662,7 @@ public class TextIO {
throw new IllegalStateException(
"need to set the filename prefix of a TextIO.Write transform");
}
- org.apache.beam.sdk.io.Write.Bound<String> write =
+ org.apache.beam.sdk.io.Write<String> write =
org.apache.beam.sdk.io.Write.to(
new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
writableByteChannelFactory));
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index acbbb97..948a65b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -62,7 +62,7 @@ import org.slf4j.LoggerFactory;
* <p>By default, every bundle in the input {@link PCollection} will be processed by a
* {@link WriteOperation}, so the number of outputs will vary based on runner behavior, though at
* least 1 output will always be produced. The exact parallelism of the write stage can be
- * controlled using {@link Write.Bound#withNumShards}, typically used to control how many files are
+ * controlled using {@link Write#withNumShards}, typically used to control how many files are
* produced or to globally limit the number of workers connecting to an external service. However,
* this option can often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
*
@@ -78,421 +78,412 @@ import org.slf4j.LoggerFactory;
* <pre>{@code p.apply(Write.to(new MySink(...)).withNumShards(3));}</pre>
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
-public class Write {
+public class Write<T> extends PTransform<PCollection<T>, PDone> {
private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+ private final Sink<T> sink;
+ @Nullable
+ private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+
/**
* Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
* control how many different shards are produced.
*/
- public static <T> Bound<T> to(Sink<T> sink) {
+ public static <T> Write<T> to(Sink<T> sink) {
checkNotNull(sink, "sink");
- return new Bound<>(sink, null /* runner-determined sharding */);
+ return new Write<>(sink, null /* runner-determined sharding */);
}
- /**
- * A {@link PTransform} that writes to a {@link Sink}. See the class-level Javadoc for more
- * information.
- *
- * @see Write
- * @see Sink
- */
- public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
- private final Sink<T> sink;
- @Nullable
- private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
-
- private Bound(
- Sink<T> sink,
- @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
- this.sink = sink;
- this.computeNumShards = computeNumShards;
- }
+ private Write(
+ Sink<T> sink,
+ @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
+ this.sink = sink;
+ this.computeNumShards = computeNumShards;
+ }
- @Override
- public PDone expand(PCollection<T> input) {
- checkArgument(
- IsBounded.BOUNDED == input.isBounded(),
- "%s can only be applied to a Bounded PCollection",
- Write.class.getSimpleName());
- PipelineOptions options = input.getPipeline().getOptions();
- sink.validate(options);
- return createWrite(input, sink.createWriteOperation(options));
- }
+ @Override
+ public PDone expand(PCollection<T> input) {
+ checkArgument(
+ IsBounded.BOUNDED == input.isBounded(),
+ "%s can only be applied to a Bounded PCollection",
+ Write.class.getSimpleName());
+ PipelineOptions options = input.getPipeline().getOptions();
+ sink.validate(options);
+ return createWrite(input, sink.createWriteOperation(options));
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
- .include("sink", sink);
- if (getSharding() != null) {
- builder.include("sharding", getSharding());
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
+ .include("sink", sink);
+ if (getSharding() != null) {
+ builder.include("sharding", getSharding());
}
+ }
- /**
- * Returns the {@link Sink} associated with this PTransform.
- */
- public Sink<T> getSink() {
- return sink;
- }
+ /**
+ * Returns the {@link Sink} associated with this PTransform.
+ */
+ public Sink<T> getSink() {
+ return sink;
+ }
- /**
- * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
- * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
- * {@link #withSharding(PTransform)}), or runner-determined (by {@link
- * #withRunnerDeterminedSharding()}.
- */
- @Nullable
- public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
- return computeNumShards;
- }
+ /**
+ * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
+ * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
+ * {@link #withSharding(PTransform)}), or runner-determined (by {@link
+ * #withRunnerDeterminedSharding()}).
+ */
+ @Nullable
+ public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
+ return computeNumShards;
+ }
- /**
- * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
- * specified number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- *
- * <p>A value less than or equal to 0 will be equivalent to the default behavior of
- * runner-determined sharding.
- */
- public Bound<T> withNumShards(int numShards) {
- if (numShards > 0) {
- return withNumShards(StaticValueProvider.of(numShards));
- }
- return withRunnerDeterminedSharding();
+ /**
+ * Returns a new {@link Write} that will write to the current {@link Sink} using the
+ * specified number of shards.
+ *
+ * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+ * more information.
+ *
+ * <p>A value less than or equal to 0 will be equivalent to the default behavior of
+ * runner-determined sharding.
+ */
+ public Write<T> withNumShards(int numShards) {
+ if (numShards > 0) {
+ return withNumShards(StaticValueProvider.of(numShards));
}
+ return withRunnerDeterminedSharding();
+ }
- /**
- * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
- * {@link ValueProvider} specified number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- */
- public Bound<T> withNumShards(ValueProvider<Integer> numShards) {
- return new Bound<>(sink, new ConstantShards<T>(numShards));
- }
+ /**
+ * Returns a new {@link Write} that will write to the current {@link Sink} using the
+ * {@link ValueProvider} specified number of shards.
+ *
+ * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+ * more information.
+ */
+ public Write<T> withNumShards(ValueProvider<Integer> numShards) {
+ return new Write<>(sink, new ConstantShards<T>(numShards));
+ }
- /**
- * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
- * specified {@link PTransform} to compute the number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- */
- public Bound<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
- checkNotNull(
- sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
- return new Bound<>(sink, sharding);
- }
+ /**
+ * Returns a new {@link Write} that will write to the current {@link Sink} using the
+ * specified {@link PTransform} to compute the number of shards.
+ *
+ * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+ * more information.
+ */
+ public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
+ checkNotNull(
+ sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
+ return new Write<>(sink, sharding);
+ }
+
+ /**
+ * Returns a new {@link Write} that will write to the current {@link Sink} with
+ * runner-determined sharding.
+ */
+ public Write<T> withRunnerDeterminedSharding() {
+ return new Write<>(sink, null);
+ }
- /**
- * Returns a new {@link Write.Bound} that will write to the current {@link Sink} with
- * runner-determined sharding.
- */
- public Bound<T> withRunnerDeterminedSharding() {
- return new Bound<>(sink, null);
+ /**
+ * Writes all the elements in a bundle using a {@link Writer} produced by the
+ * {@link WriteOperation} associated with the {@link Sink}.
+ */
+ private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+ // Writer that will write the records in this bundle. Lazily
+ // initialized in processElement.
+ private Writer<T, WriteT> writer = null;
+ private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+ WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+ this.writeOperationView = writeOperationView;
}
- /**
- * Writes all the elements in a bundle using a {@link Writer} produced by the
- * {@link WriteOperation} associated with the {@link Sink}.
- */
- private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
- // Writer that will write the records in this bundle. Lazily
- // initialized in processElement.
- private Writer<T, WriteT> writer = null;
- private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
- WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
- this.writeOperationView = writeOperationView;
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ // Lazily initialize the Writer
+ if (writer == null) {
+ WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.open(UUID.randomUUID().toString());
+ LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
}
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- // Lazily initialize the Writer
- if (writer == null) {
- WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
- LOG.info("Opening writer for write operation {}", writeOperation);
- writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
- LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
- }
+ try {
+ writer.write(c.element());
+ } catch (Exception e) {
+ // Discard write result and close the write.
try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- if (closeException instanceof InterruptedException) {
- // Do not silently ignore interrupted state.
- Thread.currentThread().interrupt();
- }
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
+ writer.close();
+ // The writer does not need to be reset, as this DoFn cannot be reused.
+ } catch (Exception closeException) {
+ if (closeException instanceof InterruptedException) {
+ // Do not silently ignore interrupted state.
+ Thread.currentThread().interrupt();
}
- throw e;
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
}
+ throw e;
}
+ }
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- if (writer != null) {
- WriteT result = writer.close();
- c.output(result);
- // Reset state in case of reuse.
- writer = null;
- }
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ if (writer != null) {
+ WriteT result = writer.close();
+ c.output(result);
+ // Reset state in case of reuse.
+ writer = null;
}
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(Write.Bound.this);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(Write.this);
}
+ }
- /**
- * Like {@link WriteBundles}, but where the elements for each shard have been collected into
- * a single iterable.
- *
- * @see WriteBundles
- */
- private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
- private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
- WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
- this.writeOperationView = writeOperationView;
- }
+ /**
+ * Like {@link WriteBundles}, but where the elements for each shard have been collected into
+ * a single iterable.
+ *
+ * @see WriteBundles
+ */
+ private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+ private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- // In a sharded write, single input element represents one shard. We can open and close
- // the writer in each call to processElement.
- WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
- LOG.info("Opening writer for write operation {}", writeOperation);
- Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
- LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+ WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+ this.writeOperationView = writeOperationView;
+ }
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ // In a sharded write, single input element represents one shard. We can open and close
+ // the writer in each call to processElement.
+ WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.open(UUID.randomUUID().toString());
+ LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+
+ try {
+ for (T t : c.element().getValue()) {
+ writer.write(t);
+ }
+ } catch (Exception e) {
try {
- for (T t : c.element().getValue()) {
- writer.write(t);
+ writer.close();
+ } catch (Exception closeException) {
+ if (closeException instanceof InterruptedException) {
+ // Do not silently ignore interrupted state.
+ Thread.currentThread().interrupt();
}
- } catch (Exception e) {
- try {
- writer.close();
- } catch (Exception closeException) {
- if (closeException instanceof InterruptedException) {
- // Do not silently ignore interrupted state.
- Thread.currentThread().interrupt();
- }
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
- }
- throw e;
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
}
-
- // Close the writer; if this throws let the error propagate.
- WriteT result = writer.close();
- c.output(result);
+ throw e;
}
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(Write.Bound.this);
- }
+ // Close the writer; if this throws let the error propagate.
+ WriteT result = writer.close();
+ c.output(result);
}
- private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
- private final PCollectionView<Integer> numShards;
- private int shardNumber;
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(Write.this);
+ }
+ }
- ApplyShardingKey(PCollectionView<Integer> numShards) {
- this.numShards = numShards;
- shardNumber = -1;
- }
+ private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
+ private final PCollectionView<Integer> numShards;
+ private int shardNumber;
- @ProcessElement
- public void processElement(ProcessContext context) {
- Integer shardCount = context.sideInput(numShards);
- checkArgument(
- shardCount > 0,
- "Must have a positive number of shards specified for non-runner-determined sharding."
- + " Got %s",
- shardCount);
- if (shardNumber == -1) {
- // We want to desynchronize the first record sharding key for each instance of
- // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
- shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
- } else {
- shardNumber = (shardNumber + 1) % shardCount;
- }
- context.output(KV.of(shardNumber, context.element()));
- }
+ ApplyShardingKey(PCollectionView<Integer> numShards) {
+ this.numShards = numShards;
+ shardNumber = -1;
}
- /**
- * A write is performed as sequence of three {@link ParDo}'s.
- *
- * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
- * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
- * called. The output of this ParDo is a singleton PCollection
- * containing the WriteOperation.
- *
- * <p>This singleton collection containing the WriteOperation is then used as a side input to a
- * ParDo over the PCollection of elements to write. In this bundle-writing phase,
- * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
- * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
- * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
- * every element in the bundle. The output of this ParDo is a PCollection of
- * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
- * each bundle.
- *
- * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
- * the collection of writer results as a side-input. In this ParDo,
- * {@link WriteOperation#finalize} is called to finalize the write.
- *
- * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
- * before the exception that caused the write to fail is propagated and the write result will be
- * discarded.
- *
- * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
- * deserialized in the bundle-writing and finalization phases, any state change to the
- * WriteOperation object that occurs during initialization is visible in the latter phases.
- * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
- * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
- * WriteOperation).
- */
- private <WriteT> PDone createWrite(
- PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
- Pipeline p = input.getPipeline();
-
- // A coder to use for the WriteOperation.
- @SuppressWarnings("unchecked")
- Coder<WriteOperation<T, WriteT>> operationCoder =
- (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
- // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
- // the sink.
- PCollection<WriteOperation<T, WriteT>> operationCollection =
- p.apply(Create.of(writeOperation).withCoder(operationCoder));
-
- // Initialize the resource in a do-once ParDo on the WriteOperation.
- operationCollection = operationCollection
- .apply("Initialize", ParDo.of(
- new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- WriteOperation<T, WriteT> writeOperation = c.element();
- LOG.info("Initializing write operation {}", writeOperation);
- writeOperation.initialize(c.getPipelineOptions());
- LOG.debug("Done initializing write operation {}", writeOperation);
- // The WriteOperation is also the output of this ParDo, so it can have mutable
- // state.
- c.output(writeOperation);
- }
- }))
- .setCoder(operationCoder);
-
- // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
- final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
- operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
- // Re-window the data into the global window and remove any existing triggers.
- PCollection<T> inputInGlobalWindow =
- input.apply(
- Window.<T>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes());
-
- // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
- // as a side input) and collect the results of the writes in a PCollection.
- // There is a dependency between this ParDo and the first (the WriteOperation PCollection
- // as a side input), so this will happen after the initial ParDo.
- PCollection<WriteT> results;
- final PCollectionView<Integer> numShards;
- if (computeNumShards == null) {
- numShards = null;
- results =
- inputInGlobalWindow.apply(
- "WriteBundles",
- ParDo.of(new WriteBundles<>(writeOperationView))
- .withSideInputs(writeOperationView));
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ Integer shardCount = context.sideInput(numShards);
+ checkArgument(
+ shardCount > 0,
+ "Must have a positive number of shards specified for non-runner-determined sharding."
+ + " Got %s",
+ shardCount);
+ if (shardNumber == -1) {
+ // We want to desynchronize the first record sharding key for each instance of
+ // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
+ shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
} else {
- numShards = inputInGlobalWindow.apply(computeNumShards);
- results =
- inputInGlobalWindow
- .apply(
- "ApplyShardLabel",
- ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
- .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
- .apply(
- "WriteShardedBundles",
- ParDo.of(new WriteShardedBundles<>(writeOperationView))
- .withSideInputs(writeOperationView));
- }
- results.setCoder(writeOperation.getWriterResultCoder());
-
- final PCollectionView<Iterable<WriteT>> resultsView =
- results.apply(View.<WriteT>asIterable());
-
- // Finalize the write in another do-once ParDo on the singleton collection containing the
- // Writer. The results from the per-bundle writes are given as an Iterable side input.
- // The WriteOperation's state is the same as after its initialization in the first do-once
- // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
- // collection as a side input), so it will happen after the parallel write.
- ImmutableList.Builder<PCollectionView<?>> sideInputs =
- ImmutableList.<PCollectionView<?>>builder().add(resultsView);
- if (numShards != null) {
- sideInputs.add(numShards);
+ shardNumber = (shardNumber + 1) % shardCount;
}
- operationCollection
- .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- WriteOperation<T, WriteT> writeOperation = c.element();
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
- LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
- // We must always output at least 1 shard, and honor user-specified numShards if set.
- int minShardsNeeded;
- if (numShards == null) {
- minShardsNeeded = 1;
- } else {
- minShardsNeeded = c.sideInput(numShards);
- checkArgument(
- minShardsNeeded > 0,
- "Must have a positive number of shards for non-runner-determined sharding."
- + " Got %s",
- minShardsNeeded);
- }
- int extraShardsNeeded = minShardsNeeded - results.size();
- if (extraShardsNeeded > 0) {
- LOG.info(
- "Creating {} empty output shards in addition to {} written for a total of {}.",
- extraShardsNeeded, results.size(), minShardsNeeded);
- for (int i = 0; i < extraShardsNeeded; ++i) {
- Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
- WriteT emptyWrite = writer.close();
- results.add(emptyWrite);
- }
- LOG.debug("Done creating extra shards.");
- }
+ context.output(KV.of(shardNumber, context.element()));
+ }
+ }
- writeOperation.finalize(results, c.getPipelineOptions());
- LOG.debug("Done finalizing write operation {}", writeOperation);
- }
- }).withSideInputs(sideInputs.build()));
- return PDone.in(input.getPipeline());
+ /**
+ * A write is performed as a sequence of three {@link ParDo}s.
+ *
+ * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
+ * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
+ * called. The output of this ParDo is a singleton PCollection
+ * containing the WriteOperation.
+ *
+ * <p>This singleton collection containing the WriteOperation is then used as a side input to a
+ * ParDo over the PCollection of elements to write. In this bundle-writing phase,
+ * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
+ * With runner-determined sharding, {@link Writer#open} is called lazily when the first element
+ * of a bundle is processed and {@link Writer#close} in {@link DoFn.FinishBundle}; {@link Writer#write} is called for
+ * every element in the bundle. The output of this ParDo is a PCollection of
+ * <i>writer result</i> objects (see {@link Sink} for a description of writer results), one for
+ * each bundle.
+ *
+ * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
+ * the collection of writer results as a side-input. In this ParDo,
+ * {@link WriteOperation#finalize} is called to finalize the write.
+ *
+ * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
+ * before the exception that caused the write to fail is propagated and the write result will be
+ * discarded.
+ *
+ * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
+ * deserialized in the bundle-writing and finalization phases, any state change to the
+ * WriteOperation object that occurs during initialization is visible in the latter phases.
+ * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
+ * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
+ * WriteOperation.
+ */
+ private <WriteT> PDone createWrite(
+ PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
+ Pipeline p = input.getPipeline();
+
+ // A coder to use for the WriteOperation.
+ @SuppressWarnings("unchecked")
+ Coder<WriteOperation<T, WriteT>> operationCoder =
+ (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
+
+ // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
+ // the sink.
+ PCollection<WriteOperation<T, WriteT>> operationCollection =
+ p.apply(Create.of(writeOperation).withCoder(operationCoder));
+
+ // Initialize the resource in a do-once ParDo on the WriteOperation.
+ operationCollection = operationCollection
+ .apply("Initialize", ParDo.of(
+ new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ WriteOperation<T, WriteT> writeOperation = c.element();
+ LOG.info("Initializing write operation {}", writeOperation);
+ writeOperation.initialize(c.getPipelineOptions());
+ LOG.debug("Done initializing write operation {}", writeOperation);
+ // The WriteOperation is also the output of this ParDo, so it can have mutable
+ // state.
+ c.output(writeOperation);
+ }
+ }))
+ .setCoder(operationCoder);
+
+ // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
+ final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
+ operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
+
+ // Re-window the data into the global window and remove any existing triggers.
+ PCollection<T> inputInGlobalWindow =
+ input.apply(
+ Window.<T>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+
+ // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
+ // as a side input) and collect the results of the writes in a PCollection.
+ // There is a dependency between this ParDo and the first (the WriteOperation PCollection
+ // as a side input), so this will happen after the initial ParDo.
+ PCollection<WriteT> results;
+ final PCollectionView<Integer> numShards;
+ if (computeNumShards == null) {
+ numShards = null;
+ results =
+ inputInGlobalWindow.apply(
+ "WriteBundles",
+ ParDo.of(new WriteBundles<>(writeOperationView))
+ .withSideInputs(writeOperationView));
+ } else {
+ numShards = inputInGlobalWindow.apply(computeNumShards);
+ results =
+ inputInGlobalWindow
+ .apply(
+ "ApplyShardLabel",
+ ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
+ .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+ .apply(
+ "WriteShardedBundles",
+ ParDo.of(new WriteShardedBundles<>(writeOperationView))
+ .withSideInputs(writeOperationView));
}
+ results.setCoder(writeOperation.getWriterResultCoder());
+
+ final PCollectionView<Iterable<WriteT>> resultsView =
+ results.apply(View.<WriteT>asIterable());
+
+ // Finalize the write in another do-once ParDo on the singleton collection containing the
+ // WriteOperation. The results from the per-bundle writes are given as an Iterable side input.
+ // The WriteOperation's state is the same as after its initialization in the first do-once
+ // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
+ // collection as a side input), so it will happen after the parallel write.
+ ImmutableList.Builder<PCollectionView<?>> sideInputs =
+ ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+ if (numShards != null) {
+ sideInputs.add(numShards);
+ }
+ operationCollection
+ .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ WriteOperation<T, WriteT> writeOperation = c.element();
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+ LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+ // We must always output at least 1 shard, and honor user-specified numShards if set.
+ int minShardsNeeded;
+ if (numShards == null) {
+ minShardsNeeded = 1;
+ } else {
+ minShardsNeeded = c.sideInput(numShards);
+ checkArgument(
+ minShardsNeeded > 0,
+ "Must have a positive number of shards for non-runner-determined sharding."
+ + " Got %s",
+ minShardsNeeded);
+ }
+ int extraShardsNeeded = minShardsNeeded - results.size();
+ if (extraShardsNeeded > 0) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written for a total of {}.",
+ extraShardsNeeded, results.size(), minShardsNeeded);
+ for (int i = 0; i < extraShardsNeeded; ++i) {
+ Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.open(UUID.randomUUID().toString());
+ WriteT emptyWrite = writer.close();
+ results.add(emptyWrite);
+ }
+ LOG.debug("Done creating extra shards.");
+ }
+
+ writeOperation.finalize(results, c.getPipelineOptions());
+ LOG.debug("Done finalizing write operation {}", writeOperation);
+ }
+ }).withSideInputs(sideInputs.build()));
+ return PDone.in(input.getPipeline());
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index fd349e2..80f6f66 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -210,7 +210,7 @@ public class WriteTest {
}
TestSink sink = new TestSink();
- Write.Bound<String> write = Write.to(sink).withSharding(new LargestInt());
+ Write<String> write = Write.to(sink).withSharding(new LargestInt());
p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
.apply(IDENTITY_MAP)
.apply(write);
@@ -307,7 +307,7 @@ public class WriteTest {
@Test
public void testBuildWrite() {
Sink<String> sink = new TestSink() {};
- Write.Bound<String> write = Write.to(sink).withNumShards(3);
+ Write<String> write = Write.to(sink).withNumShards(3);
assertThat(write.getSink(), is(sink));
PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
write.getSharding();
@@ -315,12 +315,12 @@ public class WriteTest {
assertThat(((ConstantShards<String>) write.getSharding()).getNumShards().get(), equalTo(3));
assertThat(write.getSharding(), equalTo(originalSharding));
- Write.Bound<String> write2 = write.withSharding(SHARDING_TRANSFORM);
+ Write<String> write2 = write.withSharding(SHARDING_TRANSFORM);
assertThat(write2.getSink(), is(sink));
assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
// original unchanged
- Write.Bound<String> writeUnsharded = write2.withRunnerDeterminedSharding();
+ Write<String> writeUnsharded = write2.withRunnerDeterminedSharding();
assertThat(writeUnsharded.getSharding(), nullValue());
assertThat(write.getSharding(), equalTo(originalSharding));
}
@@ -333,7 +333,7 @@ public class WriteTest {
builder.add(DisplayData.item("foo", "bar"));
}
};
- Write.Bound<String> write = Write.to(sink);
+ Write<String> write = Write.to(sink);
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
@@ -348,7 +348,7 @@ public class WriteTest {
builder.add(DisplayData.item("foo", "bar"));
}
};
- Write.Bound<String> write = Write.to(sink).withNumShards(1);
+ Write<String> write = Write.to(sink).withNumShards(1);
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
assertThat(displayData, includesDisplayDataFor("sink", sink));
@@ -363,7 +363,7 @@ public class WriteTest {
builder.add(DisplayData.item("foo", "bar"));
}
};
- Write.Bound<String> write =
+ Write<String> write =
Write.to(sink)
.withSharding(
new PTransform<PCollection<String>, PCollectionView<Integer>>() {
@@ -433,7 +433,7 @@ public class WriteTest {
}
TestSink sink = new TestSink();
- Write.Bound<String> write = Write.to(sink);
+ Write<String> write = Write.to(sink);
if (numConfiguredShards.isPresent()) {
write = write.withNumShards(numConfiguredShards.get());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 1f9beb3..53bc114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -139,7 +139,7 @@ public class TransformTreeTest {
assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
assertNotNull(node.getEnclosingNode());
assertTrue(node.isCompositeNode());
- } else if (transform instanceof Write.Bound) {
+ } else if (transform instanceof Write) {
assertTrue(visited.add(TransformsSeen.WRITE));
assertNotNull(node.getEnclosingNode());
assertTrue(node.isCompositeNode());
@@ -161,7 +161,7 @@ public class TransformTreeTest {
PTransform<?, ?> transform = node.getTransform();
// Pick is a composite, should not be visited here.
assertThat(transform, not(instanceOf(Combine.Globally.class)));
- assertThat(transform, not(instanceOf(Write.Bound.class)));
+ assertThat(transform, not(instanceOf(Write.class)));
if (transform instanceof Read.Bounded
&& node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
assertTrue(visited.add(TransformsSeen.READ));
[5/9] beam git commit: BEAM-1424 ToString should comply with
PTransform style guide
Posted by dh...@apache.org.
BEAM-1424 ToString should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f34caa7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f34caa7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f34caa7
Branch: refs/heads/master
Commit: 4f34caa72182a0ae10df7b43c820195280c22c0c
Parents: a8e2387
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:34:31 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/ToString.java | 109 ++++++++-----------
.../beam/sdk/transforms/ToStringTest.java | 8 +-
2 files changed, 50 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4f34caa7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
index 5069a3c..e5a8d38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
@@ -18,8 +18,7 @@
package org.apache.beam.sdk.transforms;
-import java.util.Iterator;
-
+import com.google.common.base.Joiner;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -38,74 +37,70 @@ public final class ToString {
}
/**
- * Returns a {@code PTransform<PCollection, PCollection<String>>} which transforms each
- * element of the input {@link PCollection} to a {@link String} using the
- * {@link Object#toString} method.
+ * Transforms each element of the input {@link PCollection} to a {@link String} using the {@link
+ * Object#toString} method.
*/
public static PTransform<PCollection<?>, PCollection<String>> elements() {
- return new SimpleToString();
+ return new Elements();
}
/**
- * Returns a {@code PTransform<PCollection<KV<?,?>, PCollection<String>>} which transforms each
- * element of the input {@link PCollection} to a {@link String} by using the
+ * Transforms each element of the input {@link PCollection} to a {@link String} by using the
* {@link Object#toString} on the key followed by a "," followed by the {@link Object#toString}
* of the value.
*/
- public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kv() {
- return kv(",");
+ public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kvs() {
+ return kvs(",");
}
/**
- * Returns a {@code PTransform<PCollection<KV<?,?>, PCollection<String>>} which transforms each
- * element of the input {@link PCollection} to a {@link String} by using the
- * {@link Object#toString} on the key followed by the specified delimeter followed by the
- * {@link Object#toString} of the value.
+ * Transforms each element of the input {@link PCollection} to a {@link String} by using the
+ * {@link Object#toString} on the key followed by the specified delimiter followed by the {@link
+ * Object#toString} of the value.
+ *
* @param delimiter The delimiter to put between the key and value
*/
- public static PTransform<PCollection<? extends KV<?, ?>>,
- PCollection<String>> kv(String delimiter) {
- return new KVToString(delimiter);
+ public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kvs(
+ String delimiter) {
+ return new KVs(delimiter);
}
/**
- * Returns a {@code PTransform<PCollection<Iterable<?>, PCollection<String>>} which
- * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
+ * Transforms each item in the iterable of the input {@link PCollection} to a {@link String}
* using the {@link Object#toString} method followed by a "," until
* the last element in the iterable. There is no trailing delimiter.
*/
- public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterable() {
- return iterable(",");
+ public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterables() {
+ return iterables(",");
}
/**
- * Returns a {@code PTransform<PCollection<Iterable<?>, PCollection<String>>} which
- * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
- * using the {@link Object#toString} method followed by the specified delimiter until
- * the last element in the iterable. There is no trailing delimiter.
+ * Transforms each item in the iterable of the input {@link PCollection} to a {@link String} using
+ * the {@link Object#toString} method followed by the specified delimiter until the last element
+ * in the iterable. There is no trailing delimiter.
+ *
* @param delimiter The delimiter to put between the items in the iterable.
*/
- public static PTransform<PCollection<? extends Iterable<?>>,
- PCollection<String>> iterable(String delimiter) {
- return new IterablesToString(delimiter);
+ public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterables(
+ String delimiter) {
+ return new Iterables(delimiter);
}
/**
- * A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection<String>}
- * using the {@link Object#toString} method.
+ * A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection<String>} using
+ * the {@link Object#toString} method.
*
* <p>Example of use:
+ *
* <pre>{@code
* PCollection<Long> longs = ...;
* PCollection<String> strings = longs.apply(ToString.elements());
* }</pre>
*
- *
* <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
* {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
*/
- private static final class SimpleToString extends
- PTransform<PCollection<?>, PCollection<String>> {
+ private static final class Elements extends PTransform<PCollection<?>, PCollection<String>> {
@Override
public PCollection<String> expand(PCollection<?> input) {
return input.apply(MapElements.via(new SimpleFunction<Object, String>() {
@@ -118,25 +113,25 @@ public final class ToString {
}
/**
- * A {@link PTransform} that converts a {@code PCollection} of {@code KV} to a
- * {@code PCollection<String>} using the {@link Object#toString} method for
- * the key and value and an optional delimiter.
+ * A {@link PTransform} that converts a {@code PCollection} of {@code KV} to a {@code
+ * PCollection<String>} using the {@link Object#toString} method for the key and value and an
+ * optional delimiter.
*
* <p>Example of use:
+ *
* <pre>{@code
* PCollection<KV<String, Long>> nameToLong = ...;
* PCollection<String> strings = nameToLong.apply(ToString.kv());
* }</pre>
*
- *
- * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your
- * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
+ * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
*/
- private static final class KVToString extends
- PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> {
+ private static final class KVs
+ extends PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> {
private final String delimiter;
- public KVToString(String delimiter) {
+ public KVs(String delimiter) {
this.delimiter = delimiter;
}
@@ -152,25 +147,24 @@ public final class ToString {
}
/**
- * A {@link PTransform} that converts a {@code PCollection} of {@link Iterable} to a
- * {@code PCollection<String>} using the {@link Object#toString} method and
- * an optional delimiter.
+ * A {@link PTransform} that converts a {@code PCollection} of {@link Iterable} to a {@code
+ * PCollection<String>} using the {@link Object#toString} method and an optional delimiter.
*
* <p>Example of use:
+ *
* <pre>{@code
* PCollection<Iterable<Long>> longs = ...;
* PCollection<String> strings = nameToLong.apply(ToString.iterable());
* }</pre>
*
- *
- * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your
- * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
+ * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
*/
- private static final class IterablesToString extends
- PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> {
+ private static final class Iterables
+ extends PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> {
private final String delimiter;
- public IterablesToString(String delimiter) {
+ public Iterables(String delimiter) {
this.delimiter = delimiter;
}
@@ -179,18 +173,7 @@ public final class ToString {
return input.apply(MapElements.via(new SimpleFunction<Iterable<?>, String>() {
@Override
public String apply(Iterable<?> input) {
- StringBuilder builder = new StringBuilder();
- Iterator iterator = input.iterator();
-
- while (iterator.hasNext()) {
- builder.append(iterator.next().toString());
-
- if (iterator.hasNext()) {
- builder.append(delimiter);
- }
- }
-
- return builder.toString();
+ return Joiner.on(delimiter).join(input);
}
}));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f34caa7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
index d2116da..ab446a4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
@@ -65,7 +65,7 @@ public class ToStringTest {
expected.add("two,2");
PCollection<KV<String, Integer>> input = p.apply(Create.of(kvs));
- PCollection<String> output = input.apply(ToString.kv());
+ PCollection<String> output = input.apply(ToString.kvs());
PAssert.that(output).containsInAnyOrder(expected);
p.run();
}
@@ -82,7 +82,7 @@ public class ToStringTest {
expected.add("two\t2");
PCollection<KV<String, Integer>> input = p.apply(Create.of(kvs));
- PCollection<String> output = input.apply(ToString.kv("\t"));
+ PCollection<String> output = input.apply(ToString.kvs("\t"));
PAssert.that(output).containsInAnyOrder(expected);
p.run();
}
@@ -100,7 +100,7 @@ public class ToStringTest {
PCollection<Iterable<String>> input = p.apply(Create.of(iterables)
.withCoder(IterableCoder.of(StringUtf8Coder.of())));
- PCollection<String> output = input.apply(ToString.iterable());
+ PCollection<String> output = input.apply(ToString.iterables());
PAssert.that(output).containsInAnyOrder(expected);
p.run();
}
@@ -118,7 +118,7 @@ public class ToStringTest {
PCollection<Iterable<String>> input = p.apply(Create.of(iterables)
.withCoder(IterableCoder.of(StringUtf8Coder.of())));
- PCollection<String> output = input.apply(ToString.iterable("\t"));
+ PCollection<String> output = input.apply(ToString.iterables("\t"));
PAssert.that(output).containsInAnyOrder(expected);
p.run();
}