You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 23:44:16 UTC

[2/4] beam git commit: Introduces EmptyMatchTreatment parameter to FileSystems.match()

Introduces EmptyMatchTreatment parameter to FileSystems.match()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db9aede2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db9aede2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db9aede2

Branch: refs/heads/master
Commit: db9aede289f8546bb30113353f07aa75daa83eba
Parents: 5e43b23
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:43:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSource.java | 52 ++++++++++----------
 .../org/apache/beam/sdk/io/FileSystems.java     | 46 +++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextSource.java |  7 ++-
 .../beam/sdk/io/fs/EmptyMatchTreatment.java     | 46 +++++++++++++++++
 .../org/apache/beam/sdk/io/fs/MatchResult.java  |  5 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java | 51 +++++++++++++++++++
 6 files changed, 180 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index d4413c9..7f865de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -23,19 +23,17 @@ import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -68,6 +66,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
 
   private final ValueProvider<String> fileOrPatternSpec;
+  private final EmptyMatchTreatment emptyMatchTreatment;
   @Nullable private MatchResult.Metadata singleFileMetadata;
   private final Mode mode;
 
@@ -80,15 +79,28 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   }
 
   /**
-   * Create a {@code FileBaseSource} based on a file or a file pattern specification.
+   * Create a {@code FileBasedSource} based on a file or a file pattern specification, with the
+   * given strategy for treating filepatterns that do not match any files.
    */
-  protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+  protected FileBasedSource(
+      ValueProvider<String> fileOrPatternSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      long minBundleSize) {
     super(0, Long.MAX_VALUE, minBundleSize);
-    mode = Mode.FILEPATTERN;
+    this.mode = Mode.FILEPATTERN;
+    this.emptyMatchTreatment = emptyMatchTreatment;
     this.fileOrPatternSpec = fileOrPatternSpec;
   }
 
   /**
+   * Like {@link #FileBasedSource(ValueProvider, EmptyMatchTreatment, long)}, but uses the default
+   * value of {@link EmptyMatchTreatment#DISALLOW}.
+   */
+  protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+    this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
+  }
+
+  /**
    * Create a {@code FileBasedSource} based on a single file. This constructor must be used when
    * creating a new {@code FileBasedSource} for a subrange of a single file.
    * Additionally, this constructor must be used to create new {@code FileBasedSource}s when
@@ -110,6 +122,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     mode = Mode.SINGLE_FILE_OR_SUBRANGE;
     this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
     this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
+
+    // This field will be unused in this mode.
+    this.emptyMatchTreatment = null;
   }
 
   /**
@@ -204,14 +219,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long totalSize = 0;
-      List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern));
-      MatchResult result = Iterables.getOnlyElement(inputs);
-      checkArgument(
-          result.status() == Status.OK,
-          "Error matching the pattern or glob %s: status %s",
-          fileOrPattern,
-          result.status());
-      List<Metadata> allMatches = result.metadata();
+      List<Metadata> allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
       for (Metadata metadata : allMatches) {
         totalSize += metadata.sizeBytes();
       }
@@ -254,9 +262,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern);
-      checkArgument(!expandedFiles.isEmpty(),
-          "Unable to find any files matching %s", fileOrPattern);
+      List<Metadata> expandedFiles =
+          FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
       List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
       for (Metadata metadata : expandedFiles) {
         FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
@@ -327,7 +334,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern);
+      List<Metadata> fileMetadata =
+          FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
+      LOG.info("Matched {} files for pattern {}", fileMetadata.size(), fileOrPattern);
       List<FileBasedReader<T>> fileReaders = new ArrayList<>();
       for (Metadata metadata : fileMetadata) {
         long endOffset = metadata.sizeBytes();
@@ -389,13 +398,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     return metadata.sizeBytes();
   }
 
-  private static List<Metadata> expandFilePattern(String fileOrPatternSpec) throws IOException {
-    MatchResult matches =
-        Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(fileOrPatternSpec)));
-    LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPatternSpec);
-    return ImmutableList.copyOf(matches.metadata());
-  }
-
   /**
    * A {@link Source.Reader reader} that implements code common to readers of
    * {@code FileBasedSource}s.

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index bd4668f..96394b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MatchResult.Status;
@@ -72,6 +73,8 @@ public class FileSystems {
   public static final String DEFAULT_SCHEME = "file";
   private static final Pattern FILE_SCHEME_PATTERN =
       Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*");
+  private static final Pattern GLOB_PATTERN =
+      Pattern.compile("[*?{}]");
 
   private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
       new AtomicReference<Map<String, FileSystem>>(
@@ -79,6 +82,11 @@ public class FileSystems {
 
   /********************************** METHODS FOR CLIENT **********************************/
 
+  /** Returns {@code true} if {@code spec} contains at least one glob wildcard character. */
+  public static boolean hasGlobWildcard(String spec) {
+    return GLOB_PATTERN.matcher(spec).find();
+  }
+
   /**
    * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
    * Callers should use {@link #match} to resolve users specs ambiguities before
@@ -102,6 +110,9 @@ public class FileSystems {
    * <p>In case the spec schemes don't match any known {@link FileSystem} implementations,
    * FileSystems will attempt to use {@link LocalFileSystem} to resolve a path.
    *
+   * <p>Specs that do not match any resources are treated according to
+   * {@link EmptyMatchTreatment#DISALLOW}.
+   *
    * @return {@code List<MatchResult>} in the same order of the input specs.
    *
    * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes.
@@ -114,6 +125,17 @@ public class FileSystems {
     return getFileSystemInternal(getOnlyScheme(specs)).match(specs);
   }
 
+  /** Matches {@code specs} like {@link #match(List)}, then applies {@code emptyMatchTreatment}. */
+  public static List<MatchResult> match(List<String> specs, EmptyMatchTreatment emptyMatchTreatment)
+      throws IOException {
+    List<MatchResult> raw = getFileSystemInternal(getOnlyScheme(specs)).match(specs);
+    List<MatchResult> adjusted = Lists.newArrayListWithExpectedSize(raw.size());
+    for (int i = 0; i < raw.size(); i++) {
+      adjusted.add(maybeAdjustEmptyMatchResult(specs.get(i), raw.get(i), emptyMatchTreatment));
+    }
+    return adjusted;
+  }
+
 
   /**
    * Like {@link #match(List)}, but for a single resource specification.
@@ -130,6 +152,30 @@ public class FileSystems {
         matches);
     return matches.get(0);
   }
+
+  /** Like {@link #match(String)}, but with a configurable {@link EmptyMatchTreatment}. */
+  public static MatchResult match(String spec, EmptyMatchTreatment emptyMatchTreatment)
+      throws IOException {
+    // Match with the default behavior first, then relax a NOT_FOUND result if permitted.
+    return maybeAdjustEmptyMatchResult(spec, match(spec), emptyMatchTreatment);
+  }
+
+  /** Turns a {@code NOT_FOUND} result into an empty {@code OK} one if the treatment allows it. */
+  private static MatchResult maybeAdjustEmptyMatchResult(
+      String spec, MatchResult res, EmptyMatchTreatment emptyMatchTreatment)
+      throws IOException {
+    if (res.status() == Status.NOT_FOUND) {
+      boolean alwaysAllowed = emptyMatchTreatment == EmptyMatchTreatment.ALLOW;
+      if (alwaysAllowed
+          || (FileSystems.hasGlobWildcard(spec)
+              && emptyMatchTreatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD)) {
+        return MatchResult.create(Status.OK, Collections.<Metadata>emptyList());
+      }
+    }
+    // Any other status, or an empty match that is not allowed, is passed through unchanged.
+    return res;
+  }
+
   /**
    * Returns the {@link Metadata} for a single file resource. Expects a resource specification
    * {@code spec} that matches a single result.

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 86c73d5..29188dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -28,6 +28,7 @@ import java.nio.channels.SeekableByteChannel;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -48,7 +49,11 @@ import org.apache.beam.sdk.options.ValueProvider;
 @VisibleForTesting
 class TextSource extends FileBasedSource<String> {
   TextSource(ValueProvider<String> fileSpec) {
-    super(fileSpec, 1L);
+    this(fileSpec, EmptyMatchTreatment.DISALLOW);
+  }
+
+  TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment) {
+    super(fileSpec, emptyMatchTreatment, 1L);
   }
 
   private TextSource(MatchResult.Metadata metadata, long start, long end) {

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
new file mode 100644
index 0000000..8e12993
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+
+/**
+ * Options for allowing or disallowing filepatterns that match no resources in {@link
+ * org.apache.beam.sdk.io.FileSystems#match}.
+ */
+public enum EmptyMatchTreatment {
+  /**
+   * Filepatterns matching no resources are allowed. For such a filepattern, {@link
+   * MatchResult#status} will be {@link Status#OK} and {@link MatchResult#metadata} will return an
+   * empty list.
+   */
+  ALLOW,
+
+  /**
+   * Filepatterns matching no resources are disallowed. For such a filepattern, {@link
+   * MatchResult#status} will be {@link Status#NOT_FOUND} and {@link MatchResult#metadata} will
+   * throw a {@link java.io.FileNotFoundException}.
+   */
+  DISALLOW,
+
+  /**
+   * Behaves like {@link #ALLOW} if the filepattern contains a glob wildcard character, and like
+   * {@link #DISALLOW} otherwise (i.e. if the filepattern specifies a single file).
+   */
+  ALLOW_IF_WILDCARD
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
index 642c049..aa80b96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
 
 /**
  * The result of {@link org.apache.beam.sdk.io.FileSystem#match}.
@@ -78,7 +79,9 @@ public abstract class MatchResult {
   public abstract Status status();
 
   /**
-   * {@link Metadata} of matched files.
+   * {@link Metadata} of matched files. Note that if the spec matched no files, this may either
+   * throw a {@link java.io.FileNotFoundException} or return an empty list, depending on the
+   * {@link EmptyMatchTreatment} used in the {@link FileSystems#match} call.
    */
   public abstract List<Metadata> metadata() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 1bdb915..ea9e06b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
 import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -94,6 +95,15 @@ public class FileBasedSourceTest {
     }
 
     public TestFileBasedSource(
+        String fileOrPattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        long minBundleSize,
+        String splitHeader) {
+      super(StaticValueProvider.of(fileOrPattern), emptyMatchTreatment, minBundleSize);
+      this.splitHeader = splitHeader;
+    }
+
+    public TestFileBasedSource(
         Metadata fileOrPattern,
         long minBundleSize,
         long startOffset,
@@ -371,6 +381,47 @@ public class FileBasedSourceTest {
   }
 
   @Test
+  public void testEmptyFilepatternTreatmentDefaultDisallow() throws IOException {
+    // No EmptyMatchTreatment argument: exercises the default (expected to be DISALLOW).
+    String missingFile = new File(tempFolder.getRoot(), "doesNotExist").getPath();
+    TestFileBasedSource source = new TestFileBasedSource(missingFile, 64, null);
+    thrown.expect(FileNotFoundException.class);
+    readFromSource(source, PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testEmptyFilepatternTreatmentAllow() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File root = tempFolder.getRoot();
+
+    // ALLOW tolerates an empty match even for a spec without any wildcard.
+    String missingFile = new File(root, "doesNotExist").getPath();
+    TestFileBasedSource source =
+        new TestFileBasedSource(missingFile, EmptyMatchTreatment.ALLOW, 64, null);
+    assertEquals(0, readFromSource(source, options).size());
+
+    // ALLOW_IF_WILDCARD tolerates an empty match as well, but only because the
+    // spec contains the glob character '*'.
+    String missingGlob = new File(root, "doesNotExist*").getPath();
+    TestFileBasedSource sourceWithWildcard =
+        new TestFileBasedSource(missingGlob, EmptyMatchTreatment.ALLOW_IF_WILDCARD, 64, null);
+    assertEquals(0, readFromSource(sourceWithWildcard, options).size());
+  }
+
+  @Test
+  public void testEmptyFilepatternTreatmentAllowIfWildcard() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // No glob character in the spec, so ALLOW_IF_WILDCARD must not tolerate the empty match.
+    String missingFile = new File(tempFolder.getRoot(), "doesNotExist").getPath();
+    TestFileBasedSource source =
+        new TestFileBasedSource(missingFile, EmptyMatchTreatment.ALLOW_IF_WILDCARD, 64, null);
+
+    thrown.expect(FileNotFoundException.class);
+    readFromSource(source, options);
+  }
+
+  @Test
   public void testCloseUnstartedFilePatternReader() throws IOException {
     PipelineOptions options = PipelineOptionsFactory.create();
     List<String> data1 = createStringDataset(3, 50);