You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/07 18:45:44 UTC
[2/2] incubator-beam git commit: Add TextIO and AvroIO withNumShards tests
Add TextIO and AvroIO withNumShards tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/65eebc76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/65eebc76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/65eebc76
Branch: refs/heads/master
Commit: 65eebc76bfe81d173e68db9f39de8e6e2ab20ad6
Parents: 363d4ec
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 18:37:30 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 7 09:45:17 2016 -0700
----------------------------------------------------------------------
.../cloud/dataflow/sdk/io/AvroIOTest.java | 68 ++++++++++++++------
.../cloud/dataflow/sdk/io/TextIOTest.java | 56 +++++++++++-----
2 files changed, 91 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/65eebc76/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
index f0a9b1a..ccc24d5 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound;
import com.google.cloud.dataflow.sdk.runners.DirectPipeline;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
@@ -45,6 +46,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -198,30 +200,60 @@ public class AvroIOTest {
}
@SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
- @Test
- public void testAvroSinkWrite() throws Exception {
- String outputFilePrefix = new File(tmpFolder.getRoot(), "prefix").getAbsolutePath();
- String[] expectedElements = new String[] {"first", "second", "third"};
-
+ private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
+ File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
+ String outputFilePrefix = baseOutputFile.getAbsolutePath();
TestPipeline p = TestPipeline.create();
- p.apply(Create.<String>of(expectedElements))
- .apply(AvroIO.Write.to(outputFilePrefix).withSchema(String.class));
+ Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
+ if (numShards > 1) {
+ write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
+ } else {
+ write = write.withoutSharding();
+ }
+ p.apply(Create.<String>of(expectedElements)).apply(write);
p.run();
// Validate that the data written matches the expected elements in the expected order
- String expectedName =
- IOChannelUtils.constructName(
- outputFilePrefix, ShardNameTemplate.INDEX_OF_MAX, "" /* no suffix */, 0, 1);
- File outputFile = new File(expectedName);
- assertTrue("Expected output file " + expectedName, outputFile.exists());
- try (DataFileReader<String> reader =
- new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) {
- List<String> actualElements = new ArrayList<>();
- Iterators.addAll(actualElements, reader);
- assertThat(actualElements, containsInAnyOrder(expectedElements));
+ List<File> expectedFiles = new ArrayList<>();
+ if (numShards == 1) {
+ expectedFiles.add(baseOutputFile);
+ } else {
+ for (int i = 0; i < numShards; i++) {
+ expectedFiles.add(
+ new File(
+ IOChannelUtils.constructName(
+ outputFilePrefix,
+ ShardNameTemplate.INDEX_OF_MAX,
+ "" /* no suffix */,
+ i,
+ numShards)));
+ }
+ }
+
+ List<String> actualElements = new ArrayList<>();
+ for (File outputFile : expectedFiles) {
+ assertTrue("Expected output file " + outputFile.getName(), outputFile.exists());
+ try (DataFileReader<String> reader =
+ new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) {
+ Iterators.addAll(actualElements, reader);
+ }
}
+ assertThat(actualElements, containsInAnyOrder(expectedElements));
}
- // TODO: for Write only, test withSuffix, withNumShards,
+ @Test
+ public void testAvroSinkWrite() throws Exception {
+ String[] expectedElements = new String[] {"first", "second", "third"};
+
+ runTestWrite(expectedElements, 1);
+ }
+
+ @Test
+ public void testAvroSinkShardedWrite() throws Exception {
+ String[] expectedElements = new String[] {"first", "second", "third", "fourth", "fifth"};
+
+ runTestWrite(expectedElements, 4);
+ }
+ // TODO: for Write only, test withSuffix,
// withShardNameTemplate and withoutSharding.
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/65eebc76/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
index 0bd6ce7..6d4d6e7 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
@@ -43,6 +43,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.TestCredential;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -199,36 +200,57 @@ public class TextIOTest {
}
<T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
- File tmpFile = tmpFolder.newFile("file.txt");
- String filename = tmpFile.getPath();
+ runTestWrite(elems, coder, 1);
+ }
+
+ <T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) throws Exception {
+ String filename = tmpFolder.newFile("file.txt").getPath();
Pipeline p = TestPipeline.create();
- PCollection<T> input =
- p.apply(Create.of(Arrays.asList(elems)).withCoder(coder));
+ PCollection<T> input = p.apply(Create.of(Arrays.asList(elems)).withCoder(coder));
TextIO.Write.Bound<T> write;
if (coder.equals(StringUtf8Coder.of())) {
- TextIO.Write.Bound<String> writeStrings =
- TextIO.Write.to(filename).withoutSharding();
+ TextIO.Write.Bound<String> writeStrings = TextIO.Write.to(filename);
// T==String
write = (TextIO.Write.Bound<T>) writeStrings;
} else {
- write = TextIO.Write.to(filename).withCoder(coder).withoutSharding();
+ write = TextIO.Write.to(filename).withCoder(coder);
+ }
+ if (numShards == 1) {
+ write = write.withoutSharding();
+ } else {
+ write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
}
input.apply(write);
p.run();
+ List<File> expectedFiles = new ArrayList<>();
+ if (numShards == 1) {
+ expectedFiles.add(new File(filename));
+ } else {
+ for (int i = 0; i < numShards; i++) {
+ expectedFiles.add(
+ new File(
+ tmpFolder.getRoot(),
+ IOChannelUtils.constructName(
+ "file.txt", ShardNameTemplate.INDEX_OF_MAX, "", i, numShards)));
+ }
+ }
+
List<String> actual = new ArrayList<>();
- try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
- for (;;) {
- String line = reader.readLine();
- if (line == null) {
- break;
+ for (File tmpFile : expectedFiles) {
+ try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
+ for (;;) {
+ String line = reader.readLine();
+ if (line == null) {
+ break;
+ }
+ actual.add(line);
}
- actual.add(line);
}
}
@@ -240,8 +262,7 @@ public class TextIOTest {
expected[i] = line;
}
- assertThat(actual,
- containsInAnyOrder(expected));
+ assertThat(actual, containsInAnyOrder(expected));
}
@Test
@@ -286,6 +307,11 @@ public class TextIOTest {
}
@Test
+ public void testShardedWrite() throws Exception {
+ runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5);
+ }
+
+ @Test
public void testUnsupportedFilePattern() throws IOException {
File outFolder = tmpFolder.newFolder();
// Windows doesn't like resolving paths with * in them.