You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/08/08 17:32:08 UTC
hadoop git commit: HDFS-12134: libhdfs++: Add a synchronization
interface for the GSSAPI. Contributed by James Clampffer.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-8707 3117e2a87 -> 411a9f829
HDFS-12134: libhdfs++: Add a synchronization interface for the GSSAPI. Contributed by James Clampffer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/411a9f82
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/411a9f82
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/411a9f82
Branch: refs/heads/HDFS-8707
Commit: 411a9f829fe3399fa7fc952bb4bd597a4f0e5861
Parents: 3117e2a
Author: James Clampffer <ja...@hp.com>
Authored: Mon Aug 7 13:04:50 2017 -0400
Committer: James Clampffer <ja...@hp.com>
Committed: Mon Aug 7 13:04:50 2017 -0400
----------------------------------------------------------------------
.../native/libhdfspp/include/hdfspp/hdfspp.h | 1 +
.../native/libhdfspp/include/hdfspp/locks.h | 110 +++++++++
.../native/libhdfspp/include/hdfspp/status.h | 2 +
.../native/libhdfspp/lib/common/CMakeLists.txt | 2 +-
.../main/native/libhdfspp/lib/common/locks.cc | 100 +++++++++
.../main/native/libhdfspp/lib/common/status.cc | 11 +-
.../src/main/native/libhdfspp/lib/common/uri.cc | 1 +
.../libhdfspp/lib/rpc/cyrus_sasl_engine.cc | 100 +++++++--
.../native/libhdfspp/lib/rpc/gsasl_engine.cc | 110 ++++++---
.../native/libhdfspp/lib/rpc/sasl_protocol.cc | 2 -
.../main/native/libhdfspp/tests/CMakeLists.txt | 3 +
.../src/main/native/libhdfspp/tests/uri_test.cc | 1 -
.../native/libhdfspp/tests/user_lock_test.cc | 225 +++++++++++++++++++
13 files changed, 610 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index 673455e..611da21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -26,6 +26,7 @@
#include "hdfspp/fsinfo.h"
#include "hdfspp/content_summary.h"
#include "hdfspp/uri.h"
+#include "hdfspp/locks.h"
#include <functional>
#include <memory>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h
new file mode 100644
index 0000000..3dfeab4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_LOCKS_H_
+#define COMMON_HDFS_LOCKS_H_
+
+#include <stdexcept>
+#include <string>
+#include <atomic>
+#include <mutex>
+#include <memory>
+
+namespace hdfs
+{
+
+//
+// Thrown by LockGuard to indicate that it was unable to acquire a mutex
+// what_str should contain info about what caused the failure
+//
+class LockFailure : public std::runtime_error {
+ public:
+ LockFailure(const char *what_str) : std::runtime_error(what_str) {};
+ LockFailure(const std::string& what_str) : std::runtime_error(what_str) {};
+};
+
+//
+// A pluggable mutex type to allow client code to share mutexes it may
+// already use to protect certain system resources. Certain shared
+// libraries have some procedures that aren't always implemented in a thread
+// safe manner. If libhdfs++ and the code linking it depend on the same
+// library this provides a mechanism to coordinate safe access.
+//
+// Interface provided is intended to be similar to std::mutex. If the lock
+// can't be acquired it may throw LockFailure from the lock method. If lock
+// does fail libhdfs++ is expected to fail as cleanly as possible e.g.
+// FileSystem::Mkdirs might return a MutexError but a subsequent call may be
+// successful.
+//
+class Mutex {
+ public:
+ virtual ~Mutex() {};
+ virtual void lock() = 0; // May throw LockFailure if the mutex cannot be acquired.
+ virtual void unlock() = 0; // Releases the mutex; LockGuard calls this from its destructor.
+ virtual std::string str() = 0; // Name identifying the implementation, e.g. "DefaultMutex".
+};
+
+//
+// LockGuard works in a similar manner to std::lock_guard: it locks the mutex
+// in the constructor and unlocks it in the destructor.
+// Failure to acquire the mutex in the constructor will result in throwing a
+// LockFailure exception.
+//
+class LockGuard {
+ public:
+ LockGuard(Mutex *m); // Locks m; throws LockFailure if m is null and propagates whatever m->lock() throws.
+ ~LockGuard(); // Unlocks the mutex that was handed to the constructor.
+ private:
+ Mutex *_mtx; // Not owned. NOTE(review): copying is not disabled, so a copied guard would unlock twice.
+};
+
+//
+// Manage instances of hdfs::Mutex that are intended to be global to the
+// process.
+//
+// LockManager's InitLocks method provides a mechanism for the calling
+// application to share its own implementations of hdfs::Mutex. It must be
+// called prior to instantiating any FileSystem objects and can only be
+// called once. If a lock is not provided a default mutex type wrapping
+// std::mutex is used as a default.
+//
+
+class LockManager {
+ public:
+ // Installs the caller-supplied GSSAPI mutex; returns false (changing nothing) once already initialized.
+ static bool InitLocks(Mutex *gssapi);
+ static Mutex *getGssapiMutex(); // Mutex currently used to serialize GSSAPI/SASL calls.
+
+ // Tests only, implementation may no-op on release builds.
+ // Reset _finalized to false and set all Mutex* members to default values.
+ static void TEST_reset_manager();
+ static Mutex *TEST_get_default_mutex();
+ private:
+ // Used only in tests.
+ static Mutex *TEST_default_mutex;
+ // Use to synchronize calls into GSSAPI/Kerberos libs
+ static Mutex *gssapiMtx;
+
+ // Prevent InitLocks from being called more than once
+ // Allows all locks to be set a single time atomically
+ static std::mutex _state_lock; // Held by InitLocks and getGssapiMutex.
+ static bool _finalized; // True once InitLocks has succeeded.
+};
+
+} // end namespace hdfs
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
index d0922ae..718e530 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
@@ -50,6 +50,7 @@ class Status {
static Status PathNotFound(const char *msg);
static Status InvalidOffset(const char *msg);
static Status PathIsNotDirectory(const char *msg);
+ static Status MutexError(const char *msg);
// success
bool ok() const { return code_ == 0; }
@@ -79,6 +80,7 @@ class Status {
kNotADirectory = static_cast<unsigned>(std::errc::not_a_directory),
kFileAlreadyExists = static_cast<unsigned>(std::errc::file_exists),
kPathIsNotEmptyDirectory = static_cast<unsigned>(std::errc::directory_not_empty),
+ kBusy = static_cast<unsigned>(std::errc::device_or_resource_busy),
// non-errc codes start at 256
kException = 256,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index b0b721a..15e65c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -19,6 +19,6 @@ if(NEED_LINK_DL)
set(LIB_DL dl)
endif()
-add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc)
+add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc)
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
target_link_libraries(common ${LIB_DL})
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
new file mode 100644
index 0000000..30dcb44
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp/locks.h"
+
+#include <mutex>
+
+
+namespace hdfs {
+
+LockGuard::LockGuard(Mutex *m) : _mtx(m) {
+ if(!m) { // reject null up front so the destructor never dereferences a null pointer
+ throw LockFailure("LockGuard passed invalid (null) Mutex pointer");
+ }
+ _mtx->lock(); // may throw; the destructor does not run for a failed construction, so no unlock is attempted
+}
+
+LockGuard::~LockGuard() {
+ if(_mtx) { // the constructor throws on null, so this holds for every fully constructed guard
+ _mtx->unlock(); // NOTE(review): destructors are implicitly noexcept; a throwing user Mutex::unlock() would terminate the process
+ }
+}
+
+
+// Basic mutexes to use as default. Just a wrapper around C++11 std::mutex.
+class DefaultMutex : public Mutex {
+ public:
+ DefaultMutex() {}
+
+ void lock() override {
+ // Could throw in here if the implementation couldn't lock for some reason.
+ _mtx.lock(); // forwards straight to the wrapped std::mutex
+ }
+
+ void unlock() override {
+ _mtx.unlock(); // forwards straight to the wrapped std::mutex
+ }
+
+ std::string str() override {
+ return "DefaultMutex"; // fixed name; the unit test compares against this exact string
+ }
+ private:
+ std::mutex _mtx; // the standard mutex this class wraps
+};
+
+DefaultMutex defaultTestMutex;
+DefaultMutex defaultGssapiMutex;
+
+// LockManager static var instantiation
+Mutex *LockManager::TEST_default_mutex = &defaultTestMutex;
+Mutex *LockManager::gssapiMtx = &defaultGssapiMutex;
+std::mutex LockManager::_state_lock;
+bool LockManager::_finalized = false;
+
+bool LockManager::InitLocks(Mutex *gssapi) {
+  std::lock_guard<std::mutex> guard(_state_lock);
+  // Returns true on the first call and false (changing nothing) on every later call.
+  // You get one shot to set this - swapping the locks
+  // out while in use gets risky.  It can still be done by
+  // using the Mutex as a proxy object if one understands
+  // the implied risk of doing so.
+  if(_finalized)
+    return false;
+  // A null argument means "no lock provided": keep the default so LockGuard never receives null.
+  gssapiMtx = gssapi ? gssapi : &defaultGssapiMutex;
+  _finalized = true;
+  return true;
+}
+
+Mutex *LockManager::getGssapiMutex() {
+ std::lock_guard<std::mutex> guard(_state_lock); // same lock InitLocks holds while it replaces the pointer
+ return gssapiMtx;
+}
+
+Mutex *LockManager::TEST_get_default_mutex() {
+ return TEST_default_mutex; // initialized to &defaultTestMutex, a DefaultMutex that is separate from the GSSAPI one
+}
+
+void LockManager::TEST_reset_manager() {
+  std::lock_guard<std::mutex> guard(_state_lock);  // reset must not race with InitLocks/getGssapiMutex
+  _finalized = false;
+  gssapiMtx = &defaultGssapiMutex;  // user still responsible for cleanup of any mutex it installed
+}
+
+} // end namespace hdfs
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
index 5903553..4c5c7be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -177,7 +177,16 @@ std::string Status::ToString() const {
}
bool Status::notWorthRetry() const {
- return noRetryExceptions.find(code_) != noRetryExceptions.end();
+ return noRetryExceptions.find(code_) != noRetryExceptions.end();
+}
+
+Status Status::MutexError(const char *msg) {  // kBusy status whose text is "MutexError" or "MutexError: <msg>"
+  std::string formatted = "MutexError";
+  if(msg) {  // msg may be null; the bare prefix is used in that case
+    formatted += ": ";
+    formatted += msg;
+  }
+  return Status(kBusy/*try_lock failure errno*/, formatted.c_str());  // pass the formatted text, not the raw msg
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
index 2213f8b..9e9319b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
@@ -244,6 +244,7 @@ URI URI::parse_from_string(const std::string &str)
///////////////////////////////////////////////////////////////////////////////
URI::URI() : _port(-1) {}
+
URI::Query::Query(const std::string& k, const std::string& v) : key(k), value(v) {}
std::string URI::str(bool encoded_output) const
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
index 69b2267..5c96ede 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+#include "hdfspp/locks.h"
+
#include <sys/types.h>
#include "sasl/sasl.h"
#include "sasl/saslutil.h"
@@ -31,6 +33,9 @@
namespace hdfs {
+static Mutex *getSaslMutex() {
+ return LockManager::getGssapiMutex();
+}
// Forward decls of sasl callback functions
typedef int (*sasl_callback_ft)(void);
@@ -111,23 +116,30 @@ Status CySaslEngine::SaslError( int rc) {
* Cyrus SASL ENGINE
*/
- CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr)
- {
- // Create an array of callbacks that embed a pointer to this
- // so we can call methods of the engine
- per_connection_callbacks_ = {
- { SASL_CB_USER, (sasl_callback_ft) & get_name, this}, // userid for authZ
- { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authT
- { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm
- // { SASL_CB_PASS, (sasl_callback_ft)&getsecret, this
- { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL}
- };
- }
+CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr)
+{
+ // Create an array of callbacks that embed a pointer to this
+ // so we can call methods of the engine
+ per_connection_callbacks_ = {
+ { SASL_CB_USER, (sasl_callback_ft) & get_name, this}, // userid for authZ
+ { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authT
+ { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm
+ // { SASL_CB_PASS, (sasl_callback_ft)&getsecret, this
+ { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL}
+ };
+}
+// Cleanup of last resort. Call Finish to allow a safer check on disposal
CySaslEngine::~CySaslEngine()
{
+
if (conn_) {
+ try {
+ LockGuard saslGuard(getSaslMutex());
sasl_dispose( &conn_); // undo sasl_client_new()
+ } catch (const LockFailure& e) {
+ LOG_ERROR(kRPC, << "Unable to dispose of SASL context due to " << e.what());
+ }
}
} // destructor
@@ -146,8 +158,15 @@ Status CySaslEngine::InitCyrusSasl()
const char * fqdn = chosen_mech_.serverid.c_str();
const char * proto = chosen_mech_.protocol.c_str();
- rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_);
- if (rc != SASL_OK) return SaslError(rc);
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_);
+ if (rc != SASL_OK) {
+ return SaslError(rc);
+ }
+ } catch (const LockFailure& e) {
+ return Status::MutexError("mutex that guards sasl_client_new unable to lock");
+ }
return Status::OK();
} // cysasl_new()
@@ -176,8 +195,15 @@ CySaslEngine::Start()
const char * chosen_mech;
std::string token;
- rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact,
- (const char **) &buf, &buflen, &chosen_mech);
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact,
+ (const char **) &buf, &buflen, &chosen_mech);
+ } catch (const LockFailure& e) {
+ state_ = kFailure;
+ return std::make_pair( Status::MutexError("mutex that guards sasl_client_start unable to lock"), "" );
+ }
+
switch (rc) {
case SASL_OK: state_ = kSuccess;
@@ -192,6 +218,7 @@ CySaslEngine::Start()
// Cyrus will free this buffer when the connection is shut down
token = std::string( buf, buflen);
return std::make_pair( Status::OK(), token);
+
} // start() method
std::pair<Status, std::string> CySaslEngine::Step(const std::string data)
@@ -203,9 +230,15 @@ std::pair<Status, std::string> CySaslEngine::Step(const std::string data)
if (state_ != kWaitingForData)
LOG_WARN(kRPC, << "CySaslEngine::step when state is " << state_);
- int rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact,
- (const char **) &output, &outlen);
-
+ int rc = 0;
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact,
+ (const char **) &output, &outlen);
+ } catch (const LockFailure& e) {
+ state_ = kFailure;
+ return std::make_pair( Status::MutexError("mutex that guards sasl_client_step unable to lock"), "" );
+ }
// right now, state_ == kWaitingForData,
// so update state_, to reflect _step()'s result:
switch (rc) {
@@ -224,8 +257,13 @@ Status CySaslEngine::Finish()
LOG_WARN(kRPC, << "CySaslEngine::finish when state is " << state_);
if (conn_ != nullptr) {
+ try {
+ LockGuard saslGuard(getSaslMutex());
sasl_dispose( &conn_);
conn_ = NULL;
+ } catch (const LockFailure& e) {
+ return Status::MutexError("mutex that guards sasl_dispose unable to lock");
+ }
}
return Status::OK();
@@ -234,6 +272,8 @@ Status CySaslEngine::Finish()
//////////////////////////////////////////////////
// Internal callbacks, for sasl_init_client(). //
// Mostly lifted from cyrus' sample_client.c . //
+// Implicitly called in a context that already //
+// holds the SASL/GSSAPI lock. //
//////////////////////////////////////////////////
static int
@@ -388,14 +428,26 @@ const sasl_callback_t per_process_callbacks[] = {
CyrusPerProcessData::CyrusPerProcessData()
{
- int init_rc = sasl_client_init(per_process_callbacks);
- init_status_ = make_status(init_rc);
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ int init_rc = sasl_client_init(per_process_callbacks);
+ init_status_ = make_status(init_rc);
+ } catch (const LockFailure& e) {
+ init_status_ = Status::MutexError("mutex protecting process-wide sasl_client_init unable to lock");
+ }
}
CyrusPerProcessData::~CyrusPerProcessData()
{
// Undo sasl_client_init())
- sasl_done();
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ sasl_done();
+ } catch (const LockFailure& e) {
+ // Nothing can be done at this point, but the process is most likely shutting down anyway.
+ LOG_ERROR(kRPC, << "mutex protecting process-wide sasl_done unable to lock");
+ }
+
}
Status CyrusPerProcessData::Init()
@@ -405,6 +457,10 @@ Status CyrusPerProcessData::Init()
CyrusPerProcessData & CyrusPerProcessData::GetInstance()
{
+ // Meyer's singleton, thread safe and lazily initialized in C++11
+ //
+ // Must be lazily initialized to allow client code to plug in a GSSAPI mutex
+ // implementation.
static CyrusPerProcessData per_process_data;
return per_process_data;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
index 8286bac..7705c81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
@@ -16,18 +16,26 @@
* limitations under the License.
*/
+#include "hdfspp/locks.h"
+
#include <sstream>
#include <gsasl.h>
#include "sasl_engine.h"
#include "gsasl_engine.h"
#include "common/logging.h"
+
namespace hdfs {
+
/*****************************************************************************
* GSASL UTILITY FUNCTIONS
*/
+static Mutex *getSaslMutex() {
+ return LockManager::getGssapiMutex();
+}
+
static Status rc_to_status(int rc)
{
if (rc == GSASL_OK) {
@@ -70,32 +78,45 @@ std::pair<Status, std::string> base64_encode(const std::string & in) {
GSaslEngine::~GSaslEngine()
{
- if (session_ != nullptr) {
+ // These should already be called in this->Finish
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ if (session_ != nullptr) {
gsasl_finish(session_);
- }
+ }
- if (ctx_ != nullptr) {
+ if (ctx_ != nullptr) {
gsasl_done(ctx_);
+ }
+ } catch (const LockFailure& e) {
+ if(session_ || ctx_) {
+ LOG_ERROR(kRPC, << "GSaslEngine::~GSaslEngine@" << this << " unable to dispose of gsasl state: " << e.what());
+ }
}
}
Status GSaslEngine::gsasl_new() {
- int status = GSASL_OK;
-
- if (ctx_) return Status::OK();
-
- status = gsasl_init( & ctx_);
-
- switch ( status) {
- case GSASL_OK:
- return Status::OK();
- case GSASL_MALLOC_ERROR:
- LOG_WARN(kRPC, << "GSaslEngine: Out of memory.");
- return Status::Error("SaslEngine: Out of memory.");
- default:
- LOG_WARN(kRPC, << "GSaslEngine: Unexpected error." << status);
- return Status::Error("SaslEngine: Unexpected error.");
- }
+ int status = GSASL_OK;
+
+ if (ctx_) return Status::OK();
+
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ status = gsasl_init( & ctx_);
+ } catch (const LockFailure& e) {
+ return Status::MutexError("Mutex that guards gsasl_init unable to lock");
+ }
+
+ switch ( status) {
+ case GSASL_OK:
+ return Status::OK();
+ case GSASL_MALLOC_ERROR:
+ LOG_WARN(kRPC, << "GSaslEngine: Out of memory.");
+ return Status::Error("SaslEngine: Out of memory.");
+ default:
+ LOG_WARN(kRPC, << "GSaslEngine: Unexpected error." << status);
+ return Status::Error("SaslEngine: Unexpected error.");
+ }
} // gsasl_new()
std::pair<Status, std::string>
@@ -107,12 +128,22 @@ GSaslEngine::Start()
this->gsasl_new();
/* Create new authentication session. */
- rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_);
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_);
+ } catch (const LockFailure& e) {
+ state_ = kErrorState;
+ return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), "");
+ }
if (rc != GSASL_OK) {
state_ = kErrorState;
return std::make_pair( rc_to_status( rc), std::string(""));
}
- init_kerberos();
+ Status init_status = init_kerberos();
+ if(!init_status.ok()) {
+ state_ = kErrorState;
+ return std::make_pair(init_status, "");
+ }
state_ = kWaitingForData;
@@ -124,12 +155,17 @@ GSaslEngine::Start()
Status GSaslEngine::init_kerberos() {
//TODO: check that we have a principal
-
- gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str());
- gsasl_property_set(session_, GSASL_HOSTNAME, chosen_mech_.serverid.c_str());
- gsasl_property_set(session_, GSASL_SERVICE, chosen_mech_.protocol.c_str());
- return Status::OK();
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ // these don't return anything that indicates failure
+ gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str());
+ gsasl_property_set(session_, GSASL_HOSTNAME, chosen_mech_.serverid.c_str());
+ gsasl_property_set(session_, GSASL_SERVICE, chosen_mech_.protocol.c_str());
+ } catch (const LockFailure& e) {
+ return Status::MutexError("Mutex that guards gsasl_property_set in GSaslEngine::init_kerberos unable to lock");
}
+ return Status::OK();
+}
std::pair<Status, std::string> GSaslEngine::Step(const std::string data) {
if (state_ != kWaitingForData)
@@ -137,8 +173,16 @@ std::pair<Status, std::string> GSaslEngine::Step(const std::string data) {
char * output = NULL;
size_t outputSize;
- int rc = gsasl_step(session_, data.c_str(), data.size(), &output,
+
+ int rc = 0;
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ rc = gsasl_step(session_, data.c_str(), data.size(), &output,
&outputSize);
+ } catch (const LockFailure& e) {
+ state_ = kFailure;
+ return std::make_pair(Status::MutexError("Mutex that guards gsasl_step unable to lock"), "");
+ }
if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) {
std::string retval(output, output ? outputSize : 0);
@@ -166,16 +210,20 @@ Status GSaslEngine::Finish()
if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState )
LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_);
- if (session_ != nullptr) {
+ try {
+ LockGuard saslGuard(getSaslMutex());
+ if (session_ != nullptr) {
gsasl_finish(session_);
session_ = NULL;
- }
+ }
- if (ctx_ != nullptr) {
+ if (ctx_ != nullptr) {
gsasl_done(ctx_);
ctx_ = nullptr;
+ }
+ } catch (const LockFailure& e) {
+ return Status::MutexError("Mutex that guards sasl state cleanup in GSaslEngine::Finish unable to lock");
}
-
return Status::OK();
} // finish() method
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
index ad8191b..0957ea3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
@@ -38,8 +38,6 @@ namespace hdfs {
using namespace hadoop::common;
using namespace google::protobuf;
-template <class T>
-using optional = std::experimental::optional<T>;
/*****
* Threading model: all entry points need to acquire the sasl_lock before accessing
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index 395fad5..0b4581e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -116,6 +116,9 @@ add_executable(hdfs_ioservice_test hdfs_ioservice_test.cc)
target_link_libraries(hdfs_ioservice_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(hdfs_ioservice hdfs_ioservice_test)
+add_executable(user_lock_test user_lock_test.cc)
+target_link_libraries(user_lock_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
+add_memcheck_test(user_lock user_lock_test)
#
#
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
index 78f1a58..97f0afd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
@@ -23,7 +23,6 @@ using ::testing::_;
using namespace hdfs;
-
URI expect_uri_throw(const char *uri) {
bool threw = false;
std::string what_msg;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc
new file mode 100644
index 0000000..6df47b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hdfspp/locks.h>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+using namespace hdfs;
+
+// Test double for a mutex that can never be acquired: lock() always throws
+// LockFailure, and unlock() always throws too because the mutex can never
+// have been locked in the first place.
+class CantLockMutex : public Mutex {
+ public:
+  void lock() override { throw LockFailure("This mutex cannot be locked"); }
+
+  void unlock() override { throw LockFailure("Unlock"); }
+
+  std::string str() override { return "CantLockMutex"; }
+};
+
+// Checks that the default mutex handed out by LockManager can be locked and
+// unlocked repeatedly and that it reports the expected name.
+TEST(UserLockTest, DefaultMutexBasics) {
+  Mutex *mtx = LockManager::TEST_get_default_mutex();
+  // Fatal check: every statement below dereferences the pointer.
+  ASSERT_TRUE(mtx != nullptr);
+
+  // lock and unlock twice to make sure unlock works
+  for (int pass = 0; pass < 2; pass++) {
+    // Stop the test on a failed lock instead of going on to unlock a mutex
+    // that this thread never acquired.
+    ASSERT_NO_THROW(mtx->lock());
+    mtx->unlock();
+  }
+
+  EXPECT_EQ(mtx->str(), "DefaultMutex");
+}
+
+
+// Make sure the lock manager accepts a user supplied mutex only once, and
+// that TEST_reset_manager makes a fresh initialization possible again.
+TEST(UserLockTest, LockManager) {
+  std::unique_ptr<CantLockMutex> mtx(new CantLockMutex());
+  EXPECT_TRUE(mtx != nullptr);
+  CantLockMutex *userMtx = mtx.get();
+
+  // Before any init call a default GSSAPI mutex should already be present
+  Mutex *const initialGssapiMtx = LockManager::getGssapiMutex();
+  EXPECT_TRUE(initialGssapiMtx != nullptr);
+
+  // First init is expected to succeed and to install the user mutex
+  EXPECT_TRUE(LockManager::InitLocks(userMtx));
+  EXPECT_EQ(LockManager::getGssapiMutex(), userMtx);
+
+  // A second init without a reset in between should be refused
+  EXPECT_FALSE(LockManager::InitLocks(userMtx));
+
+  // After a test reset the manager should accept an init again
+  LockManager::TEST_reset_manager();
+  EXPECT_TRUE(LockManager::InitLocks(userMtx));
+
+  // A final reset should bring back the mutex that was seen at the start
+  LockManager::TEST_reset_manager();
+  EXPECT_EQ(LockManager::getGssapiMutex(), initialGssapiMtx);
+}
+
+// Sanity check for the CantLockMutex test double itself: lock() and unlock()
+// must both throw LockFailure, and str() must return the class name.
+TEST(UserLockTest, CheckCantLockMutex) {
+  std::unique_ptr<CantLockMutex> mtx(new CantLockMutex());
+  EXPECT_TRUE(mtx != nullptr);
+
+  // Assert on the throw itself. Tracking success through a flag only works
+  // if the flag is actually set after lock() returns; otherwise the check
+  // passes no matter what lock() does.
+  EXPECT_THROW(mtx->lock(), LockFailure);
+
+  // unlock() must fail as well since the mutex can never be held.
+  EXPECT_THROW(mtx->unlock(), LockFailure);
+
+  EXPECT_EQ("CantLockMutex", mtx->str());
+}
+
+// Exercises LockGuard with the default mutex, where creating a guard must
+// not raise LockFailure, and with CantLockMutex, where it must.
+TEST(UserLockTest, LockGuardBasics) {
+  Mutex *workingMtx = LockManager::TEST_get_default_mutex();
+  CantLockMutex brokenMtx;
+
+  // Takes a scoped lock on m and reports whether that raised LockFailure.
+  auto guard_throws = [](Mutex *m) -> bool {
+    try {
+      LockGuard scoped(m);
+      // now have a scoped lock
+    } catch (const LockFailure &) {
+      return true;
+    }
+    return false;
+  };
+
+  // lock/unlock a few times to increase chances of UB if lock is misused
+  for (int round = 0; round < 10; ++round) {
+    EXPECT_FALSE(guard_throws(workingMtx));
+  }
+
+  // same number of rounds, but expect it to blow up each time
+  for (int round = 0; round < 10; ++round) {
+    EXPECT_TRUE(guard_throws(&brokenMtx));
+  }
+}
+
+// Callable that adds 1 to a shared counter _iters times, taking a LockGuard
+// on _mtx around every single increment.
+struct Incrementer {
+  int64_t &_val;   // counter to modify; held by reference
+  int64_t _iters;  // number of increments performed per invocation
+  Mutex *_mtx;     // mutex locked around each update of _val
+
+  Incrementer(int64_t &val, int64_t iters, Mutex *m)
+      : _val(val), _iters(iters), _mtx(m) {}
+
+  void operator()() {
+    for (int64_t remaining = _iters; remaining > 0; --remaining) {
+      LockGuard valguard(_mtx);
+      ++_val;
+    }
+  }
+};
+
+// Callable that subtracts 1 from a shared counter _iters times, taking a
+// LockGuard on _mtx around every single decrement.
+struct Decrementer {
+  int64_t &_val;   // counter to modify; held by reference
+  int64_t _iters;  // number of decrements performed per invocation
+  Mutex *_mtx;     // mutex locked around each update of _val
+
+  Decrementer(int64_t &val, int64_t iters, Mutex *m)
+      : _val(val), _iters(iters), _mtx(m) {}
+
+  void operator()() {
+    for (int64_t remaining = _iters; remaining > 0; --remaining) {
+      LockGuard valguard(_mtx);
+      --_val;
+    }
+  }
+};
+
+// Races incrementing and decrementing threads on one counter guarded by the
+// default mutex. Both sides do the same amount of work, so the counter is
+// expected to end up where it started.
+TEST(UserLockTest, LockGuardConcurrency) {
+  Mutex *mtx = LockManager::TEST_get_default_mutex();
+
+  // Single threaded warm-up: prove that the functors actually mutate the value
+  int64_t test_value = 0;
+  Incrementer warmup_inc(test_value, 1000, mtx);
+  warmup_inc();
+  EXPECT_EQ(test_value, 1000);
+
+  Decrementer warmup_dec(test_value, 1000, mtx);
+  warmup_dec();
+  EXPECT_EQ(test_value, 0);
+
+  const int delta = 1024 * 1024;
+  const int threads = 2 * 6;
+  EXPECT_EQ(threads % 2, 0);
+
+  // Even slots increment and odd slots decrement, so the work balances out.
+  // std::thread stores its own copy of the callable it is given, so handing
+  // it a temporary functor is sufficient.
+  std::vector<std::thread> workers;
+  for (int slot = 0; slot < threads; ++slot) {
+    const bool adds = (slot % 2 == 0);
+    if (adds) {
+      workers.emplace_back(Incrementer(test_value, delta, mtx));
+    } else {
+      workers.emplace_back(Decrementer(test_value, delta, mtx));
+    }
+  }
+
+  // join, everything should balance to 0
+  for (std::thread &worker : workers) {
+    worker.join();
+  }
+  EXPECT_EQ(test_value, 0);
+}
+
+
+// Entry point of the test binary.
+int main(int argc, char *argv[]) {
+  // Google Mock (and with it Google Test) has to be initialized before any
+  // of the tests are run.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org