You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2017/07/28 06:25:35 UTC
incubator-hawq git commit: HAWQ-1506. Fix multi-append bug of write a
encryption zone
Repository: incubator-hawq
Updated Branches:
refs/heads/master 54a9af323 -> 2662bebd1
HAWQ-1506. Fix multi-append bug of write a encryption zone
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2662bebd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2662bebd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2662bebd
Branch: refs/heads/master
Commit: 2662bebd163069f5742ff0b236768cd559089f28
Parents: 54a9af3
Author: interma <in...@outlook.com>
Authored: Tue Jul 25 16:50:25 2017 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Fri Jul 28 14:25:38 2017 +0800
----------------------------------------------------------------------
depends/libhdfs3/mock/MockCryptoCodec.h | 5 +-
depends/libhdfs3/src/client/CryptoCodec.cpp | 319 ++++++++++---------
depends/libhdfs3/src/client/CryptoCodec.h | 96 +++---
.../libhdfs3/src/client/FileEncryptionInfo.h | 2 +-
depends/libhdfs3/src/client/HttpClient.cpp | 2 +
.../libhdfs3/src/client/OutputStreamImpl.cpp | 32 +-
.../libhdfs3/test/function/TestCInterface.cpp | 160 +++++++++-
.../libhdfs3/test/function/TestKmsClient.cpp | 1 -
.../libhdfs3/test/unit/UnitTestCryptoCodec.cpp | 16 +-
.../libhdfs3/test/unit/UnitTestOutputStream.cpp | 2 +-
10 files changed, 416 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/mock/MockCryptoCodec.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/mock/MockCryptoCodec.h b/depends/libhdfs3/mock/MockCryptoCodec.h
index 4d23e11..a9a220e 100644
--- a/depends/libhdfs3/mock/MockCryptoCodec.h
+++ b/depends/libhdfs3/mock/MockCryptoCodec.h
@@ -30,8 +30,9 @@
class MockCryptoCodec: public Hdfs::CryptoCodec {
public:
MockCryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize) : CryptoCodec(encryptionInfo, kcp, bufSize) {}
- MOCK_METHOD2(encode, std::string(const char * buffer,int64_t size));
- MOCK_METHOD2(decode, std::string(const char * buffer,int64_t size));
+
+ MOCK_METHOD2(init, int(CryptoMethod crypto_method, int64_t stream_offset));
+ MOCK_METHOD2(cipher_wrap, std::string(const char * buffer,int64_t size));
};
#endif /* _HDFS_LIBHDFS3_MOCK_CRYPTOCODEC_H_ */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/CryptoCodec.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/CryptoCodec.cpp b/depends/libhdfs3/src/client/CryptoCodec.cpp
index 6ba1b74..0ca2d16 100644
--- a/depends/libhdfs3/src/client/CryptoCodec.cpp
+++ b/depends/libhdfs3/src/client/CryptoCodec.cpp
@@ -25,154 +25,181 @@
using namespace Hdfs::Internal;
-namespace Hdfs {
-
-/**
- * Construct a CryptoCodec instance.
- * @param encryptionInfo the encryption info of file.
- * @param kcp a KmsClientProvider instance to get key from kms server.
- * @param bufSize crypto buffer size.
- */
-CryptoCodec::CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize) : encryptionInfo(encryptionInfo), kcp(kcp), bufSize(bufSize)
-{
-
- /* Init global status. */
- ERR_load_crypto_strings();
- OpenSSL_add_all_algorithms();
- OPENSSL_config(NULL);
-
- /* Create cipher context. */
- encryptCtx = EVP_CIPHER_CTX_new();
- cipher = NULL;
-
-}
-
-/**
- * Destroy a CryptoCodec instance.
- */
-CryptoCodec::~CryptoCodec()
-{
- if (encryptCtx)
- EVP_CIPHER_CTX_free(encryptCtx);
-}
-
-/**
- * Get decrypted key from kms.
- */
-std::string CryptoCodec::getDecryptedKeyFromKms()
-{
- ptree map = kcp->decryptEncryptedKey(*encryptionInfo);
- std::string key;
- try {
- key = map.get < std::string > ("material");
- } catch (...) {
- THROW(HdfsIOException, "CryptoCodec : Can not get key from kms.");
- }
-
- int rem = key.length() % 4;
- if (rem) {
- rem = 4 - rem;
- while (rem != 0) {
- key = key + "=";
- rem--;
- }
- }
-
- std::replace(key.begin(), key.end(), '-', '+');
- std::replace(key.begin(), key.end(), '_', '/');
-
- LOG(INFO, "CryptoCodec : getDecryptedKeyFromKms material is :%s", key.c_str());
-
- key = KmsClientProvider::base64Decode(key);
- return key;
-
-
-}
-
-/**
- * Common encode/decode buffer method.
- * @param buffer the buffer to be encode/decode.
- * @param size the size of buffer.
- * @param enc true is for encode, false is for decode.
- * @return return the encode/decode buffer.
- */
-std::string CryptoCodec::endecInternal(const char * buffer, int64_t size, bool enc)
-{
- std::string key = encryptionInfo->getKey();
- std::string iv = encryptionInfo->getIv();
- LOG(INFO,
- "CryptoCodec : endecInternal info. key:%s, iv:%s, buffer:%s, size:%d, is_encode:%d.",
- key.c_str(), iv.c_str(), buffer, size, enc);
-
- /* Get decrypted key from KMS */
- key = getDecryptedKeyFromKms();
-
- /* Select cipher method based on the key length. */
- if (key.length() == KEY_LENGTH_256) {
- cipher = EVP_aes_256_ctr();
- } else if (key.length() == KEY_LENGTH_128) {
- cipher = EVP_aes_128_ctr();
- } else {
- THROW(InvalidParameter, "CryptoCodec : Invalid key length.");
- }
-
- /* Init cipher context with cipher method, encrypted key and IV from KMS. */
- int encode = enc ? 1 : 0;
- if (!EVP_CipherInit_ex(encryptCtx, cipher, NULL,
- (const unsigned char *) key.c_str(),
- (const unsigned char *) iv.c_str(), encode)) {
- LOG(WARNING, "EVP_CipherInit_ex failed");
- }
- LOG(DEBUG3, "EVP_CipherInit_ex successfully");
- EVP_CIPHER_CTX_set_padding(encryptCtx, 0);
-
- /* Encode/decode buffer within cipher context. */
- std::string result;
- result.resize(size);
- int offset = 0;
- int remaining = size;
- int len = 0;
- /* If the encode/decode buffer size larger than crypto buffer size, encode/decode buffer one by one. */
- while (remaining > bufSize) {
- if (!EVP_CipherUpdate(encryptCtx, (unsigned char *) &result[offset],
- &len, (const unsigned char *) buffer + offset, bufSize)) {
- std::string err = ERR_lib_error_string(ERR_get_error());
- THROW(HdfsIOException, "CryptoCodec : Cannot encrypt AES data %s",
- err.c_str());
- }
- offset += len;
- remaining -= len;
- LOG(DEBUG3,
- "CryptoCodec : EVP_CipherUpdate successfully, result:%s, len:%d",
- result.c_str(), len);
- }
- if (remaining) {
- if (!EVP_CipherUpdate(encryptCtx, (unsigned char *) &result[offset],
- &len, (const unsigned char *) buffer + offset, remaining)) {
- std::string err = ERR_lib_error_string(ERR_get_error());
- THROW(HdfsIOException, "CryptoCodec : Cannot encrypt AES data %s",
- err.c_str());
- }
- }
-
- return result;
-}
-/**
- * Encode buffer.
- */
-std::string CryptoCodec::encode(const char * buffer, int64_t size)
-{
- return endecInternal(buffer, size, true);
-}
+namespace Hdfs {
-/**
- * Decode buffer.
- */
-std::string CryptoCodec::decode(const char * buffer, int64_t size)
-{
- return endecInternal(buffer, size, false);
-}
+ //copy from java HDFS code
+ std::string CryptoCodec::calculateIV(const std::string& initIV, unsigned long counter) {
+ char IV[initIV.length()];
+
+ int i = initIV.length(); // IV length
+ int j = 0; // counter bytes index
+ unsigned int sum = 0;
+ while (i-- > 0) {
+ // (sum >>> Byte.SIZE) is the carry for addition
+ sum = (initIV[i] & 0xff) + (sum >> 8);
+ if (j++ < 8) { // Big-endian, and long is 8 bytes length
+ sum += (char) counter & 0xff;
+ counter >>= 8;
+ }
+ IV[i] = (char) sum;
+ }
+
+ return std::string(IV, initIV.length());
+ }
+
+ CryptoCodec::CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize) :
+ encryptionInfo(encryptionInfo), kcp(kcp), bufSize(bufSize)
+ {
+
+ // Init global status
+ ERR_load_crypto_strings();
+ OpenSSL_add_all_algorithms();
+ OPENSSL_config(NULL);
+
+ // Create cipher context
+ cipherCtx = EVP_CIPHER_CTX_new();
+ cipher = NULL;
+
+ padding = 0;
+ counter = 0;
+ is_init = false;
+ }
+
+ CryptoCodec::~CryptoCodec()
+ {
+ if (cipherCtx)
+ EVP_CIPHER_CTX_free(cipherCtx);
+ }
+
+ std::string CryptoCodec::getDecryptedKeyFromKms()
+ {
+ ptree map = kcp->decryptEncryptedKey(*encryptionInfo);
+ std::string key;
+ try {
+ key = map.get < std::string > ("material");
+ } catch (...) {
+ THROW(HdfsIOException, "CryptoCodec : Can not get key from kms.");
+ }
+
+ int rem = key.length() % 4;
+ if (rem) {
+ rem = 4 - rem;
+ while (rem != 0) {
+ key = key + "=";
+ rem--;
+ }
+ }
+
+ std::replace(key.begin(), key.end(), '-', '+');
+ std::replace(key.begin(), key.end(), '_', '/');
+
+ LOG(INFO, "CryptoCodec : getDecryptedKeyFromKms material is :%s", key.c_str());
+
+ key = KmsClientProvider::base64Decode(key);
+ return key;
+ }
+
+ int CryptoCodec::init(CryptoMethod crypto_method, int64_t stream_offset) {
+ //check already init
+ if (is_init)
+ return 0;
+
+ // Get decrypted key from KMS
+ std::string key = getDecryptedKeyFromKms();
+
+ // Select cipher method based on the key length
+ uint64_t AlgorithmBlockSize = key.length();
+ if (AlgorithmBlockSize == KEY_LENGTH_256) {
+ cipher = EVP_aes_256_ctr();
+ } else if (AlgorithmBlockSize == KEY_LENGTH_128) {
+ cipher = EVP_aes_128_ctr();
+ } else {
+ LOG(WARNING, "CryptoCodec : Invalid key length.");
+ return -1;
+ }
+
+ //calculate new IV when appending a existed file
+ std::string iv = encryptionInfo->getIv();
+ if (stream_offset > 0) {
+ counter = stream_offset / AlgorithmBlockSize;
+ padding = stream_offset % AlgorithmBlockSize;
+ iv = this->calculateIV(iv, counter);
+ }
+
+ //judge encrypt/decrypt
+ int enc = 0;
+ method = crypto_method;
+ if (method == CryptoMethod::ENCRYPT)
+ enc = 1;
+
+ // Init cipher context with cipher method
+ if (!EVP_CipherInit_ex(cipherCtx, cipher, NULL,
+ (const unsigned char *) key.c_str(), (const unsigned char *) iv.c_str(),
+ enc)) {
+ LOG(WARNING, "EVP_CipherInit_ex failed");
+ return -1;
+ }
+
+ //AES/CTR/NoPadding
+ EVP_CIPHER_CTX_set_padding(cipherCtx, 0);
+
+ LOG(INFO, "CryptoCodec init success, key_length:%llu, is_encode:%d", AlgorithmBlockSize, enc);
+ is_init = true;
+ return 1;
+ }
+
+ std::string CryptoCodec::cipher_wrap(const char * buffer, int64_t size) {
+ if (!is_init)
+ THROW(InvalidParameter, "CryptoCodec isn't init");
+
+ int offset = 0;
+ int remaining = size;
+ int len = 0;
+ int ret = 0;
+
+ std::string in_buf(buffer,size);
+ std::string out_buf(size, 0);
+ //set necessary padding when appending a existed file
+ if (padding > 0) {
+ in_buf.insert(0, padding, 0);
+ out_buf.resize(padding+size);
+ remaining += padding;
+ }
+
+ // If the encode/decode buffer size larger than crypto buffer size, encode/decode buffer one by one
+ while (remaining > bufSize) {
+ ret = EVP_CipherUpdate(cipherCtx, (unsigned char *) &out_buf[offset], &len,
+ (const unsigned char *)in_buf.data() + offset, bufSize);
+
+ if (!ret) {
+ std::string err = ERR_lib_error_string(ERR_get_error());
+ THROW(HdfsIOException, "CryptoCodec : cipher_wrap AES data failed:%s, crypto_method:%d", err.c_str(), method);
+ }
+ offset += len;
+ remaining -= len;
+ LOG(DEBUG3, "CryptoCodec : EVP_CipherUpdate successfully, len:%d", len);
+ }
+
+ if (remaining) {
+ ret = EVP_CipherUpdate(cipherCtx, (unsigned char *) &out_buf[offset], &len,
+ (const unsigned char *) in_buf.data() + offset, remaining);
+
+ if (!ret) {
+ std::string err = ERR_lib_error_string(ERR_get_error());
+ THROW(HdfsIOException, "CryptoCodec : cipher_wrap AES data failed:%s, crypto_method:%d", err.c_str(), method);
+ }
+
+ }
+
+ //cut off padding when necessary
+ if (padding > 0) {
+ out_buf.erase(0, padding);
+ padding = 0;
+ }
+
+ return out_buf;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/CryptoCodec.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/CryptoCodec.h b/depends/libhdfs3/src/client/CryptoCodec.h
index e45599b..cae7d3b 100644
--- a/depends/libhdfs3/src/client/CryptoCodec.h
+++ b/depends/libhdfs3/src/client/CryptoCodec.h
@@ -35,54 +35,68 @@
namespace Hdfs {
-class CryptoCodec {
-public:
- /**
- * Construct a CryptoCodec instance.
- * @param encryptionInfo the encryption info of file.
- * @param kcp a KmsClientProvider instance to get key from kms server.
- * @param bufSize crypto buffer size.
- */
- CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize);
+ enum CryptoMethod {
+ DECRYPT = 0,
+ ENCRYPT = 1
+ };
- /**
- * Destroy a CryptoCodec instance.
- */
- virtual ~CryptoCodec();
+ class CryptoCodec {
+ public:
+ /**
+ * Construct a CryptoCodec instance.
+ * @param encryptionInfo the encryption info of file.
+ * @param kcp a KmsClientProvider instance to get key from kms server.
+ * @param bufSize crypto buffer size.
+ */
+ CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp, int32_t bufSize);
- /**
- * Encode buffer.
- */
- virtual std::string encode(const char * buffer, int64_t size);
+ /**
+ * Destroy a CryptoCodec instance.
+ */
+ virtual ~CryptoCodec();
- /**
- * Decode buffer.
- */
- virtual std::string decode(const char * buffer, int64_t size);
+ /**
+ * encrypt/decrypt(depends on init()) buffer data
+ * @param buffer
+ * @param size
+ * @return encrypt/decrypt result string
+ */
+ virtual std::string cipher_wrap(const char * buffer, int64_t size);
-private:
+ /**
+ * init CryptoCodec
+ * @param method CryptoMethod
+ * @param stream_offset 0 when open a new file; file_lenght when append a existed file
+ * @return 1 success; 0 no need(already inited); -1 failed
+ */
+ virtual int init(CryptoMethod crypto_method, int64_t stream_offset = 0);
- /**
- * Common encode/decode buffer method.
- * @param buffer the buffer to be encode/decode.
- * @param size the size of buffer.
- * @param enc true is for encode, false is for decode.
- * @return return the encode/decode buffer.
- */
- std::string endecInternal(const char *buffer, int64_t size, bool enc);
+ private:
- /**
- * Get decrypted key from kms.
- */
- std::string getDecryptedKeyFromKms();
+ /**
+ * Get decrypted key from kms.
+ */
+ std::string getDecryptedKeyFromKms();
- shared_ptr<KmsClientProvider> kcp;
- FileEncryptionInfo *encryptionInfo;
- EVP_CIPHER_CTX *encryptCtx;
- EVP_CIPHER_CTX *decryptCtx;
- const EVP_CIPHER *cipher;
- int32_t bufSize;
-};
+ /**
+ * calculate new IV for appending a existed file
+ * @param initIV
+ * @param counter
+ * @return new IV string
+ */
+ std::string calculateIV(const std::string& initIV, unsigned long counter);
+
+ shared_ptr<KmsClientProvider> kcp;
+ FileEncryptionInfo* encryptionInfo;
+ EVP_CIPHER_CTX* cipherCtx;
+ const EVP_CIPHER* cipher;
+ CryptoMethod method;
+
+ bool is_init;
+ int32_t bufSize;
+ int64_t padding;
+ int64_t counter;
+ };
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/FileEncryptionInfo.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/FileEncryptionInfo.h b/depends/libhdfs3/src/client/FileEncryptionInfo.h
index 32ead6c..7584c02 100644
--- a/depends/libhdfs3/src/client/FileEncryptionInfo.h
+++ b/depends/libhdfs3/src/client/FileEncryptionInfo.h
@@ -81,8 +81,8 @@ public:
}
private:
- int suite;
int cryptoProtocolVersion;
+ int suite;
std::string key;
std::string iv;
std::string keyName;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/HttpClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/HttpClient.cpp b/depends/libhdfs3/src/client/HttpClient.cpp
index 562f599..09a74a6 100644
--- a/depends/libhdfs3/src/client/HttpClient.cpp
+++ b/depends/libhdfs3/src/client/HttpClient.cpp
@@ -339,6 +339,8 @@ std::string HttpClient::escape(const std::string &data) {
} else {
LOG(WARNING, "HttpClient : Curl in escape method is NULL");
}
+ std::string empty;
+ return empty;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/OutputStreamImpl.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/OutputStreamImpl.cpp b/depends/libhdfs3/src/client/OutputStreamImpl.cpp
index 4c5f869..d987295 100644
--- a/depends/libhdfs3/src/client/OutputStreamImpl.cpp
+++ b/depends/libhdfs3/src/client/OutputStreamImpl.cpp
@@ -255,9 +255,18 @@ void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char *
FileEncryptionInfo *fileEnInfo = fileStatus.getFileEncryption();
if (fileStatus.isFileEncrypted()) {
if (cryptoCodec == NULL) {
- auth = shared_ptr<RpcAuth> (new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod())));
- kcp = shared_ptr<KmsClientProvider> (new KmsClientProvider(auth, conf));
- cryptoCodec = shared_ptr<CryptoCodec> (new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize()));
+ auth = shared_ptr<RpcAuth> (
+ new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod())));
+ kcp = shared_ptr<KmsClientProvider> (
+ new KmsClientProvider(auth, conf));
+ cryptoCodec = shared_ptr<CryptoCodec> (
+ new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize()));
+
+ int64_t file_length = fileStatus.getLength();
+ int ret = cryptoCodec->init(CryptoMethod::ENCRYPT, file_length);
+ if (ret < 0) {
+ THROW(HdfsIOException, "init CryptoCodec failed, file:%s", this->path.c_str());
+ }
}
}
initAppend();
@@ -278,13 +287,18 @@ void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char *
if (fileStatus.isFileEncrypted()) {
if (cryptoCodec == NULL) {
auth = shared_ptr<RpcAuth>(
- new RpcAuth(fs->getUserInfo(),
- RpcAuth::ParseMethod(conf->getKmsMethod())));
+ new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod())));
kcp = shared_ptr<KmsClientProvider>(
new KmsClientProvider(auth, conf));
cryptoCodec = shared_ptr<CryptoCodec>(
- new CryptoCodec(fileEnInfo, kcp,
- conf->getCryptoBufferSize()));
+ new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize()));
+
+ int64_t file_length = fileStatus.getLength();
+ assert(file_length == 0);
+ int ret = cryptoCodec->init(CryptoMethod::ENCRYPT, file_length);
+ if (ret < 0) {
+ THROW(HdfsIOException, "init CryptoCodec failed, file:%s", this->path.c_str());
+ }
}
}
closed = false;
@@ -317,8 +331,10 @@ void OutputStreamImpl::append(const char * buf, int64_t size) {
void OutputStreamImpl::appendInternal(const char * buf, int64_t size) {
int64_t todo = size;
std::string bufEncode;
+
if (fileStatus.isFileEncrypted()) {
- bufEncode = cryptoCodec->encode(buf, size);
+ //encrypt buf
+ bufEncode = cryptoCodec->cipher_wrap(buf, size);
buf = bufEncode.c_str();
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/function/TestCInterface.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/function/TestCInterface.cpp b/depends/libhdfs3/test/function/TestCInterface.cpp
index 56fe07e..40f6a1b 100644
--- a/depends/libhdfs3/test/function/TestCInterface.cpp
+++ b/depends/libhdfs3/test/function/TestCInterface.cpp
@@ -134,6 +134,20 @@ static void bufferMD5(const char* strFilePath, int size, char* result) {
}
}
+static void diff_file2buffer(const char *file_path, const char *buf) {
+ std::cout << "diff file: " << file_path << std::endl;
+ char resultFile[33] = { 0 };
+ char resultBuffer[33] = { 0 };
+
+ fileMD5(file_path, resultFile);
+ std::cout << "resultFile is " << resultFile << std::endl;
+
+ bufferMD5(buf, strlen(buf), resultBuffer);
+ std::cout << "resultBuf is " << resultBuffer << std::endl;
+
+ ASSERT_STREQ(resultFile, resultBuffer);
+}
+
bool CheckFileContent(hdfsFS fs, const char * path, int64_t len, size_t offset) {
hdfsFile in = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
@@ -246,7 +260,6 @@ TEST(TestCInterfaceConnect, TestConnect_Success) {
TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) {
hdfsFS fs = NULL;
hdfsEncryptionZoneInfo * enInfo = NULL;
- char * uri = NULL;
setenv("LIBHDFS3_CONF", "function-test.xml", 1);
struct hdfsBuilder * bld = hdfsNewBuilder();
assert(bld != NULL);
@@ -256,7 +269,7 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) {
system("hadoop fs -rmr /TDE");
system("hadoop key create keytde");
system("hadoop fs -mkdir /TDE");
- ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde"));
+ ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde"));
enInfo = hdfsGetEZForPath(fs, "/TDE");
ASSERT_TRUE(enInfo != NULL);
EXPECT_TRUE(enInfo->mKeyName != NULL);
@@ -274,11 +287,10 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) {
system(tdeKey.c_str());
system(mkTde.c_str());
ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, tde.c_str(), key.c_str()));
- }
- hdfsEncryptionZoneInfo * enZoneInfos = NULL;
+ }
int num = 0;
hdfsListEncryptionZones(fs, &num);
- EXPECT_EQ(num, 12);
+ EXPECT_EQ(num, 12);
ASSERT_EQ(hdfsDisconnect(fs), 0);
hdfsFreeBuilder(bld);
}
@@ -286,7 +298,6 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) {
TEST(TestCInterfaceTDE, TestAppendWithTDE_Success) {
hdfsFS fs = NULL;
hdfsEncryptionZoneInfo * enInfo = NULL;
- char * uri = NULL;
setenv("LIBHDFS3_CONF", "function-test.xml", 1);
struct hdfsBuilder * bld = hdfsNewBuilder();
assert(bld != NULL);
@@ -327,7 +338,6 @@ TEST(TestCInterfaceTDE, TestAppendWithTDE_Success) {
TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) {
hdfsFS fs = NULL;
hdfsEncryptionZoneInfo * enInfo = NULL;
- char * uri = NULL;
setenv("LIBHDFS3_CONF", "function-test.xml", 1);
struct hdfsBuilder * bld = hdfsNewBuilder();
assert(bld != NULL);
@@ -348,6 +358,7 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) {
const char *tdefile = "/TDE/testfile";
ASSERT_TRUE(CreateFile(fs, tdefile, 0, 0));
+ //case1: append
int size = 1024 * 32;
size_t offset = 0;
hdfsFile out;
@@ -371,15 +382,10 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) {
} while (0);
system("rm -rf ./testfile");
system("hadoop fs -get /TDE/testfile ./");
- char resultFile[33] = { 0 };
- fileMD5("./testfile", resultFile);
- std::cout << "resultFile is " << resultFile << std::endl;
- char resultBuffer[33] = { 0 };
- LOG(INFO, "buffer is %s", &buffer[0]);
- bufferMD5(&buffer[0], size, resultBuffer);
- std::cout << "result is " << resultBuffer << std::endl;
- ASSERT_STREQ(resultFile, resultBuffer);
+ diff_file2buffer("testfile", &buffer[0]);
system("rm ./testfile");
+
+ //case5: a large file (> 64M) TODO
system("hadoop fs -rmr /TDE");
system("hadoop key delete keytde4append -f");
ASSERT_EQ(hdfsDisconnect(fs), 0);
@@ -387,6 +393,106 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) {
}
+TEST(TestCInterfaceTDE, TestAppendMultiTimes_Success) {
+ hdfsFS fs = NULL;
+ hdfsEncryptionZoneInfo * enInfo = NULL;
+ setenv("LIBHDFS3_CONF", "function-test.xml", 1);
+ struct hdfsBuilder * bld = hdfsNewBuilder();
+ assert(bld != NULL);
+ hdfsBuilderSetNameNode(bld, "default");
+ fs = hdfsBuilderConnect(bld);
+ ASSERT_TRUE(fs != NULL);
+
+ //creake iey and encryption zone
+ system("hadoop fs -rmr /TDE");
+ system("hadoop key delete keytde4append -f");
+ system("hadoop key create keytde4append");
+ system("hadoop fs -mkdir /TDE");
+ ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde4append"));
+ enInfo = hdfsGetEZForPath(fs, "/TDE");
+ ASSERT_TRUE(enInfo != NULL);
+ EXPECT_TRUE(enInfo->mKeyName != NULL);
+ hdfsFreeEncryptionZoneInfo(enInfo, 1);
+
+ hdfsFile out;
+ //case2: close and append
+ const char *tdefile2 = "/TDE/testfile2";
+ char out_data2[] = "12345678";
+ ASSERT_TRUE(CreateFile(fs, tdefile2, 0, 0));
+ out = hdfsOpenFile(fs, tdefile2, O_WRONLY | O_APPEND, 0, 0, 0);
+ hdfsWrite(fs, out, out_data2, 4);
+ hdfsCloseFile(fs, out);
+
+ out = hdfsOpenFile(fs, tdefile2, O_WRONLY | O_APPEND, 0, 0, 0);
+ hdfsWrite(fs, out, out_data2+4, 4);
+ hdfsCloseFile(fs, out);
+ system("rm ./testfile2");
+ system("hadoop fs -get /TDE/testfile2 ./");
+ diff_file2buffer("testfile2", out_data2);
+
+ //case3: multi-append
+ const char *tdefile3 = "/TDE/testfile3";
+ char out_data3[] = "1234567812345678123456781234567812345678123456781234567812345678"; //16*4byte
+ ASSERT_TRUE(CreateFile(fs, tdefile3, 0, 0));
+ out = hdfsOpenFile(fs, tdefile3, O_WRONLY | O_APPEND, 0, 0, 0);
+ hdfsWrite(fs, out, out_data3, 5);
+ hdfsWrite(fs, out, out_data3+5, 28);
+ hdfsWrite(fs, out, out_data3+33, 15);
+ hdfsWrite(fs, out, out_data3+48, 16);
+ hdfsCloseFile(fs, out);
+ system("rm ./testfile3");
+ system("hadoop fs -get /TDE/testfile3 ./");
+ diff_file2buffer("testfile3", out_data3);
+
+
+ //case4: multi-append > bufsize(8k)
+ const char *tdefile4 = "/TDE/testfile4";
+ int data_size = 13*1024+1;
+ char *out_data4 = (char *)malloc(data_size);
+ Hdfs::FillBuffer(out_data4, data_size-1, 1024);
+ out_data4[data_size-1] = 0;
+ ASSERT_TRUE(CreateFile(fs, tdefile4, 0, 0));
+ out = hdfsOpenFile(fs, tdefile4, O_WRONLY | O_APPEND, 0, 0, 0);
+
+ int todo = 0;
+ int offset = 0;
+ todo = 9*1024-1;
+ while (todo > 0) {
+ int rc = 0;
+ if (0 > (rc = hdfsWrite(fs, out, out_data4+offset, todo))) {
+ break;
+ }
+ todo -= rc;
+ offset += rc;
+ }
+
+ todo = 4*1024+1;
+ while (todo > 0) {
+ int rc = 0;
+ if (0 > (rc = hdfsWrite(fs, out, out_data4+offset, todo))) {
+ break;
+ }
+ todo -= rc;
+ offset += rc;
+ }
+
+
+ ASSERT_EQ(data_size-1, offset);
+
+ hdfsCloseFile(fs, out);
+ system("rm ./testfile4");
+ system("hadoop fs -get /TDE/testfile4 ./");
+ diff_file2buffer("testfile4", out_data4);
+ free(out_data4);
+
+
+
+ system("hadoop fs -rmr /TDE");
+ system("hadoop key delete keytde4append -f");
+ ASSERT_EQ(hdfsDisconnect(fs), 0);
+ hdfsFreeBuilder(bld);
+}
+
TEST(TestErrorMessage, TestErrorMessage) {
EXPECT_NO_THROW(hdfsGetLastError());
hdfsChown(NULL, TEST_HDFS_PREFIX, NULL, NULL);
@@ -1773,3 +1879,27 @@ TEST_F(TestCInterface, TestGetHosts_Success) {
hdfsFreeHosts(hosts);
hdfsCloseFile(fs, out);
}
+
+// test concurrent write to a same file
+// expected:
+// At any point there can only be 1 writer.
+// This is enforced by requiring the writer to acquire leases.
+TEST_F(TestCInterface, TestConcurrentWrite_Failure) {
+ hdfsFS fs = NULL;
+ setenv("LIBHDFS3_CONF", "function-test.xml", 1);
+ struct hdfsBuilder * bld = hdfsNewBuilder();
+ assert(bld != NULL);
+ hdfsBuilderSetNameNode(bld, "default");
+ fs = hdfsBuilderConnect(bld);
+ ASSERT_TRUE(fs != NULL);
+
+ const char *file_path = BASE_DIR "/concurrent_write";
+ char buf[] = "1234";
+ hdfsFile fout1 = hdfsOpenFile(fs, file_path, O_WRONLY | O_APPEND, 0, 0, 0);
+ hdfsFile fout2 = hdfsOpenFile(fs, file_path, O_WRONLY | O_APPEND, 0, 0, 0);
+ ASSERT_TRUE(fout2 == NULL); //must failed
+ int rc = hdfsWrite(fs, fout1, buf, sizeof(buf)-1);
+ ASSERT_TRUE(rc > 0);
+ int retval = hdfsCloseFile(fs, fout1);
+ ASSERT_TRUE(retval == 0);
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/function/TestKmsClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/function/TestKmsClient.cpp b/depends/libhdfs3/test/function/TestKmsClient.cpp
index d997f88..21280de 100644
--- a/depends/libhdfs3/test/function/TestKmsClient.cpp
+++ b/depends/libhdfs3/test/function/TestKmsClient.cpp
@@ -186,7 +186,6 @@ TEST_F(TestKmsClient, DecryptEncryptedKeySuccess) {
TEST_F(TestKmsClient, CreateKeyFailediBadUrl) {
std::string keyName = "testcreatekeyfailname";
std::string cipher = "AES/CTR/NoPadding";
- int length = 128;
std::string material = "testCreateKey";
std::string url[4] = { "ftp:///http@localhost:16000/kms",
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp b/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp
index 36c67b1..92e9403 100644
--- a/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp
+++ b/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp
@@ -111,7 +111,8 @@ TEST_F(TestCryptoCodec, encode_Success) {
//char buf[1024] = "encode hello world";
char buf[1024];
- Hdfs::FillBuffer(buf, sizeof(buf), 2048);
+ Hdfs::FillBuffer(buf, sizeof(buf)-1, 2048);
+ buf[sizeof(buf)-1] = 0;
int32_t bufSize = 1024;
@@ -121,13 +122,20 @@ TEST_F(TestCryptoCodec, encode_Success) {
encryptionInfo.setKey(Key[i]);
shared_ptr<MockHttpClient> hc(new MockHttpClient());
kcp->setHttpClient(hc);
- CryptoCodec es(&encryptionInfo, kcp, bufSize);
+
EXPECT_CALL(*kcp, decryptEncryptedKey(_)).Times(2).WillRepeatedly(
Return(kcp->getEDKResult(encryptionInfo)));
- std::string encodeStr = es.encode(buf, strlen(buf));
+
+ CryptoCodec es(&encryptionInfo, kcp, bufSize);
+ es.init(CryptoMethod::ENCRYPT);
+ CryptoCodec ds(&encryptionInfo, kcp, bufSize);
+ ds.init(CryptoMethod::DECRYPT);
+
+
+ std::string encodeStr = es.cipher_wrap(buf, strlen(buf));
ASSERT_NE(0, memcmp(buf, encodeStr.c_str(), strlen(buf)));
- std::string decodeStr = es.decode(encodeStr.c_str(), strlen(buf));
+ std::string decodeStr = ds.cipher_wrap(encodeStr.c_str(), strlen(buf));
ASSERT_STREQ(decodeStr.c_str(), buf);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp b/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp
index b8b6a46..de36eac 100644
--- a/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp
+++ b/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp
@@ -406,7 +406,7 @@ TEST_F(TestOutputStream, appendEncryption_Success) {
EXPECT_CALL(*pipelineStub, close(_)).Times(2).WillOnce(Return(lastBlock)).WillOnce(Return(lastBlock));
EXPECT_CALL(*fs, fsync(_)).Times(2);
std::string bufferEn;
- EXPECT_CALL(*cryptoC, encode(_,_)).Times(1).WillOnce(Return(bufferEn));
+ EXPECT_CALL(*cryptoC, cipher_wrap(_,_)).Times(1).WillOnce(Return(bufferEn));
EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer)));
EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock));
EXPECT_CALL(*fs, fsync(_)).Times(1);