You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:22:26 UTC
[40/50] beam git commit: Refactored existing code. Added iterable and
KV. Changed from element to of.
Refactored existing code. Added iterable and KV. Changed from element to of.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e01ce864
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e01ce864
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e01ce864
Branch: refs/heads/python-sdk
Commit: e01ce864edf551afefe861041541bb2a05340a08
Parents: 83f8c46
Author: Jesse Anderson <je...@smokinghand.com>
Authored: Tue Jan 24 08:37:33 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jan 26 22:52:09 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/ToString.java | 168 ++++++++++++++++---
.../java/org/apache/beam/sdk/io/WriteTest.java | 2 +-
.../beam/sdk/transforms/ToStringTest.java | 86 ++++++++--
3 files changed, 226 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
index ef49267..d5c9784 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
@@ -18,51 +18,181 @@
package org.apache.beam.sdk.transforms;
+import java.util.Iterator;
+
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
- * {@link PTransform PTransforms} for converting a {@link PCollection PCollection<T>} to a
- * {@link PCollection PCollection<String>}.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<Long> longs = ...;
- * PCollection<String> strings = longs.apply(ToString.<Long>element());
- * } </pre>
- *
+ * {@link PTransform PTransforms} for converting a {@link PCollection PCollection<?>},
+ * {@link PCollection PCollection<KV<?,?>>}, or
+ * {@link PCollection PCollection<Iterable<?>>}
+ * to a {@link PCollection PCollection<String>}.
*
* <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
* {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
*/
public final class ToString {
+ private ToString() {
+ // do not instantiate
+ }
/**
* Returns a {@code PTransform<PCollection, PCollection<String>>} which transforms each
* element of the input {@link PCollection} to a {@link String} using the
* {@link Object#toString} method.
*/
- public static PTransform<PCollection<?>, PCollection<String>> element() {
- return new Default();
+ public static PTransform<PCollection<?>, PCollection<String>> of() {
+ return new SimpleToString();
}
- private ToString() {
+ /**
+ * Returns a {@code PTransform<PCollection<KV<?,?>, PCollection<String>>} which transforms each
+ * element of the input {@link PCollection} to a {@link String} by using the
+ * {@link Object#toString} on the key followed by a "," followed by the {@link Object#toString}
+ * of the value.
+ */
+ public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kv() {
+ return kv(",");
+ }
+
+ /**
+ * Returns a {@code PTransform<PCollection<KV<?,?>, PCollection<String>>} which transforms each
+ * element of the input {@link PCollection} to a {@link String} by using the
+ * {@link Object#toString} on the key followed by the specified delimeter followed by the
+ * {@link Object#toString} of the value.
+ * @param delimiter The delimiter to put between the key and value
+ */
+ public static PTransform<PCollection<? extends KV<?, ?>>,
+ PCollection<String>> kv(String delimiter) {
+ return new KVToString(delimiter);
+ }
+
+ /**
+ * Returns a {@code PTransform<PCollection<Iterable<?>, PCollection<String>>} which
+ * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
+ * using the {@link Object#toString} method followed by a "," until
+ * the last element in the iterable. There is no trailing delimiter.
+ */
+ public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterable() {
+ return iterable(",");
+ }
+
+ /**
+ * Returns a {@code PTransform<PCollection<Iterable<?>, PCollection<String>>} which
+ * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
+ * using the {@link Object#toString} method followed by the specified delimiter until
+ * the last element in the iterable. There is no trailing delimiter.
+ * @param delimiter The delimiter to put between the items in the iterable.
+ */
+ public static PTransform<PCollection<? extends Iterable<?>>,
+ PCollection<String>> iterable(String delimiter) {
+ return new IterablesToString(delimiter);
}
/**
* A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection<String>}
* using the {@link Object#toString} method.
+ *
+ * <p>Example of use:
+ * <pre>{@code
+ * PCollection<Long> longs = ...;
+ * PCollection<String> strings = longs.apply(ToString.of());
+ * }</pre>
+ *
+ *
+ * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
+ * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
*/
- private static final class Default extends PTransform<PCollection<?>, PCollection<String>> {
+ private static final class SimpleToString extends
+ PTransform<PCollection<?>, PCollection<String>> {
@Override
public PCollection<String> expand(PCollection<?> input) {
- return input.apply(MapElements.via(new ToStringFunction<>()));
+ return input.apply(MapElements.via(new SimpleFunction<Object, String>() {
+ @Override
+ public String apply(Object input) {
+ return input.toString();
+ }
+ }));
}
+ }
+
+ /**
+ * A {@link PTransform} that converts a {@code PCollection} of {@code KV} to a
+ * {@code PCollection<String>} using the {@link Object#toString} method for
+ * the key and value and an optional delimiter.
+ *
+ * <p>Example of use:
+ * <pre>{@code
+ * PCollection<KV<String, Long>> nameToLong = ...;
+ * PCollection<String> strings = nameToLong.apply(ToString.kv());
+ * }</pre>
+ *
+ *
+ * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your
+ * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ */
+ private static final class KVToString extends
+ PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> {
+ private final String delimiter;
+
+ public KVToString(String delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ @Override
+ public PCollection<String> expand(PCollection<? extends KV<?, ?>> input) {
+ return input.apply(MapElements.via(new SimpleFunction<KV<?, ?>, String>() {
+ @Override
+ public String apply(KV<?, ?> input) {
+ return input.getKey().toString() + delimiter + input.getValue().toString();
+ }
+ }));
+ }
+ }
+
+ /**
+ * A {@link PTransform} that converts a {@code PCollection} of {@link Iterable} to a
+ * {@code PCollection<String>} using the {@link Object#toString} method and
+ * an optional delimiter.
+ *
+ * <p>Example of use:
+ * <pre>{@code
+ * PCollection<Iterable<Long>> longs = ...;
+ * PCollection<String> strings = nameToLong.apply(ToString.iterable());
+ * }</pre>
+ *
+ *
+ * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your
+ * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ */
+ private static final class IterablesToString extends
+ PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> {
+ private final String delimiter;
+
+ public IterablesToString(String delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ @Override
+ public PCollection<String> expand(PCollection<? extends Iterable<?>> input) {
+ return input.apply(MapElements.via(new SimpleFunction<Iterable<?>, String>() {
+ @Override
+ public String apply(Iterable<?> input) {
+ StringBuilder builder = new StringBuilder();
+ Iterator iterator = input.iterator();
+
+ while (iterator.hasNext()) {
+ builder.append(iterator.next().toString());
+
+ if (iterator.hasNext()) {
+ builder.append(delimiter);
+ }
+ }
- private static class ToStringFunction<T> extends SimpleFunction<T, String> {
- @Override
- public String apply(T input) {
- return input.toString();
- }
+ return builder.toString();
+ }
+ }));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 9772b9b..f81cc0c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -297,7 +297,7 @@ public class WriteTest {
@Test
public void testWriteUnbounded() {
PCollection<String> unbounded = p.apply(CountingInput.unbounded())
- .apply(ToString.element());
+ .apply(ToString.of());
TestSink sink = new TestSink();
thrown.expect(IllegalArgumentException.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
index e5c9f05..ab984f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
@@ -20,10 +20,13 @@ package org.apache.beam.sdk.transforms;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
+
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
@@ -41,19 +44,82 @@ public class ToStringTest {
@Test
@Category(RunnableOnService.class)
- public void testToStringElement() {
+ public void testToStringOf() {
Integer[] ints = {1, 2, 3, 4, 5};
+ String[] expectedStrings = {"1", "2", "3", "4", "5"};
PCollection<Integer> input = p.apply(Create.of(Arrays.asList(ints)));
- PCollection<String> output = input.apply(ToString.<Integer>element());
- PAssert.that(output).containsInAnyOrder(toStringList(ints));
+ PCollection<String> stringified = input.apply(ToString.of());
+ PAssert.that(stringified).containsInAnyOrder(expectedStrings);
+ p.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testToStringKV() {
+ ArrayList<String> expectedLines = new ArrayList<>();
+ expectedLines.add("one,1");
+ expectedLines.add("two,2");
+
+ ArrayList<KV<String, Integer>> pairs = new ArrayList<>();
+ pairs.add(KV.of("one", 1));
+ pairs.add(KV.of("two", 2));
+
+ PCollection<KV<String, Integer>> pairCollection = p.apply(Create.of(pairs));
+ PCollection<String> formatted = pairCollection.apply(ToString.kv());
+ PAssert.that(formatted).containsInAnyOrder(expectedLines);
p.run();
}
- private List<String> toStringList(Object[] ints) {
- List<String> ll = new ArrayList<>(ints.length);
- for (Object i : ints) {
- ll.add(i.toString());
- }
- return ll;
+ @Test
+ @Category(RunnableOnService.class)
+ public void testToStringKVWithDelimiter() {
+ ArrayList<String> expectedLines = new ArrayList<>();
+ expectedLines.add("one\t1");
+ expectedLines.add("two\t2");
+
+ ArrayList<KV<String, Integer>> pairs = new ArrayList<>();
+ pairs.add(KV.of("one", 1));
+ pairs.add(KV.of("two", 2));
+
+ PCollection<KV<String, Integer>> pairCollection = p.apply(Create.of(pairs));
+ PCollection<String> formatted = pairCollection.apply(ToString.kv("\t"));
+ PAssert.that(formatted).containsInAnyOrder(expectedLines);
+ p.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testToStringIterable() {
+ ArrayList<Iterable<String>> iterables = new ArrayList<>();
+ iterables.add(Arrays.asList("one", "two", "three"));
+ iterables.add(Arrays.asList("four", "five", "six"));
+
+ ArrayList<String> expected = new ArrayList<>();
+ expected.add("one,two,three");
+ expected.add("four,five,six");
+
+ PCollection<Iterable<String>> input = p.apply(Create.of(iterables)
+ .withCoder(IterableCoder.of(StringUtf8Coder.of())));
+ PCollection<String> output = input.apply(ToString.iterable());
+ PAssert.that(output).containsInAnyOrder(expected);
+ p.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testToStringIterableWithDelimiter() {
+ ArrayList<Iterable<String>> iterables = new ArrayList<>();
+ iterables.add(Arrays.asList("one", "two", "three"));
+ iterables.add(Arrays.asList("four", "five", "six"));
+
+ ArrayList<String> expected = new ArrayList<>();
+ expected.add("one\ttwo\tthree");
+ expected.add("four\tfive\tsix");
+
+ PCollection<Iterable<String>> input = p.apply(Create.of(iterables)
+ .withCoder(IterableCoder.of(StringUtf8Coder.of())));
+ PCollection<String> output = input.apply(ToString.iterable("\t"));
+ PAssert.that(output).containsInAnyOrder(expected);
+ p.run();
}
}