You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xi...@apache.org on 2017/12/29 21:58:00 UTC
[3/4] hadoop git commit: HADOOP-14872. CryptoInputStream should implement unbuffer. Contributed by John Zhuge.
HADOOP-14872. CryptoInputStream should implement unbuffer. Contributed by John Zhuge.
(cherry picked from commit 6c32ddad30240a251caaefdf7fec9ff8ad177a7c)
(cherry picked from commit fc9e156484824fcb59faef2b2914f3cb53901b87)
(cherry picked from commit 5e0f4f212d0a2889edb7a7efb030f539faa4a8b9)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a1334ea1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a1334ea1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a1334ea1
Branch: refs/heads/branch-2.9
Commit: a1334ea177e4f74d47c3fa23664e8a5194c2138c
Parents: 141cc8f
Author: John Zhuge <jz...@apache.org>
Authored: Tue Nov 7 00:09:34 2017 -0800
Committer: Xiao Chen <xi...@apache.org>
Committed: Fri Dec 29 13:57:55 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/crypto/CryptoInputStream.java | 32 ++++++++-
.../hadoop/crypto/CryptoStreamsTestBase.java | 72 +++++++++++++++++++-
.../apache/hadoop/crypto/TestCryptoStreams.java | 28 ++++++--
.../crypto/TestCryptoStreamsForLocalFS.java | 5 ++
.../hadoop/crypto/TestCryptoStreamsNormal.java | 5 ++
5 files changed, 133 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index 0be6e34..a2273bf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -30,20 +30,23 @@ import java.util.EnumSet;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
import org.apache.hadoop.io.ByteBufferPool;
-
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.StringUtils;
/**
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
@@ -61,7 +64,7 @@ import com.google.common.base.Preconditions;
public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
- ReadableByteChannel {
+ ReadableByteChannel, CanUnbuffer, StreamCapabilities {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
@@ -719,4 +722,27 @@ public class CryptoInputStream extends FilterInputStream implements
public boolean isOpen() {
return !closed;
}
+
+ /** Empties {@code decryptorPool} by clearing it. */
+ private void cleanDecryptorPool() {
+ decryptorPool.clear();
+ }
+
+ /**
+ * Drops the state this stream keeps between reads: calls
+ * {@code cleanBufferPool()}, then {@code cleanDecryptorPool()}, and
+ * finally hands the wrapped stream {@code in} to
+ * {@code StreamCapabilitiesPolicy.unbuffer}.
+ */
+ @Override
+ public void unbuffer() {
+ cleanBufferPool();
+ cleanDecryptorPool();
+ // NOTE(review): presumably forwards the unbuffer request to the wrapped
+ // stream when it supports it -- confirm in StreamCapabilitiesPolicy.
+ StreamCapabilitiesPolicy.unbuffer(in);
+ }
+
+ /**
+ * Reports whether this stream supports the named capability.
+ *
+ * @param capability capability name; lower-cased with
+ * {@code StringUtils.toLowerCase} before it is compared
+ * @return true for {@code READAHEAD}, {@code DROPBEHIND} and
+ * {@code UNBUFFER}; false for any other name
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ switch (StringUtils.toLowerCase(capability)) {
+ case StreamCapabilities.READAHEAD:
+ case StreamCapabilities.DROPBEHIND:
+ case StreamCapabilities.UNBUFFER:
+ return true;
+ default:
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
index 9183524..259383d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
@@ -27,6 +27,7 @@ import java.util.EnumSet;
import java.util.Random;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -102,7 +103,32 @@ public abstract class CryptoStreamsTestBase {
return total;
}
-
+
+ /**
+ * Fills {@code b} with repeated positioned reads until {@code len} bytes
+ * have been read or a read returns -1.
+ *
+ * @param in stream to read from; each read starts at stream position
+ * {@code total}, i.e. the number of bytes read so far
+ * @param b destination buffer
+ * @param off offset in {@code b} of the first byte to store
+ * @param len maximum number of bytes to read
+ * @return the number of bytes actually read
+ * @throws IOException if a positioned read fails
+ */
+ private int preadAll(PositionedReadable in, byte[] b, int off, int len)
+ throws IOException {
+ int n = 0;
+ int total = 0;
+ while (n != -1) {
+ // n is 0 on the first pass, so this adds only the previous read's count.
+ total += n;
+ if (total >= len) {
+ break;
+ }
+ n = in.read(total, b, off + total, len - total);
+ }
+
+ return total;
+ }
+
+ /**
+ * Reads {@code dataLen} bytes from position 0 with positioned reads and
+ * asserts that the full length was read and that the bytes equal the
+ * first {@code dataLen} bytes of {@code data}.
+ *
+ * @param in stream under test
+ * @throws Exception if a read fails
+ */
+ private void preadCheck(PositionedReadable in) throws Exception {
+ byte[] result = new byte[dataLen];
+ int n = preadAll(in, result, 0, dataLen);
+
+ Assert.assertEquals(dataLen, n);
+ byte[] expectedData = new byte[n];
+ System.arraycopy(data, 0, expectedData, 0, n);
+ // JUnit takes (expecteds, actuals); keep that order so a failure
+ // message reports the expected and actual bytes the right way round.
+ Assert.assertArrayEquals(expectedData, result);
+ }
+
protected OutputStream getOutputStream(int bufferSize) throws IOException {
return getOutputStream(bufferSize, key, iv);
}
@@ -146,7 +172,6 @@ public abstract class CryptoStreamsTestBase {
// EOF
n = in.read(result, 0, dataLen);
Assert.assertEquals(n, -1);
- in.close();
}
/** Test crypto writing with different buffer size. */
@@ -730,4 +755,47 @@ public abstract class CryptoStreamsTestBase {
in.close();
}
+
+ /**
+ * Test unbuffer: a stream must stay readable after {@code unbuffer()},
+ * both for sequential reads (after seeking back to 0) and for positioned
+ * reads, and it must still close when {@code unbuffer()} was the last
+ * call made on it. The casts to {@code CanUnbuffer} are unconditional.
+ */
+ @Test(timeout=120000)
+ public void testUnbuffer() throws Exception {
+ OutputStream out = getOutputStream(smallBufferSize);
+ writeData(out);
+
+ // Test buffered read
+ try (InputStream in = getInputStream(smallBufferSize)) {
+ // Test unbuffer after buffered read
+ readCheck(in);
+ ((CanUnbuffer) in).unbuffer();
+
+ if (in instanceof Seekable) {
+ // Test buffered read again after unbuffer
+ // Must seek to the beginning first
+ ((Seekable) in).seek(0);
+ readCheck(in);
+ }
+
+ // Test close after unbuffer
+ ((CanUnbuffer) in).unbuffer();
+ // The close will be called when exiting this try-with-resource block
+ }
+
+ // Test pread on a fresh stream
+ try (InputStream in = getInputStream(smallBufferSize)) {
+ if (in instanceof PositionedReadable) {
+ PositionedReadable pin = (PositionedReadable) in;
+
+ // Test unbuffer after pread
+ preadCheck(pin);
+ ((CanUnbuffer) in).unbuffer();
+
+ // Test pread again after unbuffer; no seek is done before this one
+ preadCheck(pin);
+
+ // Test close after unbuffer
+ ((CanUnbuffer) in).unbuffer();
+ // The close will be called when exiting this try-with-resource block
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
index 810270b..c6c6067 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
@@ -29,11 +29,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.DataInputBuffer;
@@ -164,16 +166,18 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
}
}
- public static class FakeInputStream extends InputStream implements
- Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
- CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
+ static class FakeInputStream extends InputStream
+ implements Seekable, PositionedReadable, ByteBufferReadable,
+ HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+ HasEnhancedByteBufferAccess, CanUnbuffer,
+ StreamCapabilities {
private final byte[] oneByteBuf = new byte[1];
private int pos = 0;
private final byte[] data;
private final int length;
private boolean closed = false;
- public FakeInputStream(DataInputBuffer in) {
+ FakeInputStream(DataInputBuffer in) {
data = in.getData();
length = in.getLength();
}
@@ -355,6 +359,22 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
}
@Override
+ public void unbuffer() {
+ // No-op in this test fake.
+ }
+
+ /**
+ * Reports whether this fake stream claims the named capability.
+ *
+ * @param capability capability name, lower-cased before it is compared
+ * @return true for {@code READAHEAD}, {@code DROPBEHIND} and
+ * {@code UNBUFFER}; false for any other name
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ // Lower-case with a fixed locale: the no-argument overload uses the
+ // default locale, which can map 'I' to a dotless i (e.g. Turkish) and
+ // make an otherwise valid name miss every case label.
+ switch (capability.toLowerCase(java.util.Locale.ROOT)) {
+ case StreamCapabilities.READAHEAD:
+ case StreamCapabilities.DROPBEHIND:
+ case StreamCapabilities.UNBUFFER:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
public FileDescriptor getFileDescriptor() throws IOException {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
index 1ef6f3c..bb3fd7a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
@@ -112,4 +112,9 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
@Test(timeout=10000)
public void testSeekToNewSource() throws Exception {
}
+
+ /** Replaces the inherited {@code testUnbuffer} with an empty, ignored test. */
+ @Ignore("Local file input stream does not support unbuffer")
+ @Override
+ @Test
+ public void testUnbuffer() throws Exception {}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1334ea1/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
index b5382c1..7e30077 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
@@ -120,4 +120,9 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
@Override
@Test(timeout=10000)
public void testHasEnhancedByteBufferAccess() throws IOException {}
+
+ /** Replaces the inherited {@code testUnbuffer} with an empty, ignored test. */
+ @Ignore("ByteArrayInputStream does not support unbuffer")
+ @Override
+ @Test
+ public void testUnbuffer() throws Exception {}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org