You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 23:44:16 UTC
[2/4] beam git commit: Introduces EmptyMatchTreatment parameter to
FileSystems.match()
Introduces EmptyMatchTreatment parameter to FileSystems.match()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db9aede2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db9aede2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db9aede2
Branch: refs/heads/master
Commit: db9aede289f8546bb30113353f07aa75daa83eba
Parents: 5e43b23
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:43:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSource.java | 52 ++++++++++----------
.../org/apache/beam/sdk/io/FileSystems.java | 46 +++++++++++++++++
.../java/org/apache/beam/sdk/io/TextSource.java | 7 ++-
.../beam/sdk/io/fs/EmptyMatchTreatment.java | 46 +++++++++++++++++
.../org/apache/beam/sdk/io/fs/MatchResult.java | 5 +-
.../apache/beam/sdk/io/FileBasedSourceTest.java | 51 +++++++++++++++++++
6 files changed, 180 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index d4413c9..7f865de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -23,19 +23,17 @@ import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -68,6 +66,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
private final ValueProvider<String> fileOrPatternSpec;
+ private final EmptyMatchTreatment emptyMatchTreatment;
@Nullable private MatchResult.Metadata singleFileMetadata;
private final Mode mode;
@@ -80,15 +79,28 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
/**
- * Create a {@code FileBaseSource} based on a file or a file pattern specification.
+ * Create a {@code FileBasedSource} based on a file or a file pattern specification, with the given
+ * strategy for treating filepatterns that do not match any files.
*/
- protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+ protected FileBasedSource(
+ ValueProvider<String> fileOrPatternSpec,
+ EmptyMatchTreatment emptyMatchTreatment,
+ long minBundleSize) {
super(0, Long.MAX_VALUE, minBundleSize);
- mode = Mode.FILEPATTERN;
+ this.mode = Mode.FILEPATTERN;
+ this.emptyMatchTreatment = emptyMatchTreatment;
this.fileOrPatternSpec = fileOrPatternSpec;
}
/**
+ * Like {@link #FileBasedSource(ValueProvider, EmptyMatchTreatment, long)}, but uses the default
+ * value of {@link EmptyMatchTreatment#DISALLOW}.
+ */
+ protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+ this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
+ }
+
+ /**
* Create a {@code FileBasedSource} based on a single file. This constructor must be used when
* creating a new {@code FileBasedSource} for a subrange of a single file.
* Additionally, this constructor must be used to create new {@code FileBasedSource}s when
@@ -110,6 +122,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
mode = Mode.SINGLE_FILE_OR_SUBRANGE;
this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
+
+ // This field will be unused in this mode.
+ this.emptyMatchTreatment = null;
}
/**
@@ -204,14 +219,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
if (mode == Mode.FILEPATTERN) {
long totalSize = 0;
- List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern));
- MatchResult result = Iterables.getOnlyElement(inputs);
- checkArgument(
- result.status() == Status.OK,
- "Error matching the pattern or glob %s: status %s",
- fileOrPattern,
- result.status());
- List<Metadata> allMatches = result.metadata();
+ List<Metadata> allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
for (Metadata metadata : allMatches) {
totalSize += metadata.sizeBytes();
}
@@ -254,9 +262,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
- List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern);
- checkArgument(!expandedFiles.isEmpty(),
- "Unable to find any files matching %s", fileOrPattern);
+ List<Metadata> expandedFiles =
+ FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
for (Metadata metadata : expandedFiles) {
FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
@@ -327,7 +334,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
- List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern);
+ List<Metadata> fileMetadata =
+ FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
+ LOG.info("Matched {} files for pattern {}", fileMetadata.size(), fileOrPattern);
List<FileBasedReader<T>> fileReaders = new ArrayList<>();
for (Metadata metadata : fileMetadata) {
long endOffset = metadata.sizeBytes();
@@ -389,13 +398,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
return metadata.sizeBytes();
}
- private static List<Metadata> expandFilePattern(String fileOrPatternSpec) throws IOException {
- MatchResult matches =
- Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(fileOrPatternSpec)));
- LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPatternSpec);
- return ImmutableList.copyOf(matches.metadata());
- }
-
/**
* A {@link Source.Reader reader} that implements code common to readers of
* {@code FileBasedSource}s.
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index bd4668f..96394b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
@@ -72,6 +73,8 @@ public class FileSystems {
public static final String DEFAULT_SCHEME = "file";
private static final Pattern FILE_SCHEME_PATTERN =
Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*");
+ private static final Pattern GLOB_PATTERN =
+ Pattern.compile("[*?{}]");
private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
new AtomicReference<Map<String, FileSystem>>(
@@ -79,6 +82,11 @@ public class FileSystems {
/********************************** METHODS FOR CLIENT **********************************/
+ /** Checks whether the given spec contains a glob wildcard character. */
+ public static boolean hasGlobWildcard(String spec) {
+ return GLOB_PATTERN.matcher(spec).find();
+ }
+
/**
* This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
* Callers should use {@link #match} to resolve users specs ambiguities before
@@ -102,6 +110,9 @@ public class FileSystems {
* <p>In case the spec schemes don't match any known {@link FileSystem} implementations,
* FileSystems will attempt to use {@link LocalFileSystem} to resolve a path.
*
+ * <p>Specs that do not match any resources are treated according to
+ * {@link EmptyMatchTreatment#DISALLOW}.
+ *
* @return {@code List<MatchResult>} in the same order of the input specs.
*
* @throws IllegalArgumentException if specs are invalid -- empty or have different schemes.
@@ -114,6 +125,17 @@ public class FileSystems {
return getFileSystemInternal(getOnlyScheme(specs)).match(specs);
}
+ /** Like {@link #match(List)}, but with a configurable {@link EmptyMatchTreatment}. */
+ public static List<MatchResult> match(List<String> specs, EmptyMatchTreatment emptyMatchTreatment)
+ throws IOException {
+ List<MatchResult> matches = getFileSystemInternal(getOnlyScheme(specs)).match(specs);
+ List<MatchResult> res = Lists.newArrayListWithExpectedSize(matches.size());
+ for (int i = 0; i < matches.size(); i++) {
+ res.add(maybeAdjustEmptyMatchResult(specs.get(i), matches.get(i), emptyMatchTreatment));
+ }
+ return res;
+ }
+
/**
* Like {@link #match(List)}, but for a single resource specification.
@@ -130,6 +152,30 @@ public class FileSystems {
matches);
return matches.get(0);
}
+
+ /** Like {@link #match(String)}, but with a configurable {@link EmptyMatchTreatment}. */
+ public static MatchResult match(String spec, EmptyMatchTreatment emptyMatchTreatment)
+ throws IOException {
+ MatchResult res = match(spec);
+ return maybeAdjustEmptyMatchResult(spec, res, emptyMatchTreatment);
+ }
+
+ private static MatchResult maybeAdjustEmptyMatchResult(
+ String spec, MatchResult res, EmptyMatchTreatment emptyMatchTreatment)
+ throws IOException {
+ if (res.status() != Status.NOT_FOUND) {
+ return res;
+ }
+ boolean notFoundAllowed =
+ emptyMatchTreatment == EmptyMatchTreatment.ALLOW
+ || (FileSystems.hasGlobWildcard(spec)
+ && emptyMatchTreatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD);
+ if (notFoundAllowed) {
+ return MatchResult.create(Status.OK, Collections.<Metadata>emptyList());
+ }
+ return res;
+ }
+
/**
* Returns the {@link Metadata} for a single file resource. Expects a resource specification
* {@code spec} that matches a single result.
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 86c73d5..29188dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -28,6 +28,7 @@ import java.nio.channels.SeekableByteChannel;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -48,7 +49,11 @@ import org.apache.beam.sdk.options.ValueProvider;
@VisibleForTesting
class TextSource extends FileBasedSource<String> {
TextSource(ValueProvider<String> fileSpec) {
- super(fileSpec, 1L);
+ this(fileSpec, EmptyMatchTreatment.DISALLOW);
+ }
+
+ TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment) {
+ super(fileSpec, emptyMatchTreatment, 1L);
}
private TextSource(MatchResult.Metadata metadata, long start, long end) {
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
new file mode 100644
index 0000000..8e12993
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+
+/**
+ * Options for allowing or disallowing filepatterns that match no resources in {@link
+ * org.apache.beam.sdk.io.FileSystems#match}.
+ */
+public enum EmptyMatchTreatment {
+ /**
+ * Filepatterns matching no resources are allowed. For such a filepattern, {@link
+ * MatchResult#status} will be {@link Status#OK} and {@link MatchResult#metadata} will return an
+ * empty list.
+ */
+ ALLOW,
+
+ /**
+ * Filepatterns matching no resources are disallowed. For such a filepattern, {@link
+ * MatchResult#status} will be {@link Status#NOT_FOUND} and {@link MatchResult#metadata} will
+ * throw a {@link java.io.FileNotFoundException}.
+ */
+ DISALLOW,
+
+ /**
+ * Filepatterns matching no resources are allowed if the filepattern contains a glob wildcard
+ * character, and disallowed otherwise (i.e. if the filepattern specifies a single file).
+ */
+ ALLOW_IF_WILDCARD
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
index 642c049..aa80b96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
/**
* The result of {@link org.apache.beam.sdk.io.FileSystem#match}.
@@ -78,7 +79,9 @@ public abstract class MatchResult {
public abstract Status status();
/**
- * {@link Metadata} of matched files.
+ * {@link Metadata} of matched files. Note that if no files were matched, this may either throw a
+ * {@link java.io.FileNotFoundException} or return an empty list, depending on the {@link
+ * EmptyMatchTreatment} used in the {@link FileSystems#match} call.
*/
public abstract List<Metadata> metadata() throws IOException;
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 1bdb915..ea9e06b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -94,6 +95,15 @@ public class FileBasedSourceTest {
}
public TestFileBasedSource(
+ String fileOrPattern,
+ EmptyMatchTreatment emptyMatchTreatment,
+ long minBundleSize,
+ String splitHeader) {
+ super(StaticValueProvider.of(fileOrPattern), emptyMatchTreatment, minBundleSize);
+ this.splitHeader = splitHeader;
+ }
+
+ public TestFileBasedSource(
Metadata fileOrPattern,
long minBundleSize,
long startOffset,
@@ -371,6 +381,47 @@ public class FileBasedSourceTest {
}
@Test
+ public void testEmptyFilepatternTreatmentDefaultDisallow() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ TestFileBasedSource source =
+ new TestFileBasedSource(new File(tempFolder.getRoot(), "doesNotExist").getPath(), 64, null);
+ thrown.expect(FileNotFoundException.class);
+ readFromSource(source, options);
+ }
+
+ @Test
+ public void testEmptyFilepatternTreatmentAllow() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ TestFileBasedSource source =
+ new TestFileBasedSource(
+ new File(tempFolder.getRoot(), "doesNotExist").getPath(),
+ EmptyMatchTreatment.ALLOW,
+ 64,
+ null);
+ TestFileBasedSource sourceWithWildcard =
+ new TestFileBasedSource(
+ new File(tempFolder.getRoot(), "doesNotExist*").getPath(),
+ EmptyMatchTreatment.ALLOW_IF_WILDCARD,
+ 64,
+ null);
+ assertEquals(0, readFromSource(source, options).size());
+ assertEquals(0, readFromSource(sourceWithWildcard, options).size());
+ }
+
+ @Test
+ public void testEmptyFilepatternTreatmentAllowIfWildcard() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ TestFileBasedSource source =
+ new TestFileBasedSource(
+ new File(tempFolder.getRoot(), "doesNotExist").getPath(),
+ EmptyMatchTreatment.ALLOW_IF_WILDCARD,
+ 64,
+ null);
+ thrown.expect(FileNotFoundException.class);
+ readFromSource(source, options);
+ }
+
+ @Test
public void testCloseUnstartedFilePatternReader() throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
List<String> data1 = createStringDataset(3, 50);