You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/07 18:45:43 UTC

[1/2] incubator-beam git commit: Closes #92

Repository: incubator-beam
Updated Branches:
  refs/heads/master 363d4ec6f -> d2b46936f


Closes #92


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d2b46936
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d2b46936
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d2b46936

Branch: refs/heads/master
Commit: d2b46936f22e6f6b87df5bee78b3b288df67718e
Parents: 363d4ec 65eebc7
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 7 09:45:17 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 7 09:45:17 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/io/AvroIOTest.java       | 68 ++++++++++++++------
 .../cloud/dataflow/sdk/io/TextIOTest.java       | 56 +++++++++++-----
 2 files changed, 91 insertions(+), 33 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Add TextIO and AvroIO withNumShards tests

Posted by dh...@apache.org.
Add TextIO and AvroIO withNumShards tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/65eebc76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/65eebc76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/65eebc76

Branch: refs/heads/master
Commit: 65eebc76bfe81d173e68db9f39de8e6e2ab20ad6
Parents: 363d4ec
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 18:37:30 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 7 09:45:17 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/io/AvroIOTest.java       | 68 ++++++++++++++------
 .../cloud/dataflow/sdk/io/TextIOTest.java       | 56 +++++++++++-----
 2 files changed, 91 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/65eebc76/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
index f0a9b1a..ccc24d5 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound;
 import com.google.cloud.dataflow.sdk.runners.DirectPipeline;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
@@ -45,6 +46,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -198,30 +200,60 @@ public class AvroIOTest {
   }
 
   @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
-  @Test
-  public void testAvroSinkWrite() throws Exception {
-    String outputFilePrefix = new File(tmpFolder.getRoot(), "prefix").getAbsolutePath();
-    String[] expectedElements = new String[] {"first", "second", "third"};
-
+  private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
+    File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
+    String outputFilePrefix = baseOutputFile.getAbsolutePath();
     TestPipeline p = TestPipeline.create();
-    p.apply(Create.<String>of(expectedElements))
-        .apply(AvroIO.Write.to(outputFilePrefix).withSchema(String.class));
+    Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
+    if (numShards > 1) {
+      write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
+    } else {
+      write = write.withoutSharding();
+    }
+    p.apply(Create.<String>of(expectedElements)).apply(write);
     p.run();
 
     // Validate that the data written matches the expected elements in the expected order
-    String expectedName =
-        IOChannelUtils.constructName(
-            outputFilePrefix, ShardNameTemplate.INDEX_OF_MAX, "" /* no suffix */, 0, 1);
-    File outputFile = new File(expectedName);
-    assertTrue("Expected output file " + expectedName, outputFile.exists());
-    try (DataFileReader<String> reader =
-            new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) {
-      List<String> actualElements = new ArrayList<>();
-      Iterators.addAll(actualElements, reader);
-      assertThat(actualElements, containsInAnyOrder(expectedElements));
+    List<File> expectedFiles = new ArrayList<>();
+    if (numShards == 1) {
+      expectedFiles.add(baseOutputFile);
+    } else {
+      for (int i = 0; i < numShards; i++) {
+        expectedFiles.add(
+            new File(
+                IOChannelUtils.constructName(
+                    outputFilePrefix,
+                    ShardNameTemplate.INDEX_OF_MAX,
+                    "" /* no suffix */,
+                    i,
+                    numShards)));
+      }
+    }
+
+    List<String> actualElements = new ArrayList<>();
+    for (File outputFile : expectedFiles) {
+      assertTrue("Expected output file " + outputFile.getName(), outputFile.exists());
+      try (DataFileReader<String> reader =
+              new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) {
+        Iterators.addAll(actualElements, reader);
+      }
     }
+    assertThat(actualElements, containsInAnyOrder(expectedElements));
   }
 
-  // TODO: for Write only, test withSuffix, withNumShards,
+  @Test
+  public void testAvroSinkWrite() throws Exception {
+    String[] expectedElements = new String[] {"first", "second", "third"};
+
+    runTestWrite(expectedElements, 1);
+  }
+
+  @Test
+  public void testAvroSinkShardedWrite() throws Exception {
+    String[] expectedElements = new String[] {"first", "second", "third", "fourth", "fifth"};
+
+    runTestWrite(expectedElements, 4);
+  }
+  // TODO: for Write only, test withSuffix,
   // withShardNameTemplate and withoutSharding.
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/65eebc76/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
index 0bd6ce7..6d4d6e7 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
@@ -43,6 +43,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.util.TestCredential;
 import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -199,36 +200,57 @@ public class TextIOTest {
   }
 
   <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
-    File tmpFile = tmpFolder.newFile("file.txt");
-    String filename = tmpFile.getPath();
+    runTestWrite(elems, coder, 1);
+  }
+
+  <T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) throws Exception {
+    String filename = tmpFolder.newFile("file.txt").getPath();
 
     Pipeline p = TestPipeline.create();
 
-    PCollection<T> input =
-        p.apply(Create.of(Arrays.asList(elems)).withCoder(coder));
+    PCollection<T> input = p.apply(Create.of(Arrays.asList(elems)).withCoder(coder));
 
     TextIO.Write.Bound<T> write;
     if (coder.equals(StringUtf8Coder.of())) {
-      TextIO.Write.Bound<String> writeStrings =
-          TextIO.Write.to(filename).withoutSharding();
+      TextIO.Write.Bound<String> writeStrings = TextIO.Write.to(filename);
       // T==String
       write = (TextIO.Write.Bound<T>) writeStrings;
     } else {
-      write = TextIO.Write.to(filename).withCoder(coder).withoutSharding();
+      write = TextIO.Write.to(filename).withCoder(coder);
+    }
+    if (numShards == 1) {
+      write = write.withoutSharding();
+    } else {
+      write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
     }
 
     input.apply(write);
 
     p.run();
 
+    List<File> expectedFiles = new ArrayList<>();
+    if (numShards == 1) {
+      expectedFiles.add(new File(filename));
+    } else {
+      for (int i = 0; i < numShards; i++) {
+        expectedFiles.add(
+            new File(
+                tmpFolder.getRoot(),
+                IOChannelUtils.constructName(
+                    "file.txt", ShardNameTemplate.INDEX_OF_MAX, "", i, numShards)));
+      }
+    }
+
     List<String> actual = new ArrayList<>();
-    try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
-      for (;;) {
-        String line = reader.readLine();
-        if (line == null) {
-          break;
+    for (File tmpFile : expectedFiles) {
+      try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
+        for (;;) {
+          String line = reader.readLine();
+          if (line == null) {
+            break;
+          }
+          actual.add(line);
         }
-        actual.add(line);
       }
     }
 
@@ -240,8 +262,7 @@ public class TextIOTest {
       expected[i] = line;
     }
 
-    assertThat(actual,
-               containsInAnyOrder(expected));
+    assertThat(actual, containsInAnyOrder(expected));
   }
 
   @Test
@@ -286,6 +307,11 @@ public class TextIOTest {
   }
 
   @Test
+  public void testShardedWrite() throws Exception {
+    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5);
+  }
+
+  @Test
   public void testUnsupportedFilePattern() throws IOException {
     File outFolder = tmpFolder.newFolder();
     // Windows doesn't like resolving paths with * in them.