You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/08/14 20:01:19 UTC

[03/12] storm git commit: Make the buffer size in file copy an option and set a reasonable default

Make the buffer size in file copy an option and set a reasonable default


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e71e2ddf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e71e2ddf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e71e2ddf

Branch: refs/heads/master
Commit: e71e2ddfafe8bfb082320c2c3b40f4fb2a3d4995
Parents: 579dc87
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Jul 22 12:31:46 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Jul 22 12:31:46 2015 +0530

----------------------------------------------------------------------
 .../main/java/org/apache/storm/hdfs/trident/HdfsState.java   | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e71e2ddf/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 9b9ba8e..8d32b32 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -193,6 +193,7 @@ public class HdfsState implements State {
         private transient FSDataOutputStream out;
         protected RecordFormat format;
         private long offset = 0;
+        private int bufferSize =  131072; // default 128 K
 
         public HdfsFileOptions withFsUrl(String fsUrl) {
             this.fsUrl = fsUrl;
@@ -219,6 +220,11 @@ public class HdfsState implements State {
             return this;
         }
 
+        public HdfsFileOptions withBufferSize(int size) {
+            this.bufferSize = Math.max(4096, size); // at least 4K
+            return this;
+        }
+
         @Deprecated
         public HdfsFileOptions addRotationAction(RotationAction action) {
             this.rotationActions.add(action);
@@ -262,7 +268,7 @@ public class HdfsState implements State {
         }
 
         private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException {
-            byte[] buf = new byte[4096];
+            byte[] buf = new byte[bufferSize];
             int n;
             while ((n = is.read(buf)) != -1 && bytesToCopy > 0) {
                 out.write(buf, 0, (int) Math.min(n, bytesToCopy));