You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/05 01:14:38 UTC
beam git commit: Revert "This closes #2905"
Repository: beam
Updated Branches:
refs/heads/master 9f27c33ec -> b130d7aac
Revert "This closes #2905"
I misread Jenkins and should not have merged. I am sorry.
This reverts commit 9f27c33ec7e7c61afbca0395f932275b354eb428, reversing
changes made to 5fc3d335919207c23bc6fd2047e9e38351754ff1.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b130d7aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b130d7aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b130d7aa
Branch: refs/heads/master
Commit: b130d7aac466860ed5a7abec0bef33d9e0dd3c6d
Parents: 9f27c33
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 4 18:14:13 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 18:14:13 2017 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 12 -
.../beam/examples/WindowedWordCountIT.java | 8 +-
.../org/apache/beam/examples/WordCountIT.java | 2 +-
.../examples/testing/ExplicitShardedFile.java | 126 -----------
.../examples/testing/FileChecksumMatcher.java | 168 --------------
.../testing/FileChecksumMatcherTest.java | 148 ------------
.../examples/testing/NumberedShardedFile.java | 226 -------------------
.../testing/NumberedShardedFileTest.java | 182 ---------------
.../beam/examples/testing/ShardedFile.java | 42 ----
.../beam/sdk/coders/StructuralByteArray.java | 5 +-
.../beam/sdk/testing/FileChecksumMatcher.java | 169 ++++++++++++++
.../beam/sdk/testing/MatcherDeserializer.java | 4 +-
.../beam/sdk/testing/MatcherSerializer.java | 4 +-
.../org/apache/beam/sdk/util/CoderUtils.java | 8 +-
.../beam/sdk/util/ExplicitShardedFile.java | 125 ++++++++++
.../beam/sdk/util/NumberedShardedFile.java | 225 ++++++++++++++++++
.../org/apache/beam/sdk/util/ShardedFile.java | 42 ++++
.../sdk/testing/FileChecksumMatcherTest.java | 147 ++++++++++++
.../beam/sdk/util/NumberedShardedFileTest.java | 182 +++++++++++++++
19 files changed, 905 insertions(+), 920 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 09473cd..d673da2 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -492,11 +492,6 @@
</dependency>
<dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
@@ -559,13 +554,6 @@
-->
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 01bc402..b5eddb5 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -33,21 +33,21 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
-import org.apache.beam.examples.testing.ExplicitShardedFile;
-import org.apache.beam.examples.testing.FileChecksumMatcher;
-import org.apache.beam.examples.testing.NumberedShardedFile;
-import org.apache.beam.examples.testing.ShardedFile;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.StreamingIT;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.ExplicitShardedFile;
import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.apache.beam.sdk.util.ShardedFile;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Duration;
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index 236ca9c..1660b61 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -20,10 +20,10 @@ package org.apache.beam.examples;
import java.util.Date;
import org.apache.beam.examples.WordCount.WordCountOptions;
-import org.apache.beam.examples.testing.FileChecksumMatcher;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/ExplicitShardedFile.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/ExplicitShardedFile.java b/examples/java/src/test/java/org/apache/beam/examples/testing/ExplicitShardedFile.java
deleted file mode 100644
index 1dc7a62..0000000
--- a/examples/java/src/test/java/org/apache/beam/examples/testing/ExplicitShardedFile.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.examples.testing;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.io.CharStreams;
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.channels.Channels;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** A sharded file where the file names are simply provided. */
-public class ExplicitShardedFile implements ShardedFile {
-
- private static final Logger LOG = LoggerFactory.getLogger(ExplicitShardedFile.class);
-
- private static final int MAX_READ_RETRIES = 4;
- private static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
- static final FluentBackoff BACK_OFF_FACTORY =
- FluentBackoff.DEFAULT
- .withInitialBackoff(DEFAULT_SLEEP_DURATION)
- .withMaxRetries(MAX_READ_RETRIES);
-
- private final List<Metadata> files;
-
- /** Constructs an {@link ExplicitShardedFile} for the given files. */
- public ExplicitShardedFile(Collection<String> files) throws IOException {
- this.files = new LinkedList<>();
- for (String file: files) {
- this.files.add(FileSystems.matchSingleFileSpec(file));
- }
- }
-
- @Override
- public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
- throws IOException, InterruptedException {
- if (files.isEmpty()) {
- return Collections.emptyList();
- }
-
- IOException lastException = null;
-
- do {
- try {
- // Read data from file paths
- return readLines(files);
- } catch (IOException e) {
- // Ignore and retry
- lastException = e;
- LOG.warn("Error in file reading. Ignore and retry.");
- }
- } while (BackOffUtils.next(sleeper, backOff));
- // Failed after max retries
- throw new IOException(
- String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
- lastException);
- }
-
- /**
- * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}.
- *
- * <p>Because of eventual consistency, reads may discover no files or fewer files than the shard
- * template implies. In this case, the read is considered to have failed.
- */
- public List<String> readFilesWithRetries() throws IOException, InterruptedException {
- return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
- }
-
- @Override
- public String toString() {
- return String.format("explicit sharded file (%s)", Joiner.on(", ").join(files));
- }
-
- /**
- * Reads all the lines of all the files.
- *
- * <p>Not suitable for use except in testing of small data, since the data size may be far more
- * than can be reasonably processed serially, in-memory, by a single thread.
- */
- @VisibleForTesting
- List<String> readLines(Collection<Metadata> files) throws IOException {
- List<String> allLines = Lists.newArrayList();
- int i = 1;
- for (Metadata file : files) {
- try (Reader reader = Channels.newReader(FileSystems.open(file.resourceId()),
- StandardCharsets.UTF_8.name())) {
- List<String> lines = CharStreams.readLines(reader);
- allLines.addAll(lines);
- LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
- }
- i++;
- }
- return allLines;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcher.java b/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcher.java
deleted file mode 100644
index 8a0af11..0000000
--- a/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcher.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.examples.testing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.client.util.Sleeper;
-import com.google.common.base.Strings;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-import javax.annotation.Nonnull;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testing.SerializableMatcher;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Matcher to verify file checksum in E2E test.
- *
- * <p>For example:
- * <pre>{@code
- * assertThat(job, new FileChecksumMatcher(checksumString, filePath));
- * }</pre>
- * or
- * <pre>{@code
- * assertThat(job, new FileChecksumMatcher(checksumString, filePath, shardTemplate));
- * }</pre>
- *
- * <p>Checksum of outputs is generated based on SHA-1 algorithm. If output file is empty,
- * SHA-1 hash of empty string (da39a3ee5e6b4b0d3255bfef95601890afd80709) is used as expected.
- */
-public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
- implements SerializableMatcher<PipelineResult> {
-
- private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
-
- static final int MAX_READ_RETRIES = 4;
- static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
- static final FluentBackoff BACK_OFF_FACTORY =
- FluentBackoff.DEFAULT
- .withInitialBackoff(DEFAULT_SLEEP_DURATION)
- .withMaxRetries(MAX_READ_RETRIES);
-
- private static final Pattern DEFAULT_SHARD_TEMPLATE =
- Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)");
-
- private final String expectedChecksum;
- private String actualChecksum;
- private final ShardedFile shardedFile;
-
- /**
- * Constructor that uses default shard template.
- *
- * @param checksum expected checksum string used to verify file content.
- * @param filePath path of files that's to be verified.
- */
- public FileChecksumMatcher(String checksum, String filePath) {
- this(checksum, filePath, DEFAULT_SHARD_TEMPLATE);
- }
-
- /**
- * Constructor using a custom shard template.
- *
- * @param checksum expected checksum string used to verify file content.
- * @param filePath path of files that's to be verified.
- * @param shardTemplate template of shard name to parse out the total number of shards
- * which is used in I/O retry to avoid inconsistency of filesystem.
- * Customized template should assign name "numshards" to capturing
- * group - total shard number.
- */
- public FileChecksumMatcher(String checksum, String filePath, Pattern shardTemplate) {
- checkArgument(
- !Strings.isNullOrEmpty(checksum),
- "Expected valid checksum, but received %s", checksum);
- checkArgument(
- !Strings.isNullOrEmpty(filePath),
- "Expected valid file path, but received %s", filePath);
- checkNotNull(
- shardTemplate,
- "Expected non-null shard pattern. "
- + "Please call the other constructor to use default pattern: %s",
- DEFAULT_SHARD_TEMPLATE);
-
- this.expectedChecksum = checksum;
- this.shardedFile = new NumberedShardedFile(filePath, shardTemplate);
- }
-
- /**
- * Constructor using an entirely custom {@link ShardedFile} implementation.
- *
- * <p>For internal use only.
- */
- public FileChecksumMatcher(String expectedChecksum, ShardedFile shardedFile) {
- this.expectedChecksum = expectedChecksum;
- this.shardedFile = shardedFile;
- }
-
- @Override
- public boolean matchesSafely(PipelineResult pipelineResult) {
- // Load output data
- List<String> outputs;
- try {
- outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Failed to read from: %s", shardedFile), e);
- }
-
- // Verify outputs. Checksum is computed using SHA-1 algorithm
- actualChecksum = computeHash(outputs);
- LOG.debug("Generated checksum: {}", actualChecksum);
-
- return actualChecksum.equals(expectedChecksum);
- }
-
- private String computeHash(@Nonnull List<String> strs) {
- if (strs.isEmpty()) {
- return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
- }
-
- List<HashCode> hashCodes = new ArrayList<>();
- for (String str : strs) {
- hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
- }
- return Hashing.combineUnordered(hashCodes).toString();
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("Expected checksum is (")
- .appendText(expectedChecksum)
- .appendText(")");
- }
-
- @Override
- public void describeMismatchSafely(PipelineResult pResult, Description description) {
- description
- .appendText("was (")
- .appendText(actualChecksum)
- .appendText(")");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcherTest.java b/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcherTest.java
deleted file mode 100644
index 4d6eb6b..0000000
--- a/examples/java/src/test/java/org/apache/beam/examples/testing/FileChecksumMatcherTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-
-import com.google.api.client.util.BackOff;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.regex.Pattern;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-
-/** Tests for {@link FileChecksumMatcher}. */
-@RunWith(JUnit4.class)
-public class FileChecksumMatcherTest {
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
- @Rule
- public ExpectedException thrown = ExpectedException.none();
- @Rule
- public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
-
- @Mock
- private PipelineResult pResult = Mockito.mock(PipelineResult.class);
-
- private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff();
-
- @Test
- public void testPreconditionChecksumIsNull() throws IOException {
- String tmpPath = tmpFolder.newFile().getPath();
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(containsString("Expected valid checksum, but received"));
- new FileChecksumMatcher(null, tmpPath);
- }
-
- @Test
- public void testPreconditionChecksumIsEmpty() throws IOException {
- String tmpPath = tmpFolder.newFile().getPath();
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(containsString("Expected valid checksum, but received"));
- new FileChecksumMatcher("", tmpPath);
- }
-
- @Test
- public void testPreconditionFilePathIsEmpty() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(containsString("Expected valid file path, but received"));
- new FileChecksumMatcher("checksumString", "");
- }
-
- @Test
- public void testPreconditionShardTemplateIsNull() throws IOException {
- String tmpPath = tmpFolder.newFile().getPath();
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage(
- containsString(
- "Expected non-null shard pattern. "
- + "Please call the other constructor to use default pattern:"));
- new FileChecksumMatcher("checksumString", tmpPath, null);
- }
-
- @Test
- public void testMatcherThatVerifiesSingleFile() throws IOException{
- File tmpFile = tmpFolder.newFile("result-000-of-001");
- Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
- FileChecksumMatcher matcher =
- new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", tmpFile.getPath());
-
- assertThat(pResult, matcher);
- }
-
- @Test
- public void testMatcherThatVerifiesMultipleFiles() throws IOException {
- File tmpFile1 = tmpFolder.newFile("result-000-of-002");
- File tmpFile2 = tmpFolder.newFile("result-001-of-002");
- File tmpFile3 = tmpFolder.newFile("tmp");
- Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
- Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
- Files.write("tmp", tmpFile3, StandardCharsets.UTF_8);
-
- FileChecksumMatcher matcher =
- new FileChecksumMatcher(
- "90552392c28396935fe4f123bd0b5c2d0f6260c8",
- tmpFolder.getRoot().toPath().resolve("result-*").toString());
-
- assertThat(pResult, matcher);
- }
-
- @Test
- public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException {
- File emptyFile = tmpFolder.newFile("result-000-of-001");
- Files.write("", emptyFile, StandardCharsets.UTF_8);
- FileChecksumMatcher matcher =
- new FileChecksumMatcher(
- "da39a3ee5e6b4b0d3255bfef95601890afd80709",
- tmpFolder.getRoot().toPath().resolve("*").toString());
-
- assertThat(pResult, matcher);
- }
-
- @Test
- public void testMatcherThatUsesCustomizedTemplate() throws Exception {
- // Customized template: resultSSS-totalNNN
- File tmpFile1 = tmpFolder.newFile("result0-total2");
- File tmpFile2 = tmpFolder.newFile("result1-total2");
- Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
- Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
-
- Pattern customizedTemplate =
- Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
- FileChecksumMatcher matcher = new FileChecksumMatcher(
- "90552392c28396935fe4f123bd0b5c2d0f6260c8",
- tmpFolder.getRoot().toPath().resolve("*").toString(),
- customizedTemplate);
-
- assertThat(pResult, matcher);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFile.java b/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFile.java
deleted file mode 100644
index f0b9c2d..0000000
--- a/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFile.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.examples.testing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-import com.google.common.io.CharStreams;
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.channels.Channels;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.Nonnull;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility methods for working with sharded files. For internal use only; many parameters
- * are just hardcoded to allow existing uses to work OK.
- */
-public class NumberedShardedFile implements ShardedFile {
-
- private static final Logger LOG = LoggerFactory.getLogger(NumberedShardedFile.class);
-
- static final int MAX_READ_RETRIES = 4;
- static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
- static final FluentBackoff BACK_OFF_FACTORY =
- FluentBackoff.DEFAULT
- .withInitialBackoff(DEFAULT_SLEEP_DURATION)
- .withMaxRetries(MAX_READ_RETRIES);
-
- private static final Pattern DEFAULT_SHARD_TEMPLATE =
- Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)");
-
- private final String filePattern;
- private final Pattern shardTemplate;
-
- /**
- * Constructor that uses default shard template.
- *
- * @param filePattern path or glob of files to include
- */
- public NumberedShardedFile(String filePattern) {
- this(filePattern, DEFAULT_SHARD_TEMPLATE);
- }
-
- /**
- * Constructor.
- *
- * @param filePattern path or glob of files to include
- * @param shardTemplate template of shard name to parse out the total number of shards
- * which is used in I/O retry to avoid inconsistency of filesystem.
- * Customized template should assign name "numshards" to capturing
- * group - total shard number.
- */
- public NumberedShardedFile(String filePattern, Pattern shardTemplate) {
- checkArgument(
- !Strings.isNullOrEmpty(filePattern),
- "Expected valid file path, but received %s", filePattern);
- checkNotNull(
- shardTemplate,
- "Expected non-null shard pattern. "
- + "Please call the other constructor to use default pattern: %s",
- DEFAULT_SHARD_TEMPLATE);
-
- this.filePattern = filePattern;
- this.shardTemplate = shardTemplate;
- }
-
- public String getFilePattern() {
- return filePattern;
- }
-
- /**
- * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}.
- *
- * <p>Because of eventual consistency, reads may discover no files or fewer files than
- * the shard template implies. In this case, the read is considered to have failed.
- */
- @Override
- public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
- throws IOException, InterruptedException {
- IOException lastException = null;
-
- do {
- try {
- // Match inputPath which may contains glob
- Collection<Metadata> files = Iterables.getOnlyElement(
- FileSystems.match(Collections.singletonList(filePattern))).metadata();
-
- LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern);
-
- if (files.isEmpty() || !checkTotalNumOfFiles(files)) {
- continue;
- }
-
- // Read data from file paths
- return readLines(files);
- } catch (IOException e) {
- // Ignore and retry
- lastException = e;
- LOG.warn("Error in file reading. Ignore and retry.");
- }
- } while(BackOffUtils.next(sleeper, backOff));
- // Failed after max retries
- throw new IOException(
- String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
- lastException);
- }
-
- /**
- * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}.
- *
- * <p>Because of eventual consistency, reads may discover no files or fewer files than
- * the shard template implies. In this case, the read is considered to have failed.
- */
- public List<String> readFilesWithRetries()
- throws IOException, InterruptedException {
- return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
- }
-
- @Override
- public String toString() {
- return String.format("%s with shard template '%s'", filePattern, shardTemplate);
- }
-
- /**
- * Reads all the lines of all the files.
- *
- * <p>Not suitable for use except in testing of small data, since the data size may be far more
- * than can be reasonably processed serially, in-memory, by a single thread.
- */
- @VisibleForTesting
- List<String> readLines(Collection<Metadata> files) throws IOException {
- List<String> allLines = Lists.newArrayList();
- int i = 1;
- for (Metadata file : files) {
- try (Reader reader =
- Channels.newReader(FileSystems.open(file.resourceId()),
- StandardCharsets.UTF_8.name())) {
- List<String> lines = CharStreams.readLines(reader);
- allLines.addAll(lines);
- LOG.debug(
- "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
- }
- i++;
- }
- return allLines;
- }
-
- /**
- * Check if total number of files is correct by comparing with the number that
- * is parsed from shard name using a name template. If no template is specified,
- * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total
- * number of files.
- *
- * @return {@code true} if at least one shard name matches template and total number
- * of given files equals the number that is parsed from shard name.
- */
- @VisibleForTesting
- boolean checkTotalNumOfFiles(Collection<Metadata> files) {
- for (Metadata fileMedadata : files) {
- String fileName = fileMedadata.resourceId().getFilename();
-
- if (fileName == null) {
- // this path has zero elements
- continue;
- }
- Matcher matcher = shardTemplate.matcher(fileName);
- if (!matcher.matches()) {
- // shard name doesn't match the pattern, check with the next shard
- continue;
- }
- // once match, extract total number of shards and compare to file list
- return files.size() == Integer.parseInt(matcher.group("numshards"));
- }
- return false;
- }
-
- private String computeHash(@Nonnull List<String> strs) {
- if (strs.isEmpty()) {
- return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
- }
-
- List<HashCode> hashCodes = new ArrayList<>();
- for (String str : strs) {
- hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
- }
- return Hashing.combineUnordered(hashCodes).toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFileTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFileTest.java b/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFileTest.java
deleted file mode 100644
index 83b8a4f..0000000
--- a/examples/java/src/test/java/org/apache/beam/examples/testing/NumberedShardedFileTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.empty;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-
-import com.google.api.client.util.BackOff;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.regex.Pattern;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.LocalResources;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-
-/** Tests for {@link NumberedShardedFile}. */
-@RunWith(JUnit4.class)
-public class NumberedShardedFileTest {
- @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
- @Rule public ExpectedException thrown = ExpectedException.none();
- @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
-
- @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class);
-
- private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff();
- private String filePattern;
-
- @Before
- public void setup() throws IOException {
- filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve(
- "*", StandardResolveOptions.RESOLVE_FILE).toString();
- }
-
- @Test
- public void testPreconditionFilePathIsNull() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(containsString("Expected valid file path, but received"));
- new NumberedShardedFile(null);
- }
-
- @Test
- public void testPreconditionFilePathIsEmpty() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(containsString("Expected valid file path, but received"));
- new NumberedShardedFile("");
- }
-
- @Test
- public void testReadMultipleShards() throws Exception {
- String
- contents1 = "To be or not to be, ",
- contents2 = "it is not a question.",
- contents3 = "should not be included";
-
- File tmpFile1 = tmpFolder.newFile("result-000-of-002");
- File tmpFile2 = tmpFolder.newFile("result-001-of-002");
- File tmpFile3 = tmpFolder.newFile("tmp");
- Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
- Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
- Files.write(contents3, tmpFile3, StandardCharsets.UTF_8);
-
- filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve(
- "result-*", StandardResolveOptions.RESOLVE_FILE).toString();
- NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);
-
- assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2));
- }
-
- @Test
- public void testReadEmpty() throws Exception {
- File emptyFile = tmpFolder.newFile("result-000-of-001");
- Files.write("", emptyFile, StandardCharsets.UTF_8);
- NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);
-
- assertThat(shardedFile.readFilesWithRetries(), empty());
- }
-
- @Test
- public void testReadCustomTemplate() throws Exception {
- String contents1 = "To be or not to be, ", contents2 = "it is not a question.";
-
- // Customized template: resultSSS-totalNNN
- File tmpFile1 = tmpFolder.newFile("result0-total2");
- File tmpFile2 = tmpFolder.newFile("result1-total2");
- Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
- Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
-
- Pattern customizedTemplate =
- Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
- NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern, customizedTemplate);
-
- assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2));
- }
-
- @Test
- public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
- File tmpFile = tmpFolder.newFile();
- Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
-
- NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern,
- Pattern.compile("incorrect-template"));
-
- thrown.expect(IOException.class);
- thrown.expectMessage(
- containsString(
- "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
- shardedFile.readFilesWithRetries(fastClock, backOff);
- }
-
- @Test
- public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
- File tmpFile = tmpFolder.newFile();
- Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
- NumberedShardedFile shardedFile = spy(new NumberedShardedFile(filePattern));
- doThrow(IOException.class)
- .when(shardedFile)
- .readLines(anyCollection());
-
- thrown.expect(IOException.class);
- thrown.expectMessage(
- containsString(
- "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
- shardedFile.readFilesWithRetries(fastClock, backOff);
- }
-
- @Test
- public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
- NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);
-
- thrown.expect(IOException.class);
- thrown.expectMessage(
- containsString(
- "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
- shardedFile.readFilesWithRetries(fastClock, backOff);
- }
-
- @Test
- public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception {
- tmpFolder.newFile("result-000-of-001");
- tmpFolder.newFile("tmp-result-000-of-001");
-
- NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);
-
- thrown.expect(IOException.class);
- thrown.expectMessage(
- containsString(
- "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
- shardedFile.readFilesWithRetries(fastClock, backOff);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/examples/java/src/test/java/org/apache/beam/examples/testing/ShardedFile.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/testing/ShardedFile.java b/examples/java/src/test/java/org/apache/beam/examples/testing/ShardedFile.java
deleted file mode 100644
index cd9537c..0000000
--- a/examples/java/src/test/java/org/apache/beam/examples/testing/ShardedFile.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.examples.testing;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.testing.SerializableMatcher;
-
-/**
- * Bare-bones class for using sharded files.
- *
- * <p>For internal use only; used only in SDK tests. Must be {@link Serializable} so it can be
- * shipped as a {@link SerializableMatcher}.
- */
-public interface ShardedFile extends Serializable {
-
- /**
- * Reads the lines from all shards of this file using the provided {@link Sleeper} and {@link
- * BackOff}.
- */
- List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
- throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java
index 0ab0dea..226f79c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuralByteArray.java
@@ -17,7 +17,8 @@
*/
package org.apache.beam.sdk.coders;
-import com.google.common.io.BaseEncoding;
+import static com.google.api.client.util.Base64.encodeBase64String;
+
import java.util.Arrays;
/**
@@ -52,6 +53,6 @@ public class StructuralByteArray {
@Override
public String toString() {
- return "base64:" + BaseEncoding.base64().encode(value);
+ return "base64:" + encodeBase64String(value);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
new file mode 100644
index 0000000..82a6b71
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.Sleeper;
+import com.google.common.base.Strings;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.apache.beam.sdk.util.ShardedFile;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Matcher to verify file checksum in E2E test.
+ *
+ * <p>For example:
+ * <pre>{@code
+ * assertThat(job, new FileChecksumMatcher(checksumString, filePath));
+ * }</pre>
+ * or
+ * <pre>{@code
+ * assertThat(job, new FileChecksumMatcher(checksumString, filePath, shardTemplate));
+ * }</pre>
+ *
+ * <p>Checksum of outputs is generated based on SHA-1 algorithm. If output file is empty,
+ * SHA-1 hash of empty string (da39a3ee5e6b4b0d3255bfef95601890afd80709) is used as expected.
+ */
+public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult>
+ implements SerializableMatcher<PipelineResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
+
+ static final int MAX_READ_RETRIES = 4;
+ static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+ static final FluentBackoff BACK_OFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+ .withMaxRetries(MAX_READ_RETRIES);
+
+ private static final Pattern DEFAULT_SHARD_TEMPLATE =
+ Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)");
+
+ private final String expectedChecksum;
+ private String actualChecksum;
+ private final ShardedFile shardedFile;
+
+ /**
+ * Constructor that uses default shard template.
+ *
+ * @param checksum expected checksum string used to verify file content.
+ * @param filePath path of files that's to be verified.
+ */
+ public FileChecksumMatcher(String checksum, String filePath) {
+ this(checksum, filePath, DEFAULT_SHARD_TEMPLATE);
+ }
+
+ /**
+ * Constructor using a custom shard template.
+ *
+ * @param checksum expected checksum string used to verify file content.
+ * @param filePath path of files that's to be verified.
+ * @param shardTemplate template of shard name to parse out the total number of shards
+ * which is used in I/O retry to avoid inconsistency of filesystem.
+ * Customized template should assign name "numshards" to capturing
+ * group - total shard number.
+ */
+ public FileChecksumMatcher(String checksum, String filePath, Pattern shardTemplate) {
+ checkArgument(
+ !Strings.isNullOrEmpty(checksum),
+ "Expected valid checksum, but received %s", checksum);
+ checkArgument(
+ !Strings.isNullOrEmpty(filePath),
+ "Expected valid file path, but received %s", filePath);
+ checkNotNull(
+ shardTemplate,
+ "Expected non-null shard pattern. "
+ + "Please call the other constructor to use default pattern: %s",
+ DEFAULT_SHARD_TEMPLATE);
+
+ this.expectedChecksum = checksum;
+ this.shardedFile = new NumberedShardedFile(filePath, shardTemplate);
+ }
+
+ /**
+ * Constructor using an entirely custom {@link ShardedFile} implementation.
+ *
+ * <p>For internal use only.
+ */
+ public FileChecksumMatcher(String expectedChecksum, ShardedFile shardedFile) {
+ this.expectedChecksum = expectedChecksum;
+ this.shardedFile = shardedFile;
+ }
+
+ @Override
+ public boolean matchesSafely(PipelineResult pipelineResult) {
+ // Load output data
+ List<String> outputs;
+ try {
+ outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to read from: %s", shardedFile), e);
+ }
+
+ // Verify outputs. Checksum is computed using SHA-1 algorithm
+ actualChecksum = computeHash(outputs);
+ LOG.debug("Generated checksum: {}", actualChecksum);
+
+ return actualChecksum.equals(expectedChecksum);
+ }
+
+ private String computeHash(@Nonnull List<String> strs) {
+ if (strs.isEmpty()) {
+ return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
+ }
+
+ List<HashCode> hashCodes = new ArrayList<>();
+ for (String str : strs) {
+ hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
+ }
+ return Hashing.combineUnordered(hashCodes).toString();
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description
+ .appendText("Expected checksum is (")
+ .appendText(expectedChecksum)
+ .appendText(")");
+ }
+
+ @Override
+ public void describeMismatchSafely(PipelineResult pResult, Description description) {
+ description
+ .appendText("was (")
+ .appendText(actualChecksum)
+ .appendText(")");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
index e7aa5a7..6ca07ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.io.BaseEncoding;
+import com.google.api.client.util.Base64;
import java.io.IOException;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -36,7 +36,7 @@ class MatcherDeserializer extends JsonDeserializer<SerializableMatcher<?>> {
throws IOException, JsonProcessingException {
ObjectNode node = jsonParser.readValueAsTree();
String matcher = node.get("matcher").asText();
- byte[] in = BaseEncoding.base64().decode(matcher);
+ byte[] in = Base64.decodeBase64(matcher);
return (SerializableMatcher<?>) SerializableUtils
.deserializeFromByteArray(in, "SerializableMatcher");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
index 35375f6..2b4584c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
-import com.google.common.io.BaseEncoding;
+import com.google.api.client.util.Base64;
import java.io.IOException;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -33,7 +33,7 @@ class MatcherSerializer extends JsonSerializer<SerializableMatcher<?>> {
public void serialize(SerializableMatcher<?> matcher, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
byte[] out = SerializableUtils.serializeToByteArray(matcher);
- String encodedString = BaseEncoding.base64().encode(out);
+ String encodedString = Base64.encodeBase64String(out);
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("matcher", encodedString);
jsonGenerator.writeEndObject();
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index e3ae485..3380a10 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import com.google.api.client.util.Base64;
import com.google.common.base.Throwables;
-import com.google.common.io.BaseEncoding;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -163,7 +163,7 @@ public final class CoderUtils {
public static <T> String encodeToBase64(Coder<T> coder, T value)
throws CoderException {
byte[] rawValue = encodeToByteArray(coder, value);
- return BaseEncoding.base64Url().omitPadding().encode(rawValue);
+ return Base64.encodeBase64URLSafeString(rawValue);
}
/**
@@ -171,9 +171,7 @@ public final class CoderUtils {
*/
public static <T> T decodeFromBase64(Coder<T> coder, String encodedValue) throws CoderException {
return decodeFromSafeStream(
- coder,
- new ByteArrayInputStream(BaseEncoding.base64Url().omitPadding().decode(encodedValue)),
- Coder.Context.OUTER);
+ coder, new ByteArrayInputStream(Base64.decodeBase64(encodedValue)), Coder.Context.OUTER);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
new file mode 100644
index 0000000..0f184de
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.io.CharStreams;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A sharded file where the file names are simply provided. */
+public class ExplicitShardedFile implements ShardedFile {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExplicitShardedFile.class);
+
+ private static final int MAX_READ_RETRIES = 4;
+ private static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+ static final FluentBackoff BACK_OFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+ .withMaxRetries(MAX_READ_RETRIES);
+
+ private final List<Metadata> files;
+
+ /** Constructs an {@link ExplicitShardedFile} for the given files. */
+ public ExplicitShardedFile(Collection<String> files) throws IOException {
+ this.files = new LinkedList<>();
+ for (String file: files) {
+ this.files.add(FileSystems.matchSingleFileSpec(file));
+ }
+ }
+
+ @Override
+ public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+ throws IOException, InterruptedException {
+ if (files.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ IOException lastException = null;
+
+ do {
+ try {
+ // Read data from file paths
+ return readLines(files);
+ } catch (IOException e) {
+ // Ignore and retry
+ lastException = e;
+ LOG.warn("Error in file reading. Ignore and retry.");
+ }
+ } while (BackOffUtils.next(sleeper, backOff));
+ // Failed after max retries
+ throw new IOException(
+ String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
+ lastException);
+ }
+
+ /**
+ * Reads the lines of all files using the default {@link Sleeper} and {@link BackOff}.
+ *
+ * <p>A read that fails with an {@link IOException} is retried. If every attempt fails, an
+ * {@link IOException} wrapping the last failure is thrown.
+ */
+ public List<String> readFilesWithRetries() throws IOException, InterruptedException {
+ return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("explicit sharded file (%s)", Joiner.on(", ").join(files));
+ }
+
+ /**
+ * Reads all the lines of all the files.
+ *
+ * <p>Not suitable for use except in testing of small data, since the data size may be far more
+ * than can be reasonably processed serially, in-memory, by a single thread.
+ */
+ @VisibleForTesting
+ List<String> readLines(Collection<Metadata> files) throws IOException {
+ List<String> allLines = Lists.newArrayList();
+ int i = 1;
+ for (Metadata file : files) {
+ try (Reader reader = Channels.newReader(FileSystems.open(file.resourceId()),
+ StandardCharsets.UTF_8.name())) {
+ List<String> lines = CharStreams.readLines(reader);
+ allLines.addAll(lines);
+ LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
+ }
+ i++;
+ }
+ return allLines;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
new file mode 100644
index 0000000..e18dd96
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.common.io.CharStreams;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for working with sharded files. For internal use only; many parameters
+ * are just hardcoded to allow existing uses to work OK.
+ */
+public class NumberedShardedFile implements ShardedFile {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NumberedShardedFile.class);
+
+ static final int MAX_READ_RETRIES = 4;
+ static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
+ static final FluentBackoff BACK_OFF_FACTORY =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+ .withMaxRetries(MAX_READ_RETRIES);
+
+ private static final Pattern DEFAULT_SHARD_TEMPLATE =
+ Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)");
+
+ private final String filePattern;
+ private final Pattern shardTemplate;
+
+ /**
+ * Constructor that uses default shard template.
+ *
+ * @param filePattern path or glob of files to include
+ */
+ public NumberedShardedFile(String filePattern) {
+ this(filePattern, DEFAULT_SHARD_TEMPLATE);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param filePattern path or glob of files to include
+ * @param shardTemplate template of shard name to parse out the total number of shards
+ * which is used in I/O retry to avoid inconsistency of filesystem.
+ * Customized template should assign name "numshards" to capturing
+ * group - total shard number.
+ */
+ public NumberedShardedFile(String filePattern, Pattern shardTemplate) {
+ checkArgument(
+ !Strings.isNullOrEmpty(filePattern),
+ "Expected valid file path, but received %s", filePattern);
+ checkNotNull(
+ shardTemplate,
+ "Expected non-null shard pattern. "
+ + "Please call the other constructor to use default pattern: %s",
+ DEFAULT_SHARD_TEMPLATE);
+
+ this.filePattern = filePattern;
+ this.shardTemplate = shardTemplate;
+ }
+
+ public String getFilePattern() {
+ return filePattern;
+ }
+
+ /**
+ * Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}.
+ *
+ * <p>Because of eventual consistency, reads may discover no files or fewer files than
+ * the shard template implies. In this case, the read is considered to have failed.
+ */
+ @Override
+ public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+ throws IOException, InterruptedException {
+ IOException lastException = null;
+
+ do {
+ try {
+ // Match filePattern, which may contain a glob
+ Collection<Metadata> files = Iterables.getOnlyElement(
+ FileSystems.match(Collections.singletonList(filePattern))).metadata();
+
+ LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern);
+
+ if (files.isEmpty() || !checkTotalNumOfFiles(files)) {
+ continue;
+ }
+
+ // Read data from file paths
+ return readLines(files);
+ } catch (IOException e) {
+ // Ignore and retry
+ lastException = e;
+ LOG.warn("Error in file reading. Ignore and retry.");
+ }
+ } while(BackOffUtils.next(sleeper, backOff));
+ // Failed after max retries
+ throw new IOException(
+ String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
+ lastException);
+ }
+
+ /**
+ * Discovers all shards of this file using the default {@link Sleeper} and {@link BackOff}.
+ *
+ * <p>Because of eventual consistency, reads may discover no files or fewer files than
+ * the shard template implies. In this case, the read is considered to have failed.
+ */
+ public List<String> readFilesWithRetries()
+ throws IOException, InterruptedException {
+ return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s with shard template '%s'", filePattern, shardTemplate);
+ }
+
+ /**
+ * Reads all the lines of all the files.
+ *
+ * <p>Not suitable for use except in testing of small data, since the data size may be far more
+ * than can be reasonably processed serially, in-memory, by a single thread.
+ */
+ @VisibleForTesting
+ List<String> readLines(Collection<Metadata> files) throws IOException {
+ List<String> allLines = Lists.newArrayList();
+ int i = 1;
+ for (Metadata file : files) {
+ try (Reader reader =
+ Channels.newReader(FileSystems.open(file.resourceId()),
+ StandardCharsets.UTF_8.name())) {
+ List<String> lines = CharStreams.readLines(reader);
+ allLines.addAll(lines);
+ LOG.debug(
+ "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
+ }
+ i++;
+ }
+ return allLines;
+ }
+
+ /**
+ * Check if total number of files is correct by comparing with the number that
+ * is parsed from shard name using a name template. If no template is specified,
+ * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total
+ * number of files.
+ *
+ * @return {@code true} if at least one shard name matches template and total number
+ * of given files equals the number that is parsed from shard name.
+ */
+ @VisibleForTesting
+ boolean checkTotalNumOfFiles(Collection<Metadata> files) {
+ for (Metadata fileMedadata : files) {
+ String fileName = fileMedadata.resourceId().getFilename();
+
+ if (fileName == null) {
+ // this path has zero elements
+ continue;
+ }
+ Matcher matcher = shardTemplate.matcher(fileName);
+ if (!matcher.matches()) {
+ // shard name doesn't match the pattern, check with the next shard
+ continue;
+ }
+ // once match, extract total number of shards and compare to file list
+ return files.size() == Integer.parseInt(matcher.group("numshards"));
+ }
+ return false;
+ }
+
+ private String computeHash(@Nonnull List<String> strs) {
+ if (strs.isEmpty()) {
+ return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
+ }
+
+ List<HashCode> hashCodes = new ArrayList<>();
+ for (String str : strs) {
+ hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
+ }
+ return Hashing.combineUnordered(hashCodes).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
new file mode 100644
index 0000000..ec9ed64
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+
+/**
+ * Bare-bones interface for using sharded files, i.e. reading the lines of all shards of a file.
+ *
+ * <p>For internal use only; used only in SDK tests. Must be {@link Serializable} so it can be
+ * shipped as a {@link SerializableMatcher}.
+ */
+public interface ShardedFile extends Serializable {
+
+ /**
+ * Reads the lines from all shards of this file using the provided {@link Sleeper} and {@link
+ * BackOff}.
+ *
+ * @param sleeper the {@link Sleeper} to use while reading; presumably used to wait between
+ * retry attempts (implementation-defined, confirm against implementations)
+ * @param backOff the {@link BackOff} to use while reading; presumably governs the delay
+ * between retry attempts and when to give up (implementation-defined)
+ * @return the lines read from all shards of this file
+ * @throws IOException declared for implementations; presumably raised when the shards cannot
+ * be read
+ * @throws InterruptedException declared for implementations; presumably raised when the
+ * calling thread is interrupted while waiting
+ */
+ List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+ throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
new file mode 100644
index 0000000..4ee6750
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+import com.google.api.client.util.BackOff;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.PipelineResult;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+/** Unit tests covering {@link FileChecksumMatcher}. */
+@RunWith(JUnit4.class)
+public class FileChecksumMatcherTest {
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+  @Rule
+  public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+
+  @Mock
+  private PipelineResult pResult = Mockito.mock(PipelineResult.class);
+
+  private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff();
+
+  /** Creates {@code name} in the temporary folder and writes {@code contents} to it as UTF-8. */
+  private File writeTmpFile(String name, String contents) throws IOException {
+    File created = tmpFolder.newFile(name);
+    Files.write(contents, created, StandardCharsets.UTF_8);
+    return created;
+  }
+
+  /** Resolves {@code namePattern} against the temporary folder and returns it as a string. */
+  private String patternInTmpFolder(String namePattern) {
+    return tmpFolder.getRoot().toPath().resolve(namePattern).toString();
+  }
+
+  @Test
+  public void testPreconditionChecksumIsNull() throws IOException {
+    String existingPath = tmpFolder.newFile().getPath();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid checksum, but received"));
+    new FileChecksumMatcher(null, existingPath);
+  }
+
+  @Test
+  public void testPreconditionChecksumIsEmpty() throws IOException {
+    String existingPath = tmpFolder.newFile().getPath();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid checksum, but received"));
+    new FileChecksumMatcher("", existingPath);
+  }
+
+  @Test
+  public void testPreconditionFilePathIsEmpty() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but received"));
+    new FileChecksumMatcher("checksumString", "");
+  }
+
+  @Test
+  public void testPreconditionShardTemplateIsNull() throws IOException {
+    String existingPath = tmpFolder.newFile().getPath();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage(
+        containsString(
+            "Expected non-null shard pattern. "
+                + "Please call the other constructor to use default pattern:"));
+    new FileChecksumMatcher("checksumString", existingPath, null);
+  }
+
+  @Test
+  public void testMatcherThatVerifiesSingleFile() throws IOException {
+    File onlyShard = writeTmpFile("result-000-of-001", "Test for file checksum verifier.");
+
+    assertThat(
+        pResult,
+        new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", onlyShard.getPath()));
+  }
+
+  @Test
+  public void testMatcherThatVerifiesMultipleFiles() throws IOException {
+    writeTmpFile("result-000-of-002", "To be or not to be, ");
+    writeTmpFile("result-001-of-002", "it is not a question.");
+    // This file sits in the same folder but its name does not fit the "result-*" pattern.
+    writeTmpFile("tmp", "tmp");
+
+    assertThat(
+        pResult,
+        new FileChecksumMatcher(
+            "90552392c28396935fe4f123bd0b5c2d0f6260c8", patternInTmpFolder("result-*")));
+  }
+
+  @Test
+  public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException {
+    writeTmpFile("result-000-of-001", "");
+
+    assertThat(
+        pResult,
+        new FileChecksumMatcher(
+            "da39a3ee5e6b4b0d3255bfef95601890afd80709", patternInTmpFolder("*")));
+  }
+
+  @Test
+  public void testMatcherThatUsesCustomizedTemplate() throws Exception {
+    // Shards follow a customized naming scheme: resultSSS-totalNNN
+    writeTmpFile("result0-total2", "To be or not to be, ");
+    writeTmpFile("result1-total2", "it is not a question.");
+
+    Pattern shardNaming =
+        Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
+
+    assertThat(
+        pResult,
+        new FileChecksumMatcher(
+            "90552392c28396935fe4f123bd0b5c2d0f6260c8", patternInTmpFolder("*"), shardNaming));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b130d7aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
new file mode 100644
index 0000000..43a9166
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import com.google.api.client.util.BackOff;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.LocalResources;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+/** Unit tests covering {@link NumberedShardedFile}. */
+@RunWith(JUnit4.class)
+public class NumberedShardedFileTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+
+  @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class);
+
+  private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff();
+  private String filePattern;
+
+  /** Resolves {@code glob} as a file inside the temporary folder and returns it as a string. */
+  private String patternInTmpFolder(String glob) {
+    return LocalResources.fromFile(tmpFolder.getRoot(), true)
+        .resolve(glob, StandardResolveOptions.RESOLVE_FILE)
+        .toString();
+  }
+
+  /** Writes {@code contents} to {@code target} as UTF-8 and hands the file back. */
+  private File fill(File target, String contents) throws IOException {
+    Files.write(contents, target, StandardCharsets.UTF_8);
+    return target;
+  }
+
+  /** Registers the expectation that reading gives up with the "after retrying" IOException. */
+  private void expectRetriesExhausted() {
+    thrown.expect(IOException.class);
+    thrown.expectMessage(
+        containsString(
+            "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES));
+  }
+
+  @Before
+  public void setup() throws IOException {
+    // By default every file in the temporary folder is a candidate shard.
+    filePattern = patternInTmpFolder("*");
+  }
+
+  @Test
+  public void testPreconditionFilePathIsNull() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but received"));
+    new NumberedShardedFile(null);
+  }
+
+  @Test
+  public void testPreconditionFilePathIsEmpty() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but received"));
+    new NumberedShardedFile("");
+  }
+
+  @Test
+  public void testReadMultipleShards() throws Exception {
+    String firstShardText = "To be or not to be, ";
+    String secondShardText = "it is not a question.";
+    String unrelatedText = "should not be included";
+
+    fill(tmpFolder.newFile("result-000-of-002"), firstShardText);
+    fill(tmpFolder.newFile("result-001-of-002"), secondShardText);
+    // Named so that the narrower "result-*" pattern below does not pick it up.
+    fill(tmpFolder.newFile("tmp"), unrelatedText);
+
+    filePattern = patternInTmpFolder("result-*");
+    NumberedShardedFile underTest = new NumberedShardedFile(filePattern);
+
+    assertThat(
+        underTest.readFilesWithRetries(), containsInAnyOrder(firstShardText, secondShardText));
+  }
+
+  @Test
+  public void testReadEmpty() throws Exception {
+    fill(tmpFolder.newFile("result-000-of-001"), "");
+    NumberedShardedFile underTest = new NumberedShardedFile(filePattern);
+
+    assertThat(underTest.readFilesWithRetries(), empty());
+  }
+
+  @Test
+  public void testReadCustomTemplate() throws Exception {
+    String firstShardText = "To be or not to be, ";
+    String secondShardText = "it is not a question.";
+
+    // Shards follow a customized naming scheme: resultSSS-totalNNN
+    fill(tmpFolder.newFile("result0-total2"), firstShardText);
+    fill(tmpFolder.newFile("result1-total2"), secondShardText);
+
+    Pattern shardNaming =
+        Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
+    NumberedShardedFile underTest = new NumberedShardedFile(filePattern, shardNaming);
+
+    assertThat(
+        underTest.readFilesWithRetries(), containsInAnyOrder(firstShardText, secondShardText));
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception {
+    fill(tmpFolder.newFile(), "Test for file checksum verifier.");
+
+    NumberedShardedFile underTest =
+        new NumberedShardedFile(filePattern, Pattern.compile("incorrect-template"));
+
+    expectRetriesExhausted();
+    underTest.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
+    fill(tmpFolder.newFile(), "Test for file checksum verifier.");
+
+    // Spy on a real instance so that only the line-reading step is forced to fail.
+    NumberedShardedFile underTest = spy(new NumberedShardedFile(filePattern));
+    doThrow(IOException.class)
+        .when(underTest)
+        .readLines(anyCollection());
+
+    expectRetriesExhausted();
+    underTest.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
+    NumberedShardedFile underTest = new NumberedShardedFile(filePattern);
+
+    expectRetriesExhausted();
+    underTest.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception {
+    // Two files are present although the shard name declares a total of one.
+    tmpFolder.newFile("result-000-of-001");
+    tmpFolder.newFile("tmp-result-000-of-001");
+
+    NumberedShardedFile underTest = new NumberedShardedFile(filePattern);
+
+    expectRetriesExhausted();
+    underTest.readFilesWithRetries(fastClock, backOff);
+  }
+}