You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/10/14 18:13:56 UTC
orc git commit: ORC-252. Add support for Key Management Servers (kms)
to HadoopShims.
Repository: orc
Updated Branches:
refs/heads/master afd0175be -> af29f0386
ORC-252. Add support for Key Management Servers (kms) to HadoopShims.
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/af29f038
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/af29f038
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/af29f038
Branch: refs/heads/master
Commit: af29f03862f13fded501959da8a24c9a25c86898
Parents: afd0175
Author: Owen O'Malley <om...@apache.org>
Authored: Fri Oct 13 08:27:57 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri Oct 13 08:27:57 2017 -0700
----------------------------------------------------------------------
java/shims/pom.xml | 10 +-
.../org/apache/orc/EncryptionAlgorithm.java | 92 ++++++++++
.../java/org/apache/orc/impl/HadoopShims.java | 94 +++++++++-
.../org/apache/orc/impl/HadoopShimsCurrent.java | 13 +-
.../org/apache/orc/impl/HadoopShimsPre2_3.java | 7 +
.../org/apache/orc/impl/HadoopShimsPre2_6.java | 124 +++++++++++++
.../org/apache/orc/impl/HadoopShimsPre2_7.java | 173 +++++++++++++------
.../apache/orc/impl/TestHadoopShimsPre2_7.java | 60 +++++++
8 files changed, 508 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/af29f038/java/shims/pom.xml
----------------------------------------------------------------------
diff --git a/java/shims/pom.xml b/java/shims/pom.xml
index a4b58c2..019790d 100644
--- a/java/shims/pom.xml
+++ b/java/shims/pom.xml
@@ -46,6 +46,13 @@
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
+
+ <!-- test inter-project -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -65,9 +72,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
- <sourceFileExcludes>
- <exclude>**/OrcProto.java</exclude>
- </sourceFileExcludes>
<destDir>${project.artifactId}</destDir>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/orc/blob/af29f038/java/shims/src/java/org/apache/orc/EncryptionAlgorithm.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/EncryptionAlgorithm.java b/java/shims/src/java/org/apache/orc/EncryptionAlgorithm.java
new file mode 100644
index 0000000..68bb3fc
--- /dev/null
+++ b/java/shims/src/java/org/apache/orc/EncryptionAlgorithm.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * The encryption algorithms supported by ORC.
+ *
+ * This class can't reference any of the newer Hadoop classes.
+ */
+public enum EncryptionAlgorithm {
+ AES_128("AES", "CTR/NoPadding", 16, 1),
+ AES_256("AES", "CTR/NoPadding", 32, 2);
+
+ private final String algorithm;
+ private final String mode;
+ private final int keyLength;
+ private final int serialization;
+ private final byte[] zero;
+
+ EncryptionAlgorithm(String algorithm, String mode, int keyLength,
+ int serialization) {
+ this.algorithm = algorithm;
+ this.mode = mode;
+ this.keyLength = keyLength;
+ this.serialization = serialization;
+ zero = new byte[keyLength];
+ }
+
+ public String getAlgorithm() {
+ return algorithm;
+ }
+
+ public Cipher createCipher() {
+ try {
+ return Cipher.getInstance(algorithm + "/" + mode);
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalArgumentException("Bad algorithm " + algorithm);
+ } catch (NoSuchPaddingException e) {
+ throw new IllegalArgumentException("Bad padding " + algorithm);
+ }
+ }
+
+ public int keyLength() {
+ return keyLength;
+ }
+
+ public byte[] getZeroKey() {
+ return zero;
+ }
+
+ /**
+ * Get the serialization code for this enumeration.
+ * @return the serialization value
+ */
+ public int getSerialization() {
+ return serialization;
+ }
+
+ /**
+ * Get the serialization code for this enumeration.
+ * @return the serialization value
+ */
+ public static EncryptionAlgorithm fromSerialization(int serialization) {
+ for(EncryptionAlgorithm algorithm: values()) {
+ if (algorithm.serialization == serialization) {
+ return algorithm;
+ }
+ }
+ throw new IllegalArgumentException("Unknown code in encryption algorithm " +
+ serialization);
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/af29f038/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
index 3ba3b43..2eb210d 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
@@ -18,12 +18,15 @@
package org.apache.orc.impl;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.orc.EncryptionAlgorithm;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.security.Key;
public interface HadoopShims {
@@ -75,25 +78,32 @@ public interface HadoopShims {
/**
* Provides an HDFS ZeroCopyReader shim.
- * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to)
+ * @param in FSDataInputStream to read from (where the cached/mmap buffers are
+ * tied to)
* @param pool ByteBufferPoolShim to allocate fallback buffers with
*
* @return returns null if not supported
*/
- ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+ ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+ ByteBufferPoolShim pool
+ ) throws IOException;
interface ZeroCopyReaderShim extends Closeable {
+
/**
- * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or an MappedByteBuffer.
- * Also move the in stream by that amount. The data read can be small than maxLength.
+ * Get a ByteBuffer from the FSDataInputStream - this can be either a
+ * HeapByteBuffer or an MappedByteBuffer. Also move the in stream by that
+ * amount. The data read can be small than maxLength.
*
* @return ByteBuffer read from the stream,
*/
- ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+ ByteBuffer readBuffer(int maxLength,
+ boolean verifyChecksums) throws IOException;
+
/**
* Release a ByteBuffer obtained from a read on the
- * Also move the in stream by that amount. The data read can be small than maxLength.
- *
+ * Also move the in stream by that amount. The data read can be small than
+ * maxLength.
*/
void releaseBuffer(ByteBuffer buffer);
@@ -109,4 +119,74 @@ public interface HadoopShims {
* @return the number of bytes written
*/
long padStreamToBlock(OutputStream output, long padding) throws IOException;
+
+ /**
+ * A source of crypto keys. This is usually backed by a Ranger KMS.
+ */
+ interface KeyProvider {
+
+ /**
+ * Get the current metadata for a given key. This is used when encrypting
+ * new data.
+ * @param keyName the name of a key
+ * @return metadata for the current version of the key
+ */
+ KeyMetadata getCurrentKeyVersion(String keyName) throws IOException;
+
+ /**
+ * Create a metadata object while reading.
+ * @param keyName the name of the key
+ * @param version the version of the key to use
+ * @param algorithm the algorithm for that version of the key
+ * @return the metadata for the key version
+ */
+ KeyMetadata getKeyVersion(String keyName, int version,
+ EncryptionAlgorithm algorithm);
+
+ /**
+ * Create a local key for the given key version and initialization vector.
+ * Given a probabilistically unique iv, it will generate a unique key
+ * with the master key at the specified version. This allows the encryption
+ * to use this local key for the encryption and decryption without ever
+ * having access to the master key.
+ *
+ * This uses KeyProviderCryptoExtension.decryptEncryptedKey with a fixed key
+ * of the appropriate length.
+ *
+ * @param key the master key version
+ * @param iv the unique initialization vector
+ * @return the local key's material
+ */
+ Key getLocalKey(KeyMetadata key, byte[] iv) throws IOException;
+ }
+
+ /**
+ * Information about a crypto key.
+ */
+ interface KeyMetadata {
+ /**
+ * Get the name of the key.
+ */
+ String getKeyName();
+
+ /**
+ * Get the encryption algorithm for this key.
+ * @return the algorithm
+ */
+ EncryptionAlgorithm getAlgorithm();
+
+ /**
+ * Get the version of this key.
+ * @return the version
+ */
+ int getVersion();
+ }
+
+ /**
+ * Create a random key for encrypting.
+ * @param conf the configuration
+ * @return a key provider or null if none was provided
+ */
+ KeyProvider getKeyProvider(Configuration conf) throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/orc/blob/af29f038/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
index d9c9b6e..ca22fbf 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -18,6 +18,7 @@
package org.apache.orc.impl;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -27,11 +28,16 @@ import java.util.EnumSet;
/**
* Shims for recent versions of Hadoop
+ *
+ * Adds support for:
+ * <ul>
+ * <li>Variable length HDFS blocks</li>
+ * </ul>
*/
public class HadoopShimsCurrent implements HadoopShims {
public DirectDecompressor getDirectDecompressor(DirectCompressionType codec) {
- return HadoopShimsPre2_7.getDecompressor(codec);
+ return HadoopShimsPre2_6.getDecompressor(codec);
}
@Override
@@ -53,5 +59,10 @@ public class HadoopShimsCurrent implements HadoopShims {
}
}
+ @Override
+ public KeyProvider getKeyProvider(Configuration conf) throws IOException {
+ return new HadoopShimsPre2_7.KeyProviderImpl(conf);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/orc/blob/af29f038/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java
index 3c67691..6190eb2 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java
@@ -18,6 +18,7 @@
package org.apache.orc.impl;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import java.io.IOException;
@@ -62,4 +63,10 @@ public class HadoopShimsPre2_3 implements HadoopShims {
return padStream(output, padding);
}
+ @Override
+ public KeyProvider getKeyProvider(Configuration conf) {
+ // Not supported
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/orc/blob/af29f038/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
new file mode 100644
index 0000000..aa312e6
--- /dev/null
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Shims for versions of Hadoop less than 2.6
+ *
+ * Adds support for:
+ * <ul>
+ * <li>Direct buffer decompression</li>
+ * <li>Zero copy</li>
+ * </ul>
+ */
+public class HadoopShimsPre2_6 implements HadoopShims {
+
+ static class SnappyDirectDecompressWrapper implements DirectDecompressor {
+ private final SnappyDirectDecompressor root;
+
+ SnappyDirectDecompressWrapper(SnappyDirectDecompressor root) {
+ this.root = root;
+ }
+
+ public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
+ root.decompress(input, output);
+ }
+
+ @Override
+ public void reset() {
+ root.reset();
+ }
+
+ @Override
+ public void end() {
+ root.end();
+ }
+ }
+
+ static class ZlibDirectDecompressWrapper implements DirectDecompressor {
+ private final ZlibDecompressor.ZlibDirectDecompressor root;
+
+ ZlibDirectDecompressWrapper(ZlibDecompressor.ZlibDirectDecompressor root) {
+ this.root = root;
+ }
+
+ public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
+ root.decompress(input, output);
+ }
+
+ @Override
+ public void reset() {
+ root.reset();
+ }
+
+ @Override
+ public void end() {
+ root.end();
+ }
+ }
+
+ static DirectDecompressor getDecompressor( DirectCompressionType codec) {
+ switch (codec) {
+ case ZLIB:
+ return new ZlibDirectDecompressWrapper
+ (new ZlibDecompressor.ZlibDirectDecompressor());
+ case ZLIB_NOHEADER:
+ return new ZlibDirectDecompressWrapper
+ (new ZlibDecompressor.ZlibDirectDecompressor
+ (ZlibDecompressor.CompressionHeader.NO_HEADER, 0));
+ case SNAPPY:
+ return new SnappyDirectDecompressWrapper
+ (new SnappyDirectDecompressor());
+ default:
+ return null;
+ }
+ }
+
+ public DirectDecompressor getDirectDecompressor( DirectCompressionType codec) {
+ return getDecompressor(codec);
+ }
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+ ByteBufferPoolShim pool
+ ) throws IOException {
+ return ZeroCopyShims.getZeroCopyReader(in, pool);
+ }
+
+ @Override
+ public long padStreamToBlock(OutputStream output,
+ long padding) throws IOException {
+ return HadoopShimsPre2_3.padStream(output, padding);
+ }
+
+ @Override
+ public KeyProvider getKeyProvider(Configuration conf) {
+ // not supported
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/af29f038/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
----------------------------------------------------------------------
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
index a210353..4617b11 100644
--- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
@@ -18,96 +18,161 @@
package org.apache.orc.impl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderExtension;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
-import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
-import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+import org.apache.orc.EncryptionAlgorithm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.util.List;
/**
- * Shims for versions of Hadoop less than 2.7
+ * Shims for versions of Hadoop less than 2.7.
+ *
+ * Adds support for:
+ * <ul>
+ * <li>Crypto</li>
+ * </ul>
*/
public class HadoopShimsPre2_7 implements HadoopShims {
- static class SnappyDirectDecompressWrapper implements DirectDecompressor {
- private final SnappyDirectDecompressor root;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HadoopShimsPre2_7.class);
+
+
+ public DirectDecompressor getDirectDecompressor( DirectCompressionType codec) {
+ return HadoopShimsPre2_6.getDecompressor(codec);
+ }
- SnappyDirectDecompressWrapper(SnappyDirectDecompressor root) {
- this.root = root;
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+ ByteBufferPoolShim pool
+ ) throws IOException {
+ return ZeroCopyShims.getZeroCopyReader(in, pool);
+ }
+
+ @Override
+ public long padStreamToBlock(OutputStream output,
+ long padding) throws IOException {
+ return HadoopShimsPre2_3.padStream(output, padding);
+ }
+
+ static String buildKeyVersionName(KeyMetadata key) {
+ return key.getKeyName() + "@" + key.getVersion();
+ }
+
+ /**
+ * Shim implementation for Hadoop's KeyProvider API that lets applications get
+ * access to encryption keys.
+ */
+ static class KeyProviderImpl implements KeyProvider {
+ private final KeyProviderCryptoExtension provider;
+
+ KeyProviderImpl(Configuration conf) throws IOException {
+ List<org.apache.hadoop.crypto.key.KeyProvider> result =
+ KeyProviderFactory.getProviders(conf);
+ if (result.size() != 1) {
+ throw new IllegalArgumentException("Can't get KeyProvider for ORC" +
+ " encryption. Got " + result.size() + " results.");
+ }
+ provider = (KeyProviderCryptoExtension) result.get(0);
}
- public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
- root.decompress(input, output);
+ @Override
+ public KeyMetadata getCurrentKeyVersion(String keyName) throws IOException {
+ return new KeyMetadataImpl(keyName, provider.getMetadata(keyName));
}
@Override
- public void reset() {
- root.reset();
+ public KeyMetadata getKeyVersion(String keyName, int version,
+ EncryptionAlgorithm algorithm) {
+ return new KeyMetadataImpl(keyName, version, algorithm);
}
@Override
- public void end() {
- root.end();
+ public Key getLocalKey(KeyMetadata key, byte[] iv) throws IOException {
+ EncryptionAlgorithm algorithm = key.getAlgorithm();
+ KeyProviderCryptoExtension.EncryptedKeyVersion encryptedKey =
+ KeyProviderCryptoExtension.EncryptedKeyVersion.createForDecryption(
+ key.getKeyName(), buildKeyVersionName(key), iv,
+ algorithm.getZeroKey());
+ try {
+ KeyProviderCryptoExtension.KeyVersion decrypted =
+ provider.decryptEncryptedKey(encryptedKey);
+ return new SecretKeySpec(decrypted.getMaterial(),
+ algorithm.getAlgorithm());
+ } catch (GeneralSecurityException e) {
+ throw new IOException("Problem decrypting key " + key.getKeyName(), e);
+ }
}
}
- static class ZlibDirectDecompressWrapper implements DirectDecompressor {
- private final ZlibDecompressor.ZlibDirectDecompressor root;
+ static class KeyMetadataImpl implements KeyMetadata {
+ private final String keyName;
+ private final int version;
+ private final EncryptionAlgorithm algorithm;
- ZlibDirectDecompressWrapper(ZlibDecompressor.ZlibDirectDecompressor root) {
- this.root = root;
+ KeyMetadataImpl(String keyName, KeyProviderExtension.Metadata metadata) {
+ this.keyName = keyName;
+ version = metadata.getVersions() - 1;
+ algorithm = findAlgorithm(metadata);
}
- public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
- root.decompress(input, output);
+ KeyMetadataImpl(String keyName, int version, EncryptionAlgorithm algorithm){
+ this.keyName = keyName;
+ this.version = version;
+ this.algorithm = algorithm;
}
@Override
- public void reset() {
- root.reset();
+ public String getKeyName() {
+ return keyName;
}
@Override
- public void end() {
- root.end();
+ public EncryptionAlgorithm getAlgorithm() {
+ return algorithm;
}
- }
- static DirectDecompressor getDecompressor( DirectCompressionType codec) {
- switch (codec) {
- case ZLIB:
- return new ZlibDirectDecompressWrapper
- (new ZlibDecompressor.ZlibDirectDecompressor());
- case ZLIB_NOHEADER:
- return new ZlibDirectDecompressWrapper
- (new ZlibDecompressor.ZlibDirectDecompressor
- (ZlibDecompressor.CompressionHeader.NO_HEADER, 0));
- case SNAPPY:
- return new SnappyDirectDecompressWrapper
- (new SnappyDecompressor.SnappyDirectDecompressor());
- default:
- return null;
+ @Override
+ public int getVersion() {
+ return version;
}
- }
- public DirectDecompressor getDirectDecompressor( DirectCompressionType codec) {
- return getDecompressor(codec);
- }
-
- @Override
- public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
- ByteBufferPoolShim pool
- ) throws IOException {
- return ZeroCopyShims.getZeroCopyReader(in, pool);
+ /**
+ * Find the correct algorithm based on the key's metadata.
+ * @param meta the key's metadata
+ * @return the correct algorithm
+ */
+ static EncryptionAlgorithm findAlgorithm(KeyProviderCryptoExtension.Metadata meta) {
+ String cipher = meta.getCipher();
+ if (cipher.startsWith("AES/")) {
+ int bitLength = meta.getBitLength();
+ if (bitLength == 128) {
+ return EncryptionAlgorithm.AES_128;
+ } else {
+ if (bitLength != 256) {
+ LOG.info("ORC column encryption does not support " + bitLength +
+ " bit keys. Using 256 bits instead.");
+ }
+ return EncryptionAlgorithm.AES_256;
+ }
+ }
+ throw new IllegalArgumentException("ORC column encryption only supports" +
+ " AES and not " + cipher);
+ }
}
@Override
- public long padStreamToBlock(OutputStream output,
- long padding) throws IOException {
- return HadoopShimsPre2_3.padStream(output, padding);
+ public KeyProvider getKeyProvider(Configuration conf) throws IOException {
+ return new KeyProviderImpl(conf);
}
-
}
http://git-wip-us.apache.org/repos/asf/orc/blob/af29f038/java/shims/src/test/org/apache/orc/impl/TestHadoopShimsPre2_7.java
----------------------------------------------------------------------
diff --git a/java/shims/src/test/org/apache/orc/impl/TestHadoopShimsPre2_7.java b/java/shims/src/test/org/apache/orc/impl/TestHadoopShimsPre2_7.java
new file mode 100644
index 0000000..c6f2cc2
--- /dev/null
+++ b/java/shims/src/test/org/apache/orc/impl/TestHadoopShimsPre2_7.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
+import org.apache.orc.EncryptionAlgorithm;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.sql.Date;
+import java.util.HashMap;
+
+import static junit.framework.Assert.assertEquals;
+
+public class TestHadoopShimsPre2_7 {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFindingUnknownEncryption() throws Exception {
+ KeyProvider.Metadata meta = new KMSClientProvider.KMSMetadata(
+ "XXX/CTR/NoPadding", 128, "", new HashMap<String, String>(),
+ new Date(0), 1);
+ HadoopShimsPre2_7.KeyMetadataImpl.findAlgorithm(meta);
+ }
+
+ @Test
+ public void testFindingAesEncryption() throws Exception {
+ KeyProvider.Metadata meta = new KMSClientProvider.KMSMetadata(
+ "AES/CTR/NoPadding", 128, "", new HashMap<String, String>(),
+ new Date(0), 1);
+ assertEquals(EncryptionAlgorithm.AES_128,
+ HadoopShimsPre2_7.KeyMetadataImpl.findAlgorithm(meta));
+ meta = new KMSClientProvider.KMSMetadata(
+ "AES/CTR/NoPadding", 256, "", new HashMap<String, String>(),
+ new Date(0), 1);
+ assertEquals(EncryptionAlgorithm.AES_256,
+ HadoopShimsPre2_7.KeyMetadataImpl.findAlgorithm(meta));
+ meta = new KMSClientProvider.KMSMetadata(
+ "AES/CTR/NoPadding", 512, "", new HashMap<String, String>(),
+ new Date(0), 1);
+ assertEquals(EncryptionAlgorithm.AES_256,
+ HadoopShimsPre2_7.KeyMetadataImpl.findAlgorithm(meta));
+ }
+}