You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/11/15 17:56:02 UTC

incubator-gobblin git commit: [GOBBLIN-314] Validate filesize when copying in writer

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e9ad289c4 -> f3472d30c


[GOBBLIN-314] Validate filesize when copying in writer

Closes #2168 from jack-moseley/filesize_check


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f3472d30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f3472d30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f3472d30

Branch: refs/heads/master
Commit: f3472d30c716613bbe5e41df508e82b58add6e12
Parents: e9ad289
Author: Jack Moseley <jm...@linkedin.com>
Authored: Wed Nov 15 09:55:56 2017 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Nov 15 09:55:56 2017 -0800

----------------------------------------------------------------------
 .../copy/writer/FileAwareInputStreamDataWriter.java    | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f3472d30/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 03fc2b6..cda144e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -84,6 +84,8 @@ import org.apache.gobblin.writer.DataWriter;
 public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileAwareInputStream> implements FinalState, SpeculativeAttemptAwareConstruct {
 
   public static final String GOBBLIN_COPY_BYTES_COPIED_METER = "gobblin.copy.bytesCopiedMeter";
+  public static final String GOBBLIN_COPY_CHECK_FILESIZE = "gobblin.copy.checkFileSize";
+  public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
 
   protected final AtomicLong bytesWritten = new AtomicLong();
   protected final AtomicLong filesWritten = new AtomicLong();
@@ -96,6 +98,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
   protected final RecoveryHelper recoveryHelper;
   protected final SharedResourcesBroker<GobblinScopeTypes> taskBroker;
   protected final int bufferSize;
+  private final boolean checkFileSize;
 
   protected final Meter copySpeedMeter;
 
@@ -143,6 +146,8 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
     this.bufferSize = state.getPropAsInt(CopyConfiguration.BUFFER_SIZE, StreamCopier.DEFAULT_BUFFER_SIZE);
     this.encryptionConfig = EncryptionConfigParser
         .getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, this.state, numBranches, branchId);
+
+    this.checkFileSize = state.getPropAsBoolean(GOBBLIN_COPY_CHECK_FILESIZE, DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE);
   }
 
   public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId)
@@ -231,7 +236,13 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
         if (isInstrumentationEnabled()) {
           copier.withCopySpeedMeter(this.copySpeedMeter);
         }
-        this.bytesWritten.addAndGet(copier.copy());
+        long numBytes = copier.copy();
+        long fileSize = copyableFile.getFileStatus().getLen();
+        if (this.checkFileSize && numBytes != fileSize) {
+          throw new IOException(String.format("Number of bytes copied doesn't match filesize for file %s.",
+              copyableFile.getOrigin().getPath()));
+        }
+        this.bytesWritten.addAndGet(numBytes);
         if (isInstrumentationEnabled()) {
           log.info("File {}: copied {} bytes, average rate: {} B/s", copyableFile.getOrigin().getPath(),
               this.copySpeedMeter.getCount(), this.copySpeedMeter.getMeanRate());