You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xi...@apache.org on 2017/12/29 21:58:00 UTC

[3/4] hadoop git commit: HADOOP-14872. CryptoInputStream should implement unbuffer. Contributed by John Zhuge.

HADOOP-14872. CryptoInputStream should implement unbuffer. Contributed by John Zhuge.

(cherry picked from commit 6c32ddad30240a251caaefdf7fec9ff8ad177a7c)
(cherry picked from commit fc9e156484824fcb59faef2b2914f3cb53901b87)
(cherry picked from commit 5e0f4f212d0a2889edb7a7efb030f539faa4a8b9)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a1334ea1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a1334ea1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a1334ea1

Branch: refs/heads/branch-2.9
Commit: a1334ea177e4f74d47c3fa23664e8a5194c2138c
Parents: 141cc8f
Author: John Zhuge <jz...@apache.org>
Authored: Tue Nov 7 00:09:34 2017 -0800
Committer: Xiao Chen <xi...@apache.org>
Committed: Fri Dec 29 13:57:55 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/crypto/CryptoInputStream.java | 32 ++++++++-
 .../hadoop/crypto/CryptoStreamsTestBase.java    | 72 +++++++++++++++++++-
 .../apache/hadoop/crypto/TestCryptoStreams.java | 28 ++++++--
 .../crypto/TestCryptoStreamsForLocalFS.java     |  5 ++
 .../hadoop/crypto/TestCryptoStreamsNormal.java  |  5 ++
 5 files changed, 133 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index 0be6e34..a2273bf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -30,20 +30,23 @@ import java.util.EnumSet;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
 import org.apache.hadoop.io.ByteBufferPool;
-
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
@@ -61,7 +64,7 @@ import com.google.common.base.Preconditions;
 public class CryptoInputStream extends FilterInputStream implements 
     Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
     CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, 
-    ReadableByteChannel {
+    ReadableByteChannel, CanUnbuffer, StreamCapabilities {
   private final byte[] oneByteBuf = new byte[1];
   private final CryptoCodec codec;
   private final Decryptor decryptor;
@@ -719,4 +722,27 @@ public class CryptoInputStream extends FilterInputStream implements
   public boolean isOpen() {
     return !closed;
   }
+
+  private void cleanDecryptorPool() {
+    decryptorPool.clear();
+  }
+
+  @Override
+  public void unbuffer() {
+    cleanBufferPool();
+    cleanDecryptorPool();
+    StreamCapabilitiesPolicy.unbuffer(in);
+  }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    switch (StringUtils.toLowerCase(capability)) {
+    case StreamCapabilities.READAHEAD:
+    case StreamCapabilities.DROPBEHIND:
+    case StreamCapabilities.UNBUFFER:
+      return true;
+    default:
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
index 9183524..259383d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
@@ -27,6 +27,7 @@ import java.util.EnumSet;
 import java.util.Random;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -102,7 +103,32 @@ public abstract class CryptoStreamsTestBase {
     
     return total;
   }
-  
+
+  private int preadAll(PositionedReadable in, byte[] b, int off, int len)
+      throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (total >= len) {
+        break;
+      }
+      n = in.read(total, b, off + total, len - total);
+    }
+
+    return total;
+  }
+
+  private void preadCheck(PositionedReadable in) throws Exception {
+    byte[] result = new byte[dataLen];
+    int n = preadAll(in, result, 0, dataLen);
+
+    Assert.assertEquals(dataLen, n);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, 0, expectedData, 0, n);
+    Assert.assertArrayEquals(result, expectedData);
+  }
+
   protected OutputStream getOutputStream(int bufferSize) throws IOException {
     return getOutputStream(bufferSize, key, iv);
   }
@@ -146,7 +172,6 @@ public abstract class CryptoStreamsTestBase {
     // EOF
     n = in.read(result, 0, dataLen);
     Assert.assertEquals(n, -1);
-    in.close();
   }
   
   /** Test crypto writing with different buffer size. */
@@ -730,4 +755,47 @@ public abstract class CryptoStreamsTestBase {
     
     in.close();
   }
+
+  /** Test unbuffer. */
+  @Test(timeout=120000)
+  public void testUnbuffer() throws Exception {
+    OutputStream out = getOutputStream(smallBufferSize);
+    writeData(out);
+
+    // Test buffered read
+    try (InputStream in = getInputStream(smallBufferSize)) {
+      // Test unbuffer after buffered read
+      readCheck(in);
+      ((CanUnbuffer) in).unbuffer();
+
+      if (in instanceof Seekable) {
+        // Test buffered read again after unbuffer
+        // Must seek to the beginning first
+        ((Seekable) in).seek(0);
+        readCheck(in);
+      }
+
+      // Test close after unbuffer
+      ((CanUnbuffer) in).unbuffer();
+      // The close will be called when exiting this try-with-resource block
+    }
+
+    // Test pread
+    try (InputStream in = getInputStream(smallBufferSize)) {
+      if (in instanceof PositionedReadable) {
+        PositionedReadable pin = (PositionedReadable) in;
+
+        // Test unbuffer after pread
+        preadCheck(pin);
+        ((CanUnbuffer) in).unbuffer();
+
+        // Test pread again after unbuffer
+        preadCheck(pin);
+
+        // Test close after unbuffer
+        ((CanUnbuffer) in).unbuffer();
+        // The close will be called when exiting this try-with-resource block
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
index 810270b..c6c6067 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
@@ -29,11 +29,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -164,16 +166,18 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
     }
   }
   
-  public static class FakeInputStream extends InputStream implements 
-      Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
-      CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
+  static class FakeInputStream extends InputStream
+      implements Seekable, PositionedReadable, ByteBufferReadable,
+                 HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+                 HasEnhancedByteBufferAccess, CanUnbuffer,
+                 StreamCapabilities {
     private final byte[] oneByteBuf = new byte[1];
     private int pos = 0;
     private final byte[] data;
     private final int length;
     private boolean closed = false;
 
-    public FakeInputStream(DataInputBuffer in) {
+    FakeInputStream(DataInputBuffer in) {
       data = in.getData();
       length = in.getLength();
     }
@@ -355,6 +359,22 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
     }
 
     @Override
+    public void unbuffer() {
+    }
+
+    @Override
+    public boolean hasCapability(String capability) {
+      switch (capability.toLowerCase()) {
+      case StreamCapabilities.READAHEAD:
+      case StreamCapabilities.DROPBEHIND:
+      case StreamCapabilities.UNBUFFER:
+        return true;
+      default:
+        return false;
+      }
+    }
+
+    @Override
     public FileDescriptor getFileDescriptor() throws IOException {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
index 1ef6f3c..bb3fd7a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
@@ -112,4 +112,9 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
   @Test(timeout=10000)
   public void testSeekToNewSource() throws Exception {
   }
+
+  @Ignore("Local file input stream does not support unbuffer")
+  @Override
+  @Test
+  public void testUnbuffer() throws Exception {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
index b5382c1..7e30077 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
@@ -120,4 +120,9 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
   @Override
   @Test(timeout=10000)
   public void testHasEnhancedByteBufferAccess() throws IOException {}
+
+  @Ignore("ByteArrayInputStream does not support unbuffer")
+  @Override
+  @Test
+  public void testUnbuffer() throws Exception {}
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org