You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2014/04/18 00:33:12 UTC
[1/2] git commit: ACCUMULO-2671 Refactoring BlockedOutputStream to
not recurse. With test
Repository: accumulo
Updated Branches:
refs/heads/1.6.0-SNAPSHOT cacb1b625 -> 8e3dc7b47
ACCUMULO-2671 Refactoring BlockedOutputStream to not recurse. With test
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/17344890
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/17344890
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/17344890
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 173448903a4b3eb5e755421428e65a4a4dd67de5
Parents: cacb1b6
Author: John Vines <vi...@apache.org>
Authored: Thu Apr 17 17:57:32 2014 -0400
Committer: John Vines <vi...@apache.org>
Committed: Thu Apr 17 17:57:32 2014 -0400
----------------------------------------------------------------------
.../security/crypto/BlockedOutputStream.java | 17 ++++----
.../security/crypto/BlockedIOStreamTest.java | 44 +++++++++++++++-----
2 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/17344890/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
index ca72055..3ce648e 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
@@ -72,15 +72,18 @@ public class BlockedOutputStream extends OutputStream {
@Override
public void write(byte b[], int off, int len) throws IOException {
- if (bb.remaining() >= len) {
- bb.put(b, off, len);
- if (bb.remaining() == 0)
- flush();
- } else {
+ // Can't recurse here in case the len is large and the blocksize is small (and the stack is small)
+ // So we'll just fill up the buffer over and over
+ while (len >= bb.remaining()) {
int remaining = bb.remaining();
- write(b, off, remaining);
- write(b, off + remaining, len - remaining);
+ bb.put(b, off, remaining);
+ // This is guaranteed to have the buffer filled, so we'll just flush it. No check needed
+ flush();
+ off += remaining;
+ len -= remaining;
}
+ // And then write the remainder (and this is guaranteed to not fill the buffer, so we won't flush afteward
+ bb.put(b, off, len);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/17344890/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
index b344fc3..a116110 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
@@ -70,49 +70,73 @@ public class BlockedIOStreamTest {
@Test
public void testSmallBufferBlockedIO() throws IOException {
- writeRead(16, (12 + 4) * (int) (Math.ceil(25.0/12) + Math.ceil(31.0/12)));
+ writeRead(16, (12 + 4) * (int) (Math.ceil(25.0 / 12) + Math.ceil(31.0 / 12)));
}
-
+
@Test
public void testSpillingOverOutputStream() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// buffer will be size 12
BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16);
Random r = new Random(22);
-
+
byte[] undersized = new byte[11];
byte[] perfectSized = new byte[12];
byte[] overSized = new byte[13];
byte[] perfectlyOversized = new byte[13];
byte filler = (byte) r.nextInt();
-
+
r.nextBytes(undersized);
r.nextBytes(perfectSized);
r.nextBytes(overSized);
r.nextBytes(perfectlyOversized);
-
+
// 1 block
blockOut.write(undersized);
blockOut.write(filler);
blockOut.flush();
-
+
// 2 blocks
blockOut.write(perfectSized);
blockOut.write(filler);
blockOut.flush();
-
+
// 2 blocks
blockOut.write(overSized);
blockOut.write(filler);
blockOut.flush();
-
+
// 3 blocks
blockOut.write(undersized);
blockOut.write(perfectlyOversized);
blockOut.write(filler);
blockOut.flush();
-
+
+ blockOut.close();
+ assertEquals(16 * 8, baos.toByteArray().length);
+ }
+
+ @Test
+ public void testGiantWrite() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ int blockSize = 16;
+ // buffer will be size 12
+ BlockedOutputStream blockOut = new BlockedOutputStream(baos, blockSize, blockSize);
+ Random r = new Random(22);
+
+ int size = 1024 * 1024 * 128;
+ byte[] giant = new byte[size];
+
+ r.nextBytes(giant);
+
+ blockOut.write(giant);
+ blockOut.flush();
+
blockOut.close();
- assertEquals(16*8, baos.toByteArray().length);
+ baos.close();
+
+ int blocks = (int) Math.ceil(size / (blockSize - 4.0));
+ assertEquals(blocks * 16, baos.toByteArray().length);
}
+
}
[2/2] git commit: ACCUMULO-2690 fixing output stream short circuiting
in walog
Posted by vi...@apache.org.
ACCUMULO-2690 fixing output stream short circuiting in walog
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8e3dc7b4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8e3dc7b4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8e3dc7b4
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 8e3dc7b475b6b803c4c4402e54ad185cf60b3d89
Parents: 1734489
Author: John Vines <vi...@apache.org>
Authored: Thu Apr 17 18:00:49 2014 -0400
Committer: John Vines <vi...@apache.org>
Committed: Thu Apr 17 18:00:49 2014 -0400
----------------------------------------------------------------------
.../core/security/crypto/NoFlushOutputStream.java | 10 ++--------
.../java/org/apache/accumulo/tserver/log/DfsLogger.java | 9 ++++++---
2 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8e3dc7b4/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
index 2f9f4bb..17fc06a 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
@@ -16,22 +16,16 @@
*/
package org.apache.accumulo.core.security.crypto;
-import java.io.FilterOutputStream;
-import java.io.IOException;
+import java.io.DataOutputStream;
import java.io.OutputStream;
-public class NoFlushOutputStream extends FilterOutputStream {
+public class NoFlushOutputStream extends DataOutputStream {
public NoFlushOutputStream(OutputStream out) {
super(out);
}
@Override
- public void write(byte[] b, int off, int len) throws IOException {
- out.write(b, off, len);
- }
-
- @Override
public void flush() {}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8e3dc7b4/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index c960bd6..eb04f09 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -368,7 +368,8 @@ public class DfsLogger {
CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
- params.setPlaintextOutputStream(new NoFlushOutputStream(logFile));
+ NoFlushOutputStream nfos = new NoFlushOutputStream(logFile);
+ params.setPlaintextOutputStream(nfos);
// In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here,
// so that that crypto module can re-read its own parameters.
@@ -380,9 +381,11 @@ public class DfsLogger {
// If the module just kicks back our original stream, then just use it, don't wrap it in
// another data OutputStream.
- if (encipheringOutputStream == logFile) {
- encryptingLogFile = logFile;
+ if (encipheringOutputStream == nfos) {
+ log.debug("No enciphering, using raw output stream");
+ encryptingLogFile = nfos;
} else {
+ log.debug("Enciphering found, wrapping in DataOutputStream");
encryptingLogFile = new DataOutputStream(encipheringOutputStream);
}