You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/13 23:17:50 UTC

[1/2] incubator-beam git commit: Closes #155

Repository: incubator-beam
Updated Branches:
  refs/heads/master 1915503b9 -> c4cbbb1d9


Closes #155


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c4cbbb1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c4cbbb1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c4cbbb1d

Branch: refs/heads/master
Commit: c4cbbb1d9b47b863539a9774e4fa8cc232dde3ad
Parents: 1915503 566d06b
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 13 14:17:00 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 13 14:17:00 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/io/FileBasedSource.java  | 102 +++++++++++--------
 1 file changed, 57 insertions(+), 45 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: FileBasedSource: throw IOException instead of Exception where possible

Posted by dh...@apache.org.
FileBasedSource: throw IOException instead of Exception where possible

- Replace "throws Exception" with "throws IOException" in APIs
  that already only throw IOException.
- Refactor one function to catch, handle, and rethrow
  (Interrupted|Execution)Exception as IOException.

This makes it easier to write a new FileBasedSource, because the
FileBasedReader functions are only declared to throw IOException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/566d06b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/566d06b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/566d06b4

Branch: refs/heads/master
Commit: 566d06b4d30d43468bcbee8d7a4876fbea70555b
Parents: 1915503
Author: Dan Halperin <dh...@google.com>
Authored: Fri Apr 8 16:09:21 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 13 14:17:00 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/io/FileBasedSource.java  | 102 +++++++++++--------
 1 file changed, 57 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/566d06b4/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
index 9aa6036..3ad32b4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
@@ -17,10 +17,12 @@
  */
 package com.google.cloud.dataflow.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Futures;
@@ -42,6 +44,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 
 /**
@@ -135,20 +138,22 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
   @Override
   public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
-    Preconditions.checkArgument(mode != Mode.FILEPATTERN,
+    checkArgument(mode != Mode.FILEPATTERN,
         "Cannot split a file pattern based source based on positions");
-    Preconditions.checkArgument(start >= getStartOffset(), "Start offset value " + start
-        + " of the subrange cannot be smaller than the start offset value " + getStartOffset()
-        + " of the parent source");
-    Preconditions.checkArgument(end <= getEndOffset(), "End offset value " + end
-        + " of the subrange cannot be larger than the end offset value " + getEndOffset()
-        + " of the parent source");
+    checkArgument(start >= getStartOffset(),
+        "Start offset value %s of the subrange cannot be smaller than the start offset value %s"
+            + " of the parent source",
+        start,
+        getStartOffset());
+    checkArgument(end <= getEndOffset(),
+        "End offset value %s of the subrange cannot be larger than the end offset value %s",
+        end,
+        getEndOffset());
 
     FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end);
     if (start > 0 || end != Long.MAX_VALUE) {
-      Preconditions.checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
-          "Source created for the range [" + start + "," + end + ")"
-          + " must be a subrange source");
+      checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
+          "Source created for the range [%s,%s) must be a subrange source", start, end);
     }
     return source;
   }
@@ -177,7 +182,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       PipelineOptions options);
 
   @Override
-  public final long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+  public final long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
     // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
     // we perform the size estimation of files and file patterns using the interface provided by
     // IOChannelFactory.
@@ -190,12 +195,14 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       Collection<String> inputs = factory.match(fileOrPatternSpec);
       if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
         totalSize = getExactTotalSizeOfFiles(inputs, factory);
-        LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec + " took "
-           + (System.currentTimeMillis() - startTime) + " ms");
+        LOG.debug("Size estimation of all files of pattern {} took {} ms",
+            fileOrPatternSpec,
+            System.currentTimeMillis() - startTime);
       } else {
         totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
-        LOG.debug("Size estimation of pattern " + fileOrPatternSpec + " by sampling took "
-           + (System.currentTimeMillis() - startTime) + " ms");
+        LOG.debug("Size estimation of pattern {} by sampling took {} ms",
+            fileOrPatternSpec,
+            System.currentTimeMillis() - startTime);
       }
       return totalSize;
     } else {
@@ -210,12 +217,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   // TODO: replace this with bulk request API when it is available. Will require updates
   // to IOChannelFactory interface.
   private static long getExactTotalSizeOfFiles(
-      Collection<String> files, IOChannelFactory ioChannelFactory) throws Exception {
+      Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
     List<ListenableFuture<Long>> futures = new ArrayList<>();
     ListeningExecutorService service =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
-    long totalSize = 0;
     try {
+      long totalSize = 0;
       for (String file : files) {
         futures.add(createFutureForSizeEstimation(file, ioChannelFactory, service));
       }
@@ -225,7 +232,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       }
 
       return totalSize;
-    } finally {
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    }  finally {
       service.shutdown();
     }
   }
@@ -237,7 +249,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     return service.submit(
         new Callable<Long>() {
           @Override
-          public Long call() throws Exception {
+          public Long call() throws IOException {
             return ioChannelFactory.getSizeBytes(file);
           }
         });
@@ -248,7 +260,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   // estimate.
   // TODO: Implement a more efficient sampling mechanism.
   private static long getEstimatedSizeOfFilesBySampling(
-      Collection<String> files, IOChannelFactory ioChannelFactory) throws Exception {
+      Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
     int sampleSize = (int) (FRACTION_OF_FILES_TO_STAT * files.size());
     sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, sampleSize);
 
@@ -295,11 +307,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
         List<? extends FileBasedSource<T>> splitResults =
             ImmutableList.copyOf(Iterables.concat(Futures.allAsList(futures).get()));
         LOG.debug(
-            "Splitting the source based on file pattern "
-                + fileOrPatternSpec
-                + " took "
-                + (System.currentTimeMillis() - startTime)
-                + " ms");
+            "Splitting the source based on file pattern {} took {} ms",
+            fileOrPatternSpec,
+            System.currentTimeMillis() - startTime);
         return splitResults;
       } finally {
         service.shutdown();
@@ -313,8 +323,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
         }
         return splitResults;
       } else {
-        LOG.debug("The source for file " + fileOrPatternSpec
-            + " is not split into sub-range based sources since the file is not seekable");
+        LOG.debug("The source for file {} is not split into sub-range based sources since "
+            + "the file is not seekable",
+            fileOrPatternSpec);
         return ImmutableList.of(this);
       }
     }
@@ -348,14 +359,16 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
         try {
           endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
         } catch (IOException e) {
-          LOG.warn("Failed to get size of " + fileName, e);
+          LOG.warn("Failed to get size of {}", fileName, e);
           endOffset = Long.MAX_VALUE;
         }
         fileReaders.add(
             createForSubrangeOfFile(fileName, 0, endOffset).createSingleFileReader(options));
       }
-      LOG.debug("Creating a reader for file pattern " + fileOrPatternSpec + " took "
-          + (System.currentTimeMillis() - startTime) + " ms");
+      LOG.debug(
+          "Creating a reader for file pattern {} took {} ms",
+          fileOrPatternSpec,
+          System.currentTimeMillis() - startTime);
       if (fileReaders.size() == 1) {
         return fileReaders.get(0);
       }
@@ -382,12 +395,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     super.validate();
     switch (mode) {
       case FILEPATTERN:
-        Preconditions.checkArgument(getStartOffset() == 0,
+        checkArgument(getStartOffset() == 0,
             "FileBasedSource is based on a file pattern or a full single file "
-            + "but the starting offset proposed " + getStartOffset() + " is not zero");
-        Preconditions.checkArgument(getEndOffset() == Long.MAX_VALUE,
+            + "but the starting offset proposed %s is not zero", getStartOffset());
+        checkArgument(getEndOffset() == Long.MAX_VALUE,
             "FileBasedSource is based on a file pattern or a full single file "
-            + "but the ending offset proposed " + getEndOffset() + " is not Long.MAX_VALUE");
+            + "but the ending offset proposed %s is not Long.MAX_VALUE", getEndOffset());
         break;
       case SINGLE_FILE_OR_SUBRANGE:
         // Nothing more to validate.
@@ -398,10 +411,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   }
 
   @Override
-  public final long getMaxEndOffset(PipelineOptions options) throws Exception {
-    if (mode == Mode.FILEPATTERN) {
-      throw new IllegalArgumentException("Cannot determine the exact end offset of a file pattern");
-    }
+  public final long getMaxEndOffset(PipelineOptions options) throws IOException {
+    checkArgument(
+            mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
     if (getEndOffset() == Long.MAX_VALUE) {
       IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
       return factory.getSizeBytes(fileOrPatternSpec);
@@ -464,7 +476,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
      */
     public FileBasedReader(FileBasedSource<T> source) {
       super(source);
-      Preconditions.checkArgument(source.getMode() != Mode.FILEPATTERN,
+      checkArgument(source.getMode() != Mode.FILEPATTERN,
           "FileBasedReader does not support reading file patterns");
     }
 
@@ -484,12 +496,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
         seekChannel.position(source.getStartOffset());
       } else {
         // Channel is not seekable. Must not be a subrange.
-        Preconditions.checkArgument(source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
+        checkArgument(source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
             "Subrange-based sources must only be defined for file types that support seekable "
             + " read channels");
-        Preconditions.checkArgument(source.getStartOffset() == 0, "Start offset "
-            + source.getStartOffset()
-            + " is not zero but channel for reading the file is not seekable.");
+        checkArgument(source.getStartOffset() == 0,
+            "Start offset %s is not zero but channel for reading the file is not seekable.",
+            source.getStartOffset());
       }
 
       startReading(channel);
@@ -570,7 +582,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     @Override
     public boolean advance() throws IOException {
-      Preconditions.checkState(currentReader != null, "Call start() before advance()");
+      checkState(currentReader != null, "Call start() before advance()");
       if (currentReader.advance()) {
         return true;
       }