You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2014/01/16 23:44:10 UTC
[2/2] git commit: ACCUMULO-1998 Resolving corner cases around blocked output stream behavior and migration
ACCUMULO-1998 Resolving corner cases around blocked output stream behavior and migration
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9ba06ff2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9ba06ff2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9ba06ff2
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 9ba06ff2d515d982b47b4f121a14bb5a98a024f0
Parents: 08a9804
Author: John Vines <vi...@apache.org>
Authored: Thu Jan 9 17:35:02 2014 -0500
Committer: John Vines <vi...@apache.org>
Committed: Thu Jan 16 17:43:51 2014 -0500
----------------------------------------------------------------------
.../security/crypto/BlockedOutputStream.java | 6 +--
.../security/crypto/CryptoModuleFactory.java | 4 +-
.../security/crypto/DefaultCryptoModule.java | 4 +-
.../security/crypto/BlockedIOStreamTest.java | 44 ++++++++++++++++++++
.../apache/accumulo/tserver/log/DfsLogger.java | 12 +++---
5 files changed, 58 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
index 9ca00b7..1f3cf3b 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
@@ -44,7 +44,7 @@ public class BlockedOutputStream extends OutputStream {
}
@Override
- public void flush() throws IOException {
+ public synchronized void flush() throws IOException {
int size = bb.position();
if (size == 0)
return;
@@ -64,9 +64,10 @@ public class BlockedOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
- bb.put((byte) b);
+ // Checking for a full buffer before the put provides the same functionality, but a write after a failed flush() now retries the flush instead of throwing a buffer out of bounds error
if (bb.remaining() == 0)
flush();
+ bb.put((byte) b);
}
@Override
@@ -90,7 +91,6 @@ public class BlockedOutputStream extends OutputStream {
@Override
public void close() throws IOException {
flush();
- bb = null;
out.close();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
index 65acc6b..3fd43a0 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
@@ -287,7 +287,9 @@ public class CryptoModuleFactory {
params.setPadding(cipherTransformParts[2]);
params.setRandomNumberGenerator(cryptoOpts.get(Property.CRYPTO_SECURE_RNG.getKey()));
params.setRandomNumberGeneratorProvider(cryptoOpts.get(Property.CRYPTO_SECURE_RNG_PROVIDER.getKey()));
- params.setBlockStreamSize(Integer.parseInt(cryptoOpts.get(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey())));
+ String blockStreamSize = cryptoOpts.get(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey());
+ if (blockStreamSize != null)
+ params.setBlockStreamSize(Integer.parseInt(blockStreamSize));
return params;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
index 347887c..dfad05e 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
@@ -355,7 +355,7 @@ public class DefaultCryptoModule implements CryptoModule {
if (marker.equals(ENCRYPTION_HEADER_MARKER_V2))
params.setBlockStreamSize(dataIn.readInt());
else
- params.setBlockStreamSize(-1);
+ params.setBlockStreamSize(0);
} else {
log.trace("Read something off of the encrypted input stream that was not the encryption header marker, so pushing back bytes and returning the given stream");
@@ -398,7 +398,7 @@ public class DefaultCryptoModule implements CryptoModule {
InputStream blockedDecryptingInputStream = new CipherInputStream(params.getEncryptedInputStream(), cipher);
- if (params.getBlockStreamSize() != -1)
+ if (params.getBlockStreamSize() > 0)
blockedDecryptingInputStream = new BlockedInputStream(blockedDecryptingInputStream, cipher.getBlockSize(), params.getBlockStreamSize());
log.trace("Initialized cipher input stream with transformation ["+getCipherTransformation(params)+"]");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
index faba913..6fb52dd 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.util.Random;
import org.apache.accumulo.core.Constants;
import org.junit.Test;
@@ -71,4 +72,47 @@ public class BlockedIOStreamTest {
public void testSmallBufferBlockedIO() throws IOException {
writeRead(16, (12 + 4) * (int) (Math.ceil(25.0/12) + Math.ceil(31.0/12)));
}
+
+ @Test
+ public void testSpillingOverOutputStream() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ // buffer will be size 12
+ BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16);
+ Random r = new Random(22);
+
+ byte[] undersized = new byte[11];
+ byte[] perfectSized = new byte[12];
+ byte[] overSized = new byte[13];
+ byte[] perfectlyOversized = new byte[13];
+ byte filler = (byte) r.nextInt();
+
+ r.nextBytes(undersized);
+ r.nextBytes(perfectSized);
+ r.nextBytes(overSized);
+ r.nextBytes(perfectlyOversized);
+
+ // 1 block
+ blockOut.write(undersized);
+ blockOut.write(filler);
+ blockOut.flush();
+
+ // 2 blocks
+ blockOut.write(perfectSized);
+ blockOut.write(filler);
+ blockOut.flush();
+
+ // 2 blocks
+ blockOut.write(overSized);
+ blockOut.write(filler);
+ blockOut.flush();
+
+ // 3 blocks
+ blockOut.write(undersized);
+ blockOut.write(perfectlyOversized);
+ blockOut.write(filler);
+ blockOut.flush();
+
+ baos.close();
+ assertEquals(16*8, baos.toByteArray().length);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cc28ac2..3074614 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -366,14 +366,14 @@ public class DfsLogger {
logFile.write(LOG_FILE_HEADER_V3.getBytes());
CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
-
+
params.setPlaintextOutputStream(new NoFlushOutputStream(logFile));
-
+
// In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here,
// so that that crypto module can re-read its own parameters.
-
+
logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
-
+
params = cryptoModule.getEncryptingOutputStream(params);
OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
@@ -437,7 +437,7 @@ public class DfsLogger {
log.info("Interrupted");
}
}
-
+
if (encryptingLogFile != null)
try {
encryptingLogFile.close();
@@ -492,7 +492,7 @@ public class DfsLogger {
work.exception = e;
}
}
-
+
synchronized (closeLock) {
// use a different lock for close check so that adding to work queue does not need
// to wait on walog I/O operations