You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2014/01/16 23:44:10 UTC

[2/2] git commit: ACCUMULO-1998 Resolving corner cases around blocked output stream behavior and migration

ACCUMULO-1998 Resolving corner cases around blocked output stream behavior and migration


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9ba06ff2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9ba06ff2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9ba06ff2

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 9ba06ff2d515d982b47b4f121a14bb5a98a024f0
Parents: 08a9804
Author: John Vines <vi...@apache.org>
Authored: Thu Jan 9 17:35:02 2014 -0500
Committer: John Vines <vi...@apache.org>
Committed: Thu Jan 16 17:43:51 2014 -0500

----------------------------------------------------------------------
 .../security/crypto/BlockedOutputStream.java    |  6 +--
 .../security/crypto/CryptoModuleFactory.java    |  4 +-
 .../security/crypto/DefaultCryptoModule.java    |  4 +-
 .../security/crypto/BlockedIOStreamTest.java    | 44 ++++++++++++++++++++
 .../apache/accumulo/tserver/log/DfsLogger.java  | 12 +++---
 5 files changed, 58 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
index 9ca00b7..1f3cf3b 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
@@ -44,7 +44,7 @@ public class BlockedOutputStream extends OutputStream {
   }
 
   @Override
-  public void flush() throws IOException {
+  public synchronized void flush() throws IOException {
     int size = bb.position();
     if (size == 0)
       return;
@@ -64,9 +64,10 @@ public class BlockedOutputStream extends OutputStream {
 
   @Override
   public void write(int b) throws IOException {
-    bb.put((byte) b);
+    // Check for a full buffer before the put: normal behavior is unchanged, but if a previous flush() failed and left the buffer full, the flush is retried here instead of the put throwing a buffer overflow error
     if (bb.remaining() == 0)
       flush();
+    bb.put((byte) b);
   }
 
   @Override
@@ -90,7 +91,6 @@ public class BlockedOutputStream extends OutputStream {
   @Override
   public void close() throws IOException {
     flush();
-    bb = null;
     out.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
index 65acc6b..3fd43a0 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
@@ -287,7 +287,9 @@ public class CryptoModuleFactory {
     params.setPadding(cipherTransformParts[2]);
     params.setRandomNumberGenerator(cryptoOpts.get(Property.CRYPTO_SECURE_RNG.getKey()));
     params.setRandomNumberGeneratorProvider(cryptoOpts.get(Property.CRYPTO_SECURE_RNG_PROVIDER.getKey()));
-    params.setBlockStreamSize(Integer.parseInt(cryptoOpts.get(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey())));
+    String blockStreamSize = cryptoOpts.get(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey());
+    if (blockStreamSize != null)
+      params.setBlockStreamSize(Integer.parseInt(blockStreamSize));
 
     return params;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
index 347887c..dfad05e 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
@@ -355,7 +355,7 @@ public class DefaultCryptoModule implements CryptoModule {
         if (marker.equals(ENCRYPTION_HEADER_MARKER_V2))
           params.setBlockStreamSize(dataIn.readInt());
         else
-          params.setBlockStreamSize(-1);
+          params.setBlockStreamSize(0);
       } else {
         
         log.trace("Read something off of the encrypted input stream that was not the encryption header marker, so pushing back bytes and returning the given stream");
@@ -398,7 +398,7 @@ public class DefaultCryptoModule implements CryptoModule {
     
     InputStream blockedDecryptingInputStream = new CipherInputStream(params.getEncryptedInputStream(), cipher);
     
-    if (params.getBlockStreamSize() != -1)
+    if (params.getBlockStreamSize() > 0)
       blockedDecryptingInputStream = new BlockedInputStream(blockedDecryptingInputStream, cipher.getBlockSize(), params.getBlockStreamSize());
 
     log.trace("Initialized cipher input stream with transformation ["+getCipherTransformation(params)+"]");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
index faba913..6fb52dd 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.accumulo.core.Constants;
 import org.junit.Test;
@@ -71,4 +72,47 @@ public class BlockedIOStreamTest {
   public void testSmallBufferBlockedIO() throws IOException {
     writeRead(16, (12 + 4) * (int) (Math.ceil(25.0/12) + Math.ceil(31.0/12)));
   }
+  
+  @Test
+  public void testSpillingOverOutputStream() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // buffer will be size 12
+    BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16);
+    Random r = new Random(22);
+    
+    byte[] undersized = new byte[11];
+    byte[] perfectSized = new byte[12];
+    byte[] overSized = new byte[13];
+    byte[] perfectlyOversized = new byte[13];
+    byte filler = (byte) r.nextInt();
+    
+    r.nextBytes(undersized);
+    r.nextBytes(perfectSized);
+    r.nextBytes(overSized);
+    r.nextBytes(perfectlyOversized);
+    
+    // 1 block
+    blockOut.write(undersized);
+    blockOut.write(filler);
+    blockOut.flush();
+    
+    // 2 blocks
+    blockOut.write(perfectSized);
+    blockOut.write(filler);
+    blockOut.flush();
+    
+    // 2 blocks
+    blockOut.write(overSized);
+    blockOut.write(filler);
+    blockOut.flush();
+    
+    // 3 blocks
+    blockOut.write(undersized);
+    blockOut.write(perfectlyOversized);
+    blockOut.write(filler);
+    blockOut.flush();
+    
+    baos.close();
+    assertEquals(16*8, baos.toByteArray().length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cc28ac2..3074614 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -366,14 +366,14 @@ public class DfsLogger {
       logFile.write(LOG_FILE_HEADER_V3.getBytes());
 
       CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
-      
+
       params.setPlaintextOutputStream(new NoFlushOutputStream(logFile));
-      
+
       // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here,
       // so that that crypto module can re-read its own parameters.
-      
+
       logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
-      
+
       
       params = cryptoModule.getEncryptingOutputStream(params);
       OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
@@ -437,7 +437,7 @@ public class DfsLogger {
           log.info("Interrupted");
         }
     }
-    
+
     if (encryptingLogFile != null)
       try {
         encryptingLogFile.close();
@@ -492,7 +492,7 @@ public class DfsLogger {
         work.exception = e;
       }
     }
-    
+
     synchronized (closeLock) {
       // use a different lock for close check so that adding to work queue does not need
       // to wait on walog I/O operations