You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/08 18:48:46 UTC

[1/3] beam git commit: [BEAM-2212] FileBasedSource: refactor to remove uses of fileOrPatternSpec.get()

Repository: beam
Updated Branches:
  refs/heads/master 2b0e699b8 -> e1791c3f8


[BEAM-2212] FileBasedSource: refactor to remove uses of fileOrPatternSpec.get()

Makes errors from printing ValueProviders instead of their runtime values less likely.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9423babd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9423babd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9423babd

Branch: refs/heads/master
Commit: 9423babd8f827e843723c218441e9a91aaa7b361
Parents: 5bac40e
Author: Dan Halperin <dh...@google.com>
Authored: Mon May 8 09:59:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 8 11:48:39 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSource.java | 39 ++++++++++++--------
 1 file changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9423babd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 4e07342..d4413c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -196,19 +196,20 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
     // we perform the size estimation of files and file patterns using the interface provided by
     // FileSystem.
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
-      checkState(fileOrPatternSpec.isAccessible(),
-                 "Size estimation should be done at execution time.");
-      String pattern = fileOrPatternSpec.get();
       long totalSize = 0;
-      List<MatchResult> inputs =
-          FileSystems.match(Collections.singletonList(pattern));
+      List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern));
       MatchResult result = Iterables.getOnlyElement(inputs);
       checkArgument(
           result.status() == Status.OK,
           "Error matching the pattern or glob %s: status %s",
-          pattern,
+          fileOrPattern,
           result.status());
       List<Metadata> allMatches = result.metadata();
       for (Metadata metadata : allMatches) {
@@ -216,7 +217,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       }
       LOG.info(
           "Filepattern {} matched {} files with total size {}",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           allMatches.size(),
           totalSize);
       return totalSize;
@@ -245,14 +246,17 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // split a FileBasedSource based on a file pattern to FileBasedSources based on full single
     // files. For files that can be efficiently seeked, we further split FileBasedSources based on
     // those files to FileBasedSources based on sub ranges of single files.
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot split a FileBasedSource without access to the file or pattern specification: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      checkState(fileOrPatternSpec.isAccessible(),
-                 "Bundle splitting should only happen at execution time.");
-      List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+      List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern);
       checkArgument(!expandedFiles.isEmpty(),
-          "Unable to find any files matching %s", fileOrPatternSpec.get());
+          "Unable to find any files matching %s", fileOrPattern);
       List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
       for (Metadata metadata : expandedFiles) {
         FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
@@ -268,7 +272,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       LOG.info(
           "Splitting filepattern {} into bundles of size {} took {} ms "
               + "and produced {} files and {} bundles",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           desiredBundleSizeBytes,
           System.currentTimeMillis() - startTime,
           expandedFiles.size(),
@@ -283,7 +287,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       } else {
         LOG.debug("The source for file {} is not split into sub-range based sources since "
             + "the file is not seekable",
-            fileOrPatternSpec);
+            fileOrPattern);
         return ImmutableList.of(this);
       }
     }
@@ -315,10 +319,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
     // Validate the current source prior to creating a reader for it.
     this.validate();
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot create a file reader without access to the file or pattern specification: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+      List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern);
       List<FileBasedReader<T>> fileReaders = new ArrayList<>();
       for (Metadata metadata : fileMetadata) {
         long endOffset = metadata.sizeBytes();
@@ -327,7 +336,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       }
       LOG.debug(
           "Creating a reader for file pattern {} took {} ms",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           System.currentTimeMillis() - startTime);
       if (fileReaders.size() == 1) {
         return fileReaders.get(0);


[3/3] beam git commit: This closes #2956

Posted by dh...@apache.org.
This closes #2956


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1791c3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1791c3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1791c3f

Branch: refs/heads/master
Commit: e1791c3f83a007013625b43f9976e7ffa64e8a49
Parents: 2b0e699 9423bab
Author: Dan Halperin <dh...@google.com>
Authored: Mon May 8 11:48:42 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 8 11:48:42 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSource.java | 39 ++++++++++++--------
 1 file changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: [BEAM-2212] FileBasedSource: improve message when logging.

Posted by dh...@apache.org.
[BEAM-2212] FileBasedSource: improve message when logging.

The ValueProvider itself should not be printed; print its runtime string value instead.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bac40ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bac40ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bac40ef

Branch: refs/heads/master
Commit: 5bac40efcc7bcd9d53183579ec6089c23aeda782
Parents: 2b0e699
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 5 19:06:03 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 8 11:48:39 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/FileBasedSource.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5bac40ef/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 2beb5e0..4e07342 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -268,7 +268,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       LOG.info(
           "Splitting filepattern {} into bundles of size {} took {} ms "
               + "and produced {} files and {} bundles",
-          fileOrPatternSpec,
+          fileOrPatternSpec.get(),
           desiredBundleSizeBytes,
           System.currentTimeMillis() - startTime,
           expandedFiles.size(),
@@ -327,7 +327,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       }
       LOG.debug(
           "Creating a reader for file pattern {} took {} ms",
-          fileOrPatternSpec,
+          fileOrPatternSpec.get(),
           System.currentTimeMillis() - startTime);
       if (fileReaders.size() == 1) {
         return fileReaders.get(0);