You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/13 23:17:50 UTC
[1/2] incubator-beam git commit: Closes #155
Repository: incubator-beam
Updated Branches:
refs/heads/master 1915503b9 -> c4cbbb1d9
Closes #155
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c4cbbb1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c4cbbb1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c4cbbb1d
Branch: refs/heads/master
Commit: c4cbbb1d9b47b863539a9774e4fa8cc232dde3ad
Parents: 1915503 566d06b
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 13 14:17:00 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 13 14:17:00 2016 -0700
----------------------------------------------------------------------
.../cloud/dataflow/sdk/io/FileBasedSource.java | 102 +++++++++++--------
1 file changed, 57 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: FileBasedSource: throw IOException
instead of Exception where possible
Posted by dh...@apache.org.
FileBasedSource: throw IOException instead of Exception where possible
- Replace "throws Exception" with "throws IOException" in APIs
that already throw only IOException.
- Refactor one function to catch InterruptedException and ExecutionException
and rethrow them as IOException (restoring the thread's interrupt flag on interruption).
This makes it easier to write a new FileBasedSource, because the
FileBasedReader methods are declared to throw only IOException.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/566d06b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/566d06b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/566d06b4
Branch: refs/heads/master
Commit: 566d06b4d30d43468bcbee8d7a4876fbea70555b
Parents: 1915503
Author: Dan Halperin <dh...@google.com>
Authored: Fri Apr 8 16:09:21 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 13 14:17:00 2016 -0700
----------------------------------------------------------------------
.../cloud/dataflow/sdk/io/FileBasedSource.java | 102 +++++++++++--------
1 file changed, 57 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/566d06b4/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
index 9aa6036..3ad32b4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
@@ -17,10 +17,12 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
@@ -42,6 +44,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
/**
@@ -135,20 +138,22 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
@Override
public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
- Preconditions.checkArgument(mode != Mode.FILEPATTERN,
+ checkArgument(mode != Mode.FILEPATTERN,
"Cannot split a file pattern based source based on positions");
- Preconditions.checkArgument(start >= getStartOffset(), "Start offset value " + start
- + " of the subrange cannot be smaller than the start offset value " + getStartOffset()
- + " of the parent source");
- Preconditions.checkArgument(end <= getEndOffset(), "End offset value " + end
- + " of the subrange cannot be larger than the end offset value " + getEndOffset()
- + " of the parent source");
+ checkArgument(start >= getStartOffset(),
+ "Start offset value %s of the subrange cannot be smaller than the start offset value %s"
+ + " of the parent source",
+ start,
+ getStartOffset());
+ checkArgument(end <= getEndOffset(),
+ "End offset value %s of the subrange cannot be larger than the end offset value %s",
+ end,
+ getEndOffset());
FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end);
if (start > 0 || end != Long.MAX_VALUE) {
- Preconditions.checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
- "Source created for the range [" + start + "," + end + ")"
- + " must be a subrange source");
+ checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
+ "Source created for the range [%s,%s) must be a subrange source", start, end);
}
return source;
}
@@ -177,7 +182,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
PipelineOptions options);
@Override
- public final long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ public final long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
// This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
// we perform the size estimation of files and file patterns using the interface provided by
// IOChannelFactory.
@@ -190,12 +195,14 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
Collection<String> inputs = factory.match(fileOrPatternSpec);
if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
totalSize = getExactTotalSizeOfFiles(inputs, factory);
- LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec + " took "
- + (System.currentTimeMillis() - startTime) + " ms");
+ LOG.debug("Size estimation of all files of pattern {} took {} ms",
+ fileOrPatternSpec,
+ System.currentTimeMillis() - startTime);
} else {
totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
- LOG.debug("Size estimation of pattern " + fileOrPatternSpec + " by sampling took "
- + (System.currentTimeMillis() - startTime) + " ms");
+ LOG.debug("Size estimation of pattern {} by sampling took {} ms",
+ fileOrPatternSpec,
+ System.currentTimeMillis() - startTime);
}
return totalSize;
} else {
@@ -210,12 +217,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
// TODO: replace this with bulk request API when it is available. Will require updates
// to IOChannelFactory interface.
private static long getExactTotalSizeOfFiles(
- Collection<String> files, IOChannelFactory ioChannelFactory) throws Exception {
+ Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
List<ListenableFuture<Long>> futures = new ArrayList<>();
ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
- long totalSize = 0;
try {
+ long totalSize = 0;
for (String file : files) {
futures.add(createFutureForSizeEstimation(file, ioChannelFactory, service));
}
@@ -225,7 +232,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
return totalSize;
- } finally {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
service.shutdown();
}
}
@@ -237,7 +249,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
return service.submit(
new Callable<Long>() {
@Override
- public Long call() throws Exception {
+ public Long call() throws IOException {
return ioChannelFactory.getSizeBytes(file);
}
});
@@ -248,7 +260,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
// estimate.
// TODO: Implement a more efficient sampling mechanism.
private static long getEstimatedSizeOfFilesBySampling(
- Collection<String> files, IOChannelFactory ioChannelFactory) throws Exception {
+ Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
int sampleSize = (int) (FRACTION_OF_FILES_TO_STAT * files.size());
sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, sampleSize);
@@ -295,11 +307,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
List<? extends FileBasedSource<T>> splitResults =
ImmutableList.copyOf(Iterables.concat(Futures.allAsList(futures).get()));
LOG.debug(
- "Splitting the source based on file pattern "
- + fileOrPatternSpec
- + " took "
- + (System.currentTimeMillis() - startTime)
- + " ms");
+ "Splitting the source based on file pattern {} took {} ms",
+ fileOrPatternSpec,
+ System.currentTimeMillis() - startTime);
return splitResults;
} finally {
service.shutdown();
@@ -313,8 +323,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
return splitResults;
} else {
- LOG.debug("The source for file " + fileOrPatternSpec
- + " is not split into sub-range based sources since the file is not seekable");
+ LOG.debug("The source for file {} is not split into sub-range based sources since "
+ + "the file is not seekable",
+ fileOrPatternSpec);
return ImmutableList.of(this);
}
}
@@ -348,14 +359,16 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
try {
endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
} catch (IOException e) {
- LOG.warn("Failed to get size of " + fileName, e);
+ LOG.warn("Failed to get size of {}", fileName, e);
endOffset = Long.MAX_VALUE;
}
fileReaders.add(
createForSubrangeOfFile(fileName, 0, endOffset).createSingleFileReader(options));
}
- LOG.debug("Creating a reader for file pattern " + fileOrPatternSpec + " took "
- + (System.currentTimeMillis() - startTime) + " ms");
+ LOG.debug(
+ "Creating a reader for file pattern {} took {} ms",
+ fileOrPatternSpec,
+ System.currentTimeMillis() - startTime);
if (fileReaders.size() == 1) {
return fileReaders.get(0);
}
@@ -382,12 +395,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
super.validate();
switch (mode) {
case FILEPATTERN:
- Preconditions.checkArgument(getStartOffset() == 0,
+ checkArgument(getStartOffset() == 0,
"FileBasedSource is based on a file pattern or a full single file "
- + "but the starting offset proposed " + getStartOffset() + " is not zero");
- Preconditions.checkArgument(getEndOffset() == Long.MAX_VALUE,
+ + "but the starting offset proposed %s is not zero", getStartOffset());
+ checkArgument(getEndOffset() == Long.MAX_VALUE,
"FileBasedSource is based on a file pattern or a full single file "
- + "but the ending offset proposed " + getEndOffset() + " is not Long.MAX_VALUE");
+ + "but the ending offset proposed %s is not Long.MAX_VALUE", getEndOffset());
break;
case SINGLE_FILE_OR_SUBRANGE:
// Nothing more to validate.
@@ -398,10 +411,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
@Override
- public final long getMaxEndOffset(PipelineOptions options) throws Exception {
- if (mode == Mode.FILEPATTERN) {
- throw new IllegalArgumentException("Cannot determine the exact end offset of a file pattern");
- }
+ public final long getMaxEndOffset(PipelineOptions options) throws IOException {
+ checkArgument(
+ mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
if (getEndOffset() == Long.MAX_VALUE) {
IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
return factory.getSizeBytes(fileOrPatternSpec);
@@ -464,7 +476,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
*/
public FileBasedReader(FileBasedSource<T> source) {
super(source);
- Preconditions.checkArgument(source.getMode() != Mode.FILEPATTERN,
+ checkArgument(source.getMode() != Mode.FILEPATTERN,
"FileBasedReader does not support reading file patterns");
}
@@ -484,12 +496,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
seekChannel.position(source.getStartOffset());
} else {
// Channel is not seekable. Must not be a subrange.
- Preconditions.checkArgument(source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
+ checkArgument(source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
"Subrange-based sources must only be defined for file types that support seekable "
+ " read channels");
- Preconditions.checkArgument(source.getStartOffset() == 0, "Start offset "
- + source.getStartOffset()
- + " is not zero but channel for reading the file is not seekable.");
+ checkArgument(source.getStartOffset() == 0,
+ "Start offset %s is not zero but channel for reading the file is not seekable.",
+ source.getStartOffset());
}
startReading(channel);
@@ -570,7 +582,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
@Override
public boolean advance() throws IOException {
- Preconditions.checkState(currentReader != null, "Call start() before advance()");
+ checkState(currentReader != null, "Call start() before advance()");
if (currentReader.advance()) {
return true;
}