You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/08 18:48:46 UTC
[1/3] beam git commit: [BEAM-2212] FileBasedSource: refactor to remove uses of fileOrPatternSpec.get()
Repository: beam
Updated Branches:
refs/heads/master 2b0e699b8 -> e1791c3f8
[BEAM-2212] FileBasedSource: refactor to remove uses of fileOrPatternSpec.get()
This makes it less likely to have errors from printing ValueProviders instead of their runtime values.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9423babd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9423babd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9423babd
Branch: refs/heads/master
Commit: 9423babd8f827e843723c218441e9a91aaa7b361
Parents: 5bac40e
Author: Dan Halperin <dh...@google.com>
Authored: Mon May 8 09:59:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 8 11:48:39 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSource.java | 39 ++++++++++++--------
1 file changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9423babd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 4e07342..d4413c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -196,19 +196,20 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
// This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
// we perform the size estimation of files and file patterns using the interface provided by
// FileSystem.
+ checkState(
+ fileOrPatternSpec.isAccessible(),
+ "Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}.",
+ fileOrPatternSpec);
+ String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
- checkState(fileOrPatternSpec.isAccessible(),
- "Size estimation should be done at execution time.");
- String pattern = fileOrPatternSpec.get();
long totalSize = 0;
- List<MatchResult> inputs =
- FileSystems.match(Collections.singletonList(pattern));
+ List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern));
MatchResult result = Iterables.getOnlyElement(inputs);
checkArgument(
result.status() == Status.OK,
"Error matching the pattern or glob %s: status %s",
- pattern,
+ fileOrPattern,
result.status());
List<Metadata> allMatches = result.metadata();
for (Metadata metadata : allMatches) {
@@ -216,7 +217,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
LOG.info(
"Filepattern {} matched {} files with total size {}",
- fileOrPatternSpec.get(),
+ fileOrPattern,
allMatches.size(),
totalSize);
return totalSize;
@@ -245,14 +246,17 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
// split a FileBasedSource based on a file pattern to FileBasedSources based on full single
// files. For files that can be efficiently seeked, we further split FileBasedSources based on
// those files to FileBasedSources based on sub ranges of single files.
+ checkState(
+ fileOrPatternSpec.isAccessible(),
+ "Cannot split a FileBasedSource without access to the file or pattern specification: {}.",
+ fileOrPatternSpec);
+ String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
- checkState(fileOrPatternSpec.isAccessible(),
- "Bundle splitting should only happen at execution time.");
- List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+ List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern);
checkArgument(!expandedFiles.isEmpty(),
- "Unable to find any files matching %s", fileOrPatternSpec.get());
+ "Unable to find any files matching %s", fileOrPattern);
List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
for (Metadata metadata : expandedFiles) {
FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
@@ -268,7 +272,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
LOG.info(
"Splitting filepattern {} into bundles of size {} took {} ms "
+ "and produced {} files and {} bundles",
- fileOrPatternSpec.get(),
+ fileOrPattern,
desiredBundleSizeBytes,
System.currentTimeMillis() - startTime,
expandedFiles.size(),
@@ -283,7 +287,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
} else {
LOG.debug("The source for file {} is not split into sub-range based sources since "
+ "the file is not seekable",
- fileOrPatternSpec);
+ fileOrPattern);
return ImmutableList.of(this);
}
}
@@ -315,10 +319,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
// Validate the current source prior to creating a reader for it.
this.validate();
+ checkState(
+ fileOrPatternSpec.isAccessible(),
+ "Cannot create a file reader without access to the file or pattern specification: {}.",
+ fileOrPatternSpec);
+ String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
- List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+ List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern);
List<FileBasedReader<T>> fileReaders = new ArrayList<>();
for (Metadata metadata : fileMetadata) {
long endOffset = metadata.sizeBytes();
@@ -327,7 +336,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
LOG.debug(
"Creating a reader for file pattern {} took {} ms",
- fileOrPatternSpec.get(),
+ fileOrPattern,
System.currentTimeMillis() - startTime);
if (fileReaders.size() == 1) {
return fileReaders.get(0);
[3/3] beam git commit: This closes #2956
Posted by dh...@apache.org.
This closes #2956
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1791c3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1791c3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1791c3f
Branch: refs/heads/master
Commit: e1791c3f83a007013625b43f9976e7ffa64e8a49
Parents: 2b0e699 9423bab
Author: Dan Halperin <dh...@google.com>
Authored: Mon May 8 11:48:42 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 8 11:48:42 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSource.java | 39 ++++++++++++--------
1 file changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: [BEAM-2212] FileBasedSource: improve message when logging.
Posted by dh...@apache.org.
[BEAM-2212] FileBasedSource: improve message when logging.
The ValueProvider itself should not be printed; print the string value it holds instead.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bac40ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bac40ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bac40ef
Branch: refs/heads/master
Commit: 5bac40efcc7bcd9d53183579ec6089c23aeda782
Parents: 2b0e699
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 5 19:06:03 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 8 11:48:39 2017 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/io/FileBasedSource.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bac40ef/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 2beb5e0..4e07342 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -268,7 +268,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
LOG.info(
"Splitting filepattern {} into bundles of size {} took {} ms "
+ "and produced {} files and {} bundles",
- fileOrPatternSpec,
+ fileOrPatternSpec.get(),
desiredBundleSizeBytes,
System.currentTimeMillis() - startTime,
expandedFiles.size(),
@@ -327,7 +327,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
LOG.debug(
"Creating a reader for file pattern {} took {} ms",
- fileOrPatternSpec,
+ fileOrPatternSpec.get(),
System.currentTimeMillis() - startTime);
if (fileReaders.size() == 1) {
return fileReaders.get(0);