You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:22:26 UTC

[40/50] beam git commit: Refactored existing code. Added iterable and KV. Changed from element to of.

Refactored existing code. Added iterable and KV. Changed from element to of.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e01ce864
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e01ce864
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e01ce864

Branch: refs/heads/python-sdk
Commit: e01ce864edf551afefe861041541bb2a05340a08
Parents: 83f8c46
Author: Jesse Anderson <je...@smokinghand.com>
Authored: Tue Jan 24 08:37:33 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jan 26 22:52:09 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ToString.java    | 168 ++++++++++++++++---
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   2 +-
 .../beam/sdk/transforms/ToStringTest.java       |  86 ++++++++--
 3 files changed, 226 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
index ef49267..d5c9784 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
@@ -18,51 +18,181 @@
 
 package org.apache.beam.sdk.transforms;
 
+import java.util.Iterator;
+
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * {@link PTransform PTransforms} for converting a {@link PCollection PCollection&lt;T&gt;} to a
- * {@link PCollection PCollection&lt;String&gt;}.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<Long> longs = ...;
- * PCollection<String> strings = longs.apply(ToString.<Long>element());
- * } </pre>
- *
+ * {@link PTransform PTransforms} for converting a {@link PCollection PCollection&lt;?&gt;},
+ * {@link PCollection PCollection&lt;KV&lt;?,?&gt;&gt;}, or
+ * {@link PCollection PCollection&lt;Iterable&lt;?&gt;&gt;}
+ * to a {@link PCollection PCollection&lt;String&gt;}.
  *
  * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
  * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
  */
 public final class ToString {
+  private ToString() {
+    // do not instantiate
+  }
 
   /**
    * Returns a {@code PTransform<PCollection, PCollection<String>>} which transforms each
    * element of the input {@link PCollection} to a {@link String} using the
    * {@link Object#toString} method.
    */
-  public static PTransform<PCollection<?>, PCollection<String>> element() {
-    return new Default();
+  public static PTransform<PCollection<?>, PCollection<String>> of() {
+    return new SimpleToString();
   }
 
-  private ToString() {
+  /**
+   * Returns a {@code PTransform<PCollection<KV<?,?>>, PCollection<String>>} which transforms each
+   * element of the input {@link PCollection} to a {@link String} by using the
+   * {@link Object#toString} on the key followed by a "," followed by the {@link Object#toString}
+   * of the value.
+   */
+  public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kv() {
+    return kv(",");
+  }
+
+  /**
+   * Returns a {@code PTransform<PCollection<KV<?,?>>, PCollection<String>>} which transforms each
+   * element of the input {@link PCollection} to a {@link String} by using the
+   * {@link Object#toString} on the key followed by the specified delimiter followed by the
+   * {@link Object#toString} of the value.
+   * @param delimiter The delimiter to put between the key and value
+   */
+  public static PTransform<PCollection<? extends KV<?, ?>>,
+          PCollection<String>> kv(String delimiter) {
+    return new KVToString(delimiter);
+  }
+
+  /**
+   * Returns a {@code PTransform<PCollection<Iterable<?>>, PCollection<String>>} which
+   * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
+   * using the {@link Object#toString} method followed by a "," until
+   * the last element in the iterable. There is no trailing delimiter.
+   */
+  public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterable() {
+    return iterable(",");
+  }
+
+  /**
+   * Returns a {@code PTransform<PCollection<Iterable<?>>, PCollection<String>>} which
+   * transforms each item in the iterable of the input {@link PCollection} to a {@link String}
+   * using the {@link Object#toString} method followed by the specified delimiter until
+   * the last element in the iterable. There is no trailing delimiter.
+   * @param delimiter The delimiter to put between the items in the iterable.
+   */
+  public static PTransform<PCollection<? extends Iterable<?>>,
+          PCollection<String>> iterable(String delimiter) {
+    return new IterablesToString(delimiter);
   }
 
   /**
    * A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection<String>}
    * using the {@link  Object#toString} method.
+   *
+   * <p>Example of use:
+   * <pre>{@code
+   * PCollection<Long> longs = ...;
+   * PCollection<String> strings = longs.apply(ToString.of());
+   * }</pre>
+   *
+   *
+   * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
+   * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}.
    */
-  private static final class Default extends PTransform<PCollection<?>, PCollection<String>> {
+  private static final class SimpleToString extends
+          PTransform<PCollection<?>, PCollection<String>> {
     @Override
     public PCollection<String> expand(PCollection<?> input) {
-      return input.apply(MapElements.via(new ToStringFunction<>()));
+      return input.apply(MapElements.via(new SimpleFunction<Object, String>() {
+        @Override
+        public String apply(Object input) {
+          return input.toString();
+        }
+      }));
     }
+  }
+
+  /**
+   * A {@link PTransform} that converts a {@code PCollection} of {@code KV} to a
+   * {@code PCollection<String>} using the {@link  Object#toString} method for
+   * the key and value and an optional delimiter.
+   *
+   * <p>Example of use:
+   * <pre>{@code
+   * PCollection<KV<String, Long>> nameToLong = ...;
+   * PCollection<String> strings = nameToLong.apply(ToString.kv());
+   * }</pre>
+   *
+   *
+   * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your
+   * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}.
+   */
+  private static final class KVToString extends
+          PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> {
+    private final String delimiter;
+
+    public KVToString(String delimiter) {
+      this.delimiter = delimiter;
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<? extends KV<?, ?>> input) {
+      return input.apply(MapElements.via(new SimpleFunction<KV<?, ?>, String>() {
+        @Override
+        public String apply(KV<?, ?> input) {
+          return input.getKey().toString() + delimiter + input.getValue().toString();
+        }
+      }));
+    }
+  }
+
+  /**
+   * A {@link PTransform} that converts a {@code PCollection} of {@link Iterable} to a
+   * {@code PCollection<String>} using the {@link  Object#toString} method and
+   * an optional delimiter.
+   *
+   * <p>Example of use:
+   * <pre>{@code
+   * PCollection<Iterable<Long>> longs = ...;
+   * PCollection<String> strings = longs.apply(ToString.iterable());
+   * }</pre>
+   *
+   *
+   * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your
+   * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}.
+   */
+  private static final class IterablesToString extends
+          PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> {
+    private final String delimiter;
+
+    public IterablesToString(String delimiter) {
+      this.delimiter = delimiter;
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<? extends Iterable<?>> input) {
+      return input.apply(MapElements.via(new SimpleFunction<Iterable<?>, String>() {
+        @Override
+        public String apply(Iterable<?> input) {
+          StringBuilder builder = new StringBuilder();
+          Iterator iterator = input.iterator();
+
+          while (iterator.hasNext()) {
+            builder.append(iterator.next().toString());
+
+            if (iterator.hasNext()) {
+              builder.append(delimiter);
+            }
+          }
 
-    private static class ToStringFunction<T> extends SimpleFunction<T, String> {
-      @Override
-      public String apply(T input) {
-        return input.toString();
-      }
+          return builder.toString();
+        }
+      }));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 9772b9b..f81cc0c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -297,7 +297,7 @@ public class WriteTest {
   @Test
   public void testWriteUnbounded() {
     PCollection<String> unbounded = p.apply(CountingInput.unbounded())
-        .apply(ToString.element());
+        .apply(ToString.of());
 
     TestSink sink = new TestSink();
     thrown.expect(IllegalArgumentException.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
index e5c9f05..ab984f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
@@ -20,10 +20,13 @@ package org.apache.beam.sdk.transforms;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
+
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,19 +44,82 @@ public class ToStringTest {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testToStringElement() {
+  public void testToStringOf() {
     Integer[] ints = {1, 2, 3, 4, 5};
+    String[] strings = {"1", "2", "3", "4", "5"};
     PCollection<Integer> input = p.apply(Create.of(Arrays.asList(ints)));
-    PCollection<String> output = input.apply(ToString.<Integer>element());
-    PAssert.that(output).containsInAnyOrder(toStringList(ints));
+    PCollection<String> output = input.apply(ToString.of());
+    PAssert.that(output).containsInAnyOrder(strings);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testToStringKV() {
+    ArrayList<KV<String, Integer>> kvs = new ArrayList<>();
+    kvs.add(KV.of("one", 1));
+    kvs.add(KV.of("two", 2));
+
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add("one,1");
+    expected.add("two,2");
+
+    PCollection<KV<String, Integer>> input = p.apply(Create.of(kvs));
+    PCollection<String> output = input.apply(ToString.kv());
+    PAssert.that(output).containsInAnyOrder(expected);
     p.run();
   }
 
-  private List<String> toStringList(Object[] ints) {
-    List<String> ll = new ArrayList<>(ints.length);
-    for (Object i : ints) {
-      ll.add(i.toString());
-    }
-    return ll;
+  @Test
+  @Category(RunnableOnService.class)
+  public void testToStringKVWithDelimiter() {
+    ArrayList<KV<String, Integer>> kvs = new ArrayList<>();
+    kvs.add(KV.of("one", 1));
+    kvs.add(KV.of("two", 2));
+
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add("one\t1");
+    expected.add("two\t2");
+
+    PCollection<KV<String, Integer>> input = p.apply(Create.of(kvs));
+    PCollection<String> output = input.apply(ToString.kv("\t"));
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testToStringIterable() {
+    ArrayList<Iterable<String>> iterables = new ArrayList<>();
+    iterables.add(Arrays.asList(new String[]{"one", "two", "three"}));
+    iterables.add(Arrays.asList(new String[]{"four", "five", "six"}));
+
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add("one,two,three");
+    expected.add("four,five,six");
+
+    PCollection<Iterable<String>> input = p.apply(Create.of(iterables)
+            .withCoder(IterableCoder.of(StringUtf8Coder.of())));
+    PCollection<String> output = input.apply(ToString.iterable());
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testToStringIterableWithDelimiter() {
+    ArrayList<Iterable<String>> iterables = new ArrayList<>();
+    iterables.add(Arrays.asList(new String[]{"one", "two", "three"}));
+    iterables.add(Arrays.asList(new String[]{"four", "five", "six"}));
+
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add("one\ttwo\tthree");
+    expected.add("four\tfive\tsix");
+
+    PCollection<Iterable<String>> input = p.apply(Create.of(iterables)
+            .withCoder(IterableCoder.of(StringUtf8Coder.of())));
+    PCollection<String> output = input.apply(ToString.iterable("\t"));
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
   }
 }