You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/11/15 17:56:02 UTC
incubator-gobblin git commit: [GOBBLIN-314] Validate filesize when copying in writer
Repository: incubator-gobblin
Updated Branches:
refs/heads/master e9ad289c4 -> f3472d30c
[GOBBLIN-314] Validate filesize when copying in writer
Closes #2168 from jack-moseley/filesize_check
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f3472d30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f3472d30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f3472d30
Branch: refs/heads/master
Commit: f3472d30c716613bbe5e41df508e82b58add6e12
Parents: e9ad289
Author: Jack Moseley <jm...@linkedin.com>
Authored: Wed Nov 15 09:55:56 2017 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Nov 15 09:55:56 2017 -0800
----------------------------------------------------------------------
.../copy/writer/FileAwareInputStreamDataWriter.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f3472d30/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 03fc2b6..cda144e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -84,6 +84,8 @@ import org.apache.gobblin.writer.DataWriter;
public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileAwareInputStream> implements FinalState, SpeculativeAttemptAwareConstruct {
public static final String GOBBLIN_COPY_BYTES_COPIED_METER = "gobblin.copy.bytesCopiedMeter";
+ public static final String GOBBLIN_COPY_CHECK_FILESIZE = "gobblin.copy.checkFileSize";
+ public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
protected final AtomicLong bytesWritten = new AtomicLong();
protected final AtomicLong filesWritten = new AtomicLong();
@@ -96,6 +98,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
protected final RecoveryHelper recoveryHelper;
protected final SharedResourcesBroker<GobblinScopeTypes> taskBroker;
protected final int bufferSize;
+ private final boolean checkFileSize;
protected final Meter copySpeedMeter;
@@ -143,6 +146,8 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
this.bufferSize = state.getPropAsInt(CopyConfiguration.BUFFER_SIZE, StreamCopier.DEFAULT_BUFFER_SIZE);
this.encryptionConfig = EncryptionConfigParser
.getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, this.state, numBranches, branchId);
+
+ this.checkFileSize = state.getPropAsBoolean(GOBBLIN_COPY_CHECK_FILESIZE, DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE);
}
public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId)
@@ -231,7 +236,13 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
if (isInstrumentationEnabled()) {
copier.withCopySpeedMeter(this.copySpeedMeter);
}
- this.bytesWritten.addAndGet(copier.copy());
+ long numBytes = copier.copy();
+ long fileSize = copyableFile.getFileStatus().getLen();
+ if (this.checkFileSize && numBytes != fileSize) {
+ throw new IOException(String.format("Number of bytes copied doesn't match filesize for file %s.",
+ copyableFile.getOrigin().getPath()));
+ }
+ this.bytesWritten.addAndGet(numBytes);
if (isInstrumentationEnabled()) {
log.info("File {}: copied {} bytes, average rate: {} B/s", copyableFile.getOrigin().getPath(),
this.copySpeedMeter.getCount(), this.copySpeedMeter.getMeanRate());