You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2016/05/18 02:50:16 UTC
[03/51] [abbrv] [partial] incubator-hawq git commit: HAWQ-735. Import
thrift-0.9.3 into depends/thirdparty/thrift folder
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
new file mode 100644
index 0000000..d57e7ec
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#if USE_STD_THREAD
+
+#include <thrift/concurrency/StdThreadFactory.h>
+#include <thrift/concurrency/Exception.h>
+
+#include <cassert>
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/weak_ptr.hpp>
+#include <thread>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+/**
+ * The C++11 thread class.
+ *
+ * Note that we use boost shared_ptr rather than std shared_ptrs here
+ * because the Thread/Runnable classes use those and we don't want to
+ * mix them.
+ *
+ * @version $Id:$
+ */
+class StdThread : public Thread, public boost::enable_shared_from_this<StdThread> {
+public:
+  /** Lifecycle states. In this file only uninitialized..stopping are ever assigned. */
+  enum STATE { uninitialized, starting, started, stopping, stopped };
+
+  /** Entry point executed on the new std::thread; defined after the class. */
+  static void threadMain(boost::shared_ptr<StdThread> thread);
+
+private:
+  std::unique_ptr<std::thread> thread_; // null until start() has created the thread
+  std::thread::id id_;                  // id captured in start(); survives detach()/join()
+  STATE state_;
+  bool detached_;
+
+public:
+  /**
+   * @param detached  when true, start() detaches the thread and join() does nothing
+   * @param runnable  task whose run() is invoked on the new thread
+   */
+  StdThread(bool detached, boost::shared_ptr<Runnable> runnable)
+    : state_(uninitialized), detached_(detached) {
+    this->Thread::runnable(runnable);
+  }
+
+  /**
+   * Joins a non-detached thread before the std::thread member is destroyed.
+   * Any exception raised by join() is swallowed.
+   */
+  ~StdThread() {
+    if (!detached_) {
+      try {
+        join();
+      } catch (...) {
+        // We're really hosed.
+      }
+    }
+  }
+
+  /**
+   * Launches threadMain on a new std::thread. Does nothing unless the object is
+   * still in the uninitialized state, so a second call is ignored.
+   */
+  void start() {
+    if (state_ != uninitialized) {
+      return;
+    }
+
+    // The new thread receives its own strong reference to this object.
+    boost::shared_ptr<StdThread> selfRef = shared_from_this();
+    state_ = starting;
+
+    thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));
+
+    // std::thread::get_id() yields a default-constructed id once the thread has been
+    // detached or joined, so record the real id while it is still available.
+    id_ = thread_->get_id();
+
+    if (detached_)
+      thread_->detach();
+  }
+
+  /**
+   * Blocks until the thread finishes. No-op for detached threads, for threads that
+   * were never successfully started, and for threads that were already joined.
+   */
+  void join() {
+    // thread_ is still null when std::thread construction threw in start() (state_ has
+    // already left uninitialized by then), and joining a non-joinable thread throws.
+    if (!detached_ && thread_ && thread_->joinable()) {
+      thread_->join();
+    }
+  }
+
+  /** Returns the id recorded by start(), or a default-constructed id before start(). */
+  Thread::id_t getId() { return id_; }
+
+  boost::shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
+
+  void runnable(boost::shared_ptr<Runnable> value) { Thread::runnable(value); }
+};
+
+/**
+ * Body of every thread launched by StdThread::start().
+ *
+ * Runs the bound Runnable once, provided the StdThread is in the starting state,
+ * and afterwards marks it as stopping unless it is already stopping or stopped.
+ */
+void StdThread::threadMain(boost::shared_ptr<StdThread> thread) {
+  // Nothing to do without a thread object, or when it is not in the starting state.
+  if (!thread || thread->state_ != starting) {
+    return;
+  }
+
+  thread->state_ = started;
+  thread->runnable()->run();
+
+  const bool windingDown = thread->state_ == stopping || thread->state_ == stopped;
+  if (!windingDown) {
+    thread->state_ = stopping;
+  }
+}
+
+/**
+ * std::thread factory implementation
+ */
+class StdThreadFactory::Impl {
+public:
+  Impl(bool detached) : detached_(detached) {}
+
+  /**
+   * Wraps the runnable in a new StdThread that uses the factory's current
+   * detached setting, and records that thread on the runnable.
+   *
+   * @param runnable A runnable object
+   * @return the newly created thread object; start() has not been called on it
+   */
+  boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const {
+    boost::shared_ptr<StdThread> created(new StdThread(detached_, runnable));
+    runnable->thread(created);
+    return created;
+  }
+
+  /** Detached mode handed to threads created by newThread(). */
+  bool isDetached() const { return detached_; }
+
+  /** Changes the detached mode used by later newThread() calls. */
+  void setDetached(bool value) { detached_ = value; }
+
+  /** Id of the calling thread, taken from std::this_thread. */
+  Thread::id_t getCurrentThreadId() const { return std::this_thread::get_id(); }
+
+private:
+  bool detached_;
+};
+
+// Creates the factory's private Impl, which stores the detached flag used for new threads.
+StdThreadFactory::StdThreadFactory(bool detached) : impl_(new StdThreadFactory::Impl(detached)) {
+}
+
+// Forwards to Impl::newThread(), which wraps the runnable in a StdThread.
+boost::shared_ptr<Thread> StdThreadFactory::newThread(boost::shared_ptr<Runnable> runnable) const {
+ return impl_->newThread(runnable);
+}
+
+// Returns the detached flag currently held by the Impl.
+bool StdThreadFactory::isDetached() const {
+ return impl_->isDetached();
+}
+
+// Updates the detached flag held by the Impl; newThread() reads it at creation time.
+void StdThreadFactory::setDetached(bool value) {
+ impl_->setDetached(value);
+}
+
+// Forwards to Impl::getCurrentThreadId(), which returns std::this_thread::get_id().
+Thread::id_t StdThreadFactory::getCurrentThreadId() const {
+ return impl_->getCurrentThreadId();
+}
+}
+}
+} // apache::thrift::concurrency
+
+#endif // USE_STD_THREAD
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
new file mode 100644
index 0000000..fb86bbf
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ 1
+
+#include <thrift/concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+/**
+ * A thread factory to create std::threads.
+ *
+ * @version $Id:$
+ */
+class StdThreadFactory : public ThreadFactory {
+
+public:
+ /**
+ * Std thread factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ * the Runnable tasks they host will be properly cleaned up once the last strong reference
+ * to both is given up.
+ *
+ * By default threads are not joinable (the detached argument defaults to true).
+ *
+ * @param detached initial detached mode for threads created by this factory
+ */
+
+ StdThreadFactory(bool detached = true);
+
+ // From ThreadFactory; creates a thread object bound to the given runnable.
+ boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+
+ // From ThreadFactory; returns the id of the calling thread.
+ Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * Sets detached mode of threads
+ *
+ * @param detached new detached mode
+ */
+ virtual void setDetached(bool detached);
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const;
+
+private:
+ // Implementation class, only forward-declared here so its definition stays out of the header.
+ class Impl;
+ boost::shared_ptr<Impl> impl_;
+};
+}
+}
+} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Thread.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Thread.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Thread.h
new file mode 100644
index 0000000..f5eb3a8
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Thread.h
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_THREAD_H_
+#define _THRIFT_CONCURRENCY_THREAD_H_ 1
+
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+
+#include <thrift/thrift-config.h>
+
+#if USE_BOOST_THREAD
+#include <boost/thread.hpp>
+#elif USE_STD_THREAD
+#include <thread>
+#else
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
+#endif
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+class Thread;
+
+/**
+ * Minimal runnable class. More or less analogous to java.lang.Runnable.
+ *
+ * @version $Id:$
+ */
+class Runnable {
+
+public:
+ virtual ~Runnable(){};
+
+ /**
+ * The work to perform; supplied by concrete subclasses.
+ */
+ virtual void run() = 0;
+
+ /**
+ * Gets the thread object that is hosting this runnable object - can return
+ * an empty boost::shared_ptr if no references remain on that thread object
+ */
+ virtual boost::shared_ptr<Thread> thread() { return thread_.lock(); }
+
+ /**
+ * Sets the thread that is executing this object. This is only meant for
+ * use by concrete implementations of Thread.
+ */
+ virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }
+
+private:
+ // Held weakly: the runnable does not keep its hosting thread object alive.
+ boost::weak_ptr<Thread> thread_;
+};
+
+/**
+ * Minimal thread class. Returned by thread factory bound to a Runnable object
+ * and ready to start execution. More or less analogous to java.lang.Thread
+ * (minus all the thread group, priority, mode and other baggage, since that
+ * is difficult to abstract across platforms and is left for platform-specific
+ * ThreadFactory implementations to deal with).
+ *
+ * @see apache::thrift::concurrency::ThreadFactory
+ */
+class Thread {
+
+public:
+ // id_t, is_current() and get_current() map onto whichever threading backend was
+ // selected at configure time: boost::thread, std::thread, or pthreads as the fallback.
+#if USE_BOOST_THREAD
+ typedef boost::thread::id id_t;
+
+ static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
+ static inline id_t get_current() { return boost::this_thread::get_id(); }
+#elif USE_STD_THREAD
+ typedef std::thread::id id_t;
+
+ static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
+ static inline id_t get_current() { return std::this_thread::get_id(); }
+#else
+ typedef pthread_t id_t;
+
+ static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
+ static inline id_t get_current() { return pthread_self(); }
+#endif
+
+ virtual ~Thread(){};
+
+ /**
+ * Starts the thread. Does platform specific thread creation and
+ * configuration then invokes the run method of the Runnable object bound
+ * to this thread.
+ */
+ virtual void start() = 0;
+
+ /**
+ * Join this thread. Current thread blocks until this target thread
+ * completes.
+ */
+ virtual void join() = 0;
+
+ /**
+ * Gets the thread's platform-specific ID
+ */
+ virtual id_t getId() = 0;
+
+ /**
+ * Gets the runnable object this thread is hosting
+ */
+ virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }
+
+protected:
+ /**
+ * Binds the runnable object this thread hosts. Protected, so only
+ * subclasses can rebind it.
+ */
+ virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }
+
+private:
+ // Strong reference: the thread object keeps its runnable alive.
+ boost::shared_ptr<Runnable> _runnable;
+};
+
+/**
+ * Factory to create platform-specific thread object and bind them to Runnable
+ * object for execution
+ */
+class ThreadFactory {
+
+public:
+ virtual ~ThreadFactory() {}
+
+ /** Creates a thread object bound to the given runnable. */
+ virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
+
+ /** Id value used to report that the current thread is not a thrift thread
+ */
+
+ static const Thread::id_t unknown_thread_id;
+
+ /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
+ */
+ virtual Thread::id_t getCurrentThreadId() const = 0;
+};
+}
+}
+} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
new file mode 100644
index 0000000..a2b44d4
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/concurrency/Exception.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Util.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <assert.h>
+#include <queue>
+#include <set>
+
+#if defined(DEBUG)
+#include <iostream>
+#endif // defined(DEBUG)
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
+
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * it maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times.
+ *
+ * @version $Id:$
+ */
+class ThreadManager::Impl : public ThreadManager {
+
+public:
+ // monitor_ and maxMonitor_ are both built on mutex_, so locking either one locks the
+ // same mutex; workerMonitor_ is default-constructed and independent of mutex_.
+ Impl()
+ : workerCount_(0),
+ workerMaxCount_(0),
+ idleCount_(0),
+ pendingTaskCountMax_(0),
+ expiredCount_(0),
+ state_(ThreadManager::UNINITIALIZED),
+ monitor_(&mutex_),
+ maxMonitor_(&mutex_) {}
+
+ ~Impl() { stop(); }
+
+ void start();
+
+ // stop() and join() differ only in the intermediate state (STOPPING vs JOINING)
+ // that stopImpl() records while the workers are being removed.
+ void stop() { stopImpl(false); }
+
+ void join() { stopImpl(true); }
+
+ // Read without taking the monitor.
+ ThreadManager::STATE state() const { return state_; }
+
+ shared_ptr<ThreadFactory> threadFactory() const {
+ Synchronized s(monitor_);
+ return threadFactory_;
+ }
+
+ void threadFactory(shared_ptr<ThreadFactory> value) {
+ Synchronized s(monitor_);
+ threadFactory_ = value;
+ }
+
+ void addWorker(size_t value);
+
+ void removeWorker(size_t value);
+
+ // Unlike the other counters below, this one is read without taking the monitor.
+ size_t idleWorkerCount() const { return idleCount_; }
+
+ size_t workerCount() const {
+ Synchronized s(monitor_);
+ return workerCount_;
+ }
+
+ size_t pendingTaskCount() const {
+ Synchronized s(monitor_);
+ return tasks_.size();
+ }
+
+ // Queued tasks plus one per non-idle worker.
+ size_t totalTaskCount() const {
+ Synchronized s(monitor_);
+ return tasks_.size() + workerCount_ - idleCount_;
+ }
+
+ size_t pendingTaskCountMax() const {
+ Synchronized s(monitor_);
+ return pendingTaskCountMax_;
+ }
+
+ // Returns the number of tasks expired since the previous call and resets the counter.
+ size_t expiredTaskCount() {
+ Synchronized s(monitor_);
+ size_t result = expiredCount_;
+ expiredCount_ = 0;
+ return result;
+ }
+
+ void pendingTaskCountMax(const size_t value) {
+ Synchronized s(monitor_);
+ pendingTaskCountMax_ = value;
+ }
+
+ // True when the calling thread is not one of the workers recorded in idMap_.
+ bool canSleep();
+
+ void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
+
+ void remove(shared_ptr<Runnable> task);
+
+ shared_ptr<Runnable> removeNextPending();
+
+ void removeExpiredTasks();
+
+ void setExpireCallback(ExpireCallback expireCallback);
+
+private:
+ void stopImpl(bool join);
+
+ size_t workerCount_;         // workers currently counted as part of the pool
+ size_t workerMaxCount_;      // target pool size set by addWorker()/removeWorker()
+ size_t idleCount_;           // workers currently waiting for a task
+ size_t pendingTaskCountMax_; // 0 means no limit on queued tasks
+ size_t expiredCount_;        // tasks dropped by removeExpiredTasks() since last query
+ ExpireCallback expireCallback_;
+
+ ThreadManager::STATE state_;
+ shared_ptr<ThreadFactory> threadFactory_;
+
+ friend class ThreadManager::Task;
+ std::queue<shared_ptr<Task> > tasks_;
+ Mutex mutex_;
+ Monitor monitor_;       // shares mutex_; workers wait on it for tasks
+ Monitor maxMonitor_;    // shares mutex_; add() waits on it when the queue is full
+ Monitor workerMonitor_; // own mutex; addWorker()/removeWorker() wait on it
+
+ friend class ThreadManager::Worker;
+ std::set<shared_ptr<Thread> > workers_;
+ std::set<shared_ptr<Thread> > deadWorkers_; // exited workers awaiting cleanup in removeWorker()
+ std::map<const Thread::id_t, shared_ptr<Thread> > idMap_; // worker thread id -> thread object
+};
+
+class ThreadManager::Task : public Runnable {
+
+public:
+  enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
+
+  /**
+   * Wraps a user runnable for queueing.
+   *
+   * @param runnable    the runnable to execute
+   * @param expiration  when nonzero, added to Util::currentTime() to form the
+   *                    expiry time; zero means the task never expires
+   */
+  Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
+    : runnable_(runnable), state_(WAITING), expireTime_(0LL) {
+    if (expiration != 0LL) {
+      expireTime_ = Util::currentTime() + expiration;
+    }
+  }
+
+  ~Task() {}
+
+  /** Runs the wrapped runnable, but only if the task has been marked EXECUTING. */
+  void run() {
+    if (state_ != EXECUTING) {
+      return;
+    }
+    runnable_->run();
+    state_ = COMPLETE;
+  }
+
+  shared_ptr<Runnable> getRunnable() { return runnable_; }
+
+  /** Expiry time, or 0 when the task never expires. */
+  int64_t getExpireTime() const { return expireTime_; }
+
+private:
+  shared_ptr<Runnable> runnable_;
+  friend class ThreadManager::Worker;
+  STATE state_;
+  int64_t expireTime_;
+};
+
+class ThreadManager::Worker : public Runnable {
+ enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
+
+public:
+ Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED), idle_(false) {}
+
+ ~Worker() {}
+
+private:
+ /**
+ * A worker keeps running while the pool is not over its target size, or while
+ * the manager is JOINING and tasks are still queued. Reads manager state without
+ * locking; run() only calls it while holding the manager's mutex.
+ */
+ bool isActive() const {
+ return (manager_->workerCount_ <= manager_->workerMaxCount_)
+ || (manager_->state_ == JOINING && !manager_->tasks_.empty());
+ }
+
+public:
+ /**
+ * Worker entry point
+ *
+ * As long as worker thread is running, pull tasks off the task queue and
+ * execute.
+ */
+ void run() {
+ bool active = false;
+ bool notifyManager = false;
+
+ /**
+ * Increment worker semaphore and notify manager if worker count reached
+ * desired max
+ *
+ * Note: We have to release the monitor and acquire the workerMonitor
+ * since that is what the manager blocks on for worker add/remove
+ */
+ {
+ Synchronized s(manager_->monitor_);
+ // Only join the pool if there is still room below the target size.
+ active = manager_->workerCount_ < manager_->workerMaxCount_;
+ if (active) {
+ manager_->workerCount_++;
+ notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
+ }
+ }
+
+ if (notifyManager) {
+ Synchronized s(manager_->workerMonitor_);
+ manager_->workerMonitor_.notify();
+ notifyManager = false;
+ }
+
+ while (active) {
+ shared_ptr<ThreadManager::Task> task;
+
+ /**
+ * While holding manager monitor block for non-empty task queue (Also
+ * check that the thread hasn't been requested to stop). Once the queue
+ * is non-empty, dequeue a task, release monitor, and execute. If the
+ * worker max count has been decremented such that we exceed it, mark
+ * ourself inactive, decrement the worker count and notify the manager
+ * (technically we're notifying the next blocked thread but eventually
+ * the manager will see it).
+ */
+ {
+ Guard g(manager_->mutex_);
+ active = isActive();
+
+ while (active && manager_->tasks_.empty()) {
+ manager_->idleCount_++;
+ idle_ = true;
+ manager_->monitor_.wait();
+ active = isActive();
+ idle_ = false;
+ manager_->idleCount_--;
+ }
+
+ if (active) {
+ manager_->removeExpiredTasks();
+
+ // The queue may have been emptied by the expiry pass above.
+ if (!manager_->tasks_.empty()) {
+ task = manager_->tasks_.front();
+ manager_->tasks_.pop();
+ if (task->state_ == ThreadManager::Task::WAITING) {
+ task->state_ = ThreadManager::Task::EXECUTING;
+ }
+ }
+
+ /* If we have a pending task max and we just dropped below it, wakeup any
+ thread that might be blocked on add. */
+ if (manager_->pendingTaskCountMax_ != 0
+ && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
+ manager_->maxMonitor_.notify();
+ }
+ } else {
+ idle_ = true;
+ manager_->workerCount_--;
+ notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
+ }
+ }
+
+ // The task runs outside the lock; exceptions are logged and never escape the worker.
+ if (task) {
+ if (task->state_ == ThreadManager::Task::EXECUTING) {
+ try {
+ task->run();
+ } catch (const std::exception& e) {
+ GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
+ } catch (...) {
+ GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
+ }
+ }
+ }
+ }
+
+ // Register this worker's thread as dead so removeWorker() can clean it up.
+ {
+ Synchronized s(manager_->workerMonitor_);
+ manager_->deadWorkers_.insert(this->thread());
+ if (notifyManager) {
+ manager_->workerMonitor_.notify();
+ }
+ }
+
+ return;
+ }
+
+private:
+ ThreadManager::Impl* manager_; // raw pointer to the owning manager
+ friend class ThreadManager::Impl;
+ STATE state_; // set to STARTING by Impl::addWorker(); not read in this file
+ bool idle_;   // true while waiting for a task and after leaving the pool
+};
+
+/**
+ * Adds `value` worker threads to the pool and blocks until the worker count
+ * has reached the new target.
+ *
+ * NOTE(review): threadFactory_ is dereferenced here without a null check or lock,
+ * and idMap_ is updated outside any lock - confirm callers guarantee this is safe.
+ */
+void ThreadManager::Impl::addWorker(size_t value) {
+ std::set<shared_ptr<Thread> > newThreads;
+ for (size_t ix = 0; ix < value; ix++) {
+ shared_ptr<ThreadManager::Worker> worker
+ = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
+ newThreads.insert(threadFactory_->newThread(worker));
+ }
+
+ // Raise the target first: a worker only joins the pool while workerCount_ is below it.
+ {
+ Synchronized s(monitor_);
+ workerMaxCount_ += value;
+ workers_.insert(newThreads.begin(), newThreads.end());
+ }
+
+ for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();
+ ++ix) {
+ shared_ptr<ThreadManager::Worker> worker
+ = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
+ worker->state_ = ThreadManager::Worker::STARTING;
+ (*ix)->start();
+ // Record the worker's id so canSleep() can recognise calls made from worker threads.
+ idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
+ }
+
+ // Wait for the workers to register themselves; the last one to do so notifies workerMonitor_.
+ {
+ Synchronized s(workerMonitor_);
+ while (workerCount_ != workerMaxCount_) {
+ workerMonitor_.wait();
+ }
+ }
+}
+
+/**
+ * Moves an UNINITIALIZED manager to STARTED. Does nothing if the manager is
+ * already STOPPED. No workers are created here.
+ *
+ * @throws InvalidArgumentException if no thread factory has been set
+ */
+void ThreadManager::Impl::start() {
+
+ if (state_ == ThreadManager::STOPPED) {
+ return;
+ }
+
+ {
+ Synchronized s(monitor_);
+ if (state_ == ThreadManager::UNINITIALIZED) {
+ if (!threadFactory_) {
+ throw InvalidArgumentException();
+ }
+ state_ = ThreadManager::STARTED;
+ monitor_.notifyAll();
+ }
+
+ // NOTE(review): nothing in this file assigns STARTING to state_, so this wait
+ // looks unreachable here - confirm before relying on it.
+ while (state_ == STARTING) {
+ monitor_.wait();
+ }
+ }
+}
+
+/**
+ * Shared implementation of stop() and join().
+ *
+ * Unless a stop is already in progress or complete, records JOINING (join == true)
+ * or STOPPING (join == false), removes all current workers, and finally marks
+ * the manager STOPPED.
+ *
+ * @param join selects the intermediate state; Worker::isActive() keeps workers
+ *             running while the state is JOINING and tasks remain queued
+ */
+void ThreadManager::Impl::stopImpl(bool join) {
+ bool doStop = false;
+ if (state_ == ThreadManager::STOPPED) {
+ return;
+ }
+
+ {
+ Synchronized s(monitor_);
+ if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
+ && state_ != ThreadManager::STOPPED) {
+ doStop = true;
+ state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
+ }
+ }
+
+ if (doStop) {
+ // workerCount_ is read here without holding the monitor.
+ removeWorker(workerCount_);
+ }
+
+ // XXX
+ // should be able to block here for transition to STOPPED since we're no
+ // using shared_ptrs
+
+ {
+ Synchronized s(monitor_);
+ state_ = ThreadManager::STOPPED;
+ }
+}
+
+/**
+ * Lowers the target pool size by `value`, wakes idle workers so they can notice,
+ * blocks until the worker count matches the new target, then forgets the
+ * workers that exited.
+ *
+ * @throws InvalidArgumentException if value exceeds the current target size
+ */
+void ThreadManager::Impl::removeWorker(size_t value) {
+ std::set<shared_ptr<Thread> > removedThreads;
+ {
+ Synchronized s(monitor_);
+ if (value > workerMaxCount_) {
+ throw InvalidArgumentException();
+ }
+
+ workerMaxCount_ -= value;
+
+ // Wake idle workers: one notify per idle worker when fewer are idle than are
+ // being removed, otherwise wake them all.
+ if (idleCount_ < value) {
+ for (size_t ix = 0; ix < idleCount_; ix++) {
+ monitor_.notify();
+ }
+ } else {
+ monitor_.notifyAll();
+ }
+ }
+
+ {
+ Synchronized s(workerMonitor_);
+
+ // Exiting workers notify workerMonitor_ once the count matches the target.
+ while (workerCount_ != workerMaxCount_) {
+ workerMonitor_.wait();
+ }
+
+ // Drop bookkeeping for every worker that registered itself as dead.
+ for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
+ ix != deadWorkers_.end();
+ ++ix) {
+ idMap_.erase((*ix)->getId());
+ workers_.erase(*ix);
+ }
+
+ deadWorkers_.clear();
+ }
+}
+
+/**
+ * Tells whether the calling thread may block: true when its id is not among
+ * the worker thread ids recorded in idMap_.
+ */
+bool ThreadManager::Impl::canSleep() {
+  return idMap_.count(threadFactory_->getCurrentThreadId()) == 0;
+}
+
+/**
+ * Queues a task for execution by a worker.
+ *
+ * @param value      the runnable to queue
+ * @param timeout    passed to the mutex guard and to the wait on a full queue;
+ *                   a negative value makes a full queue fail immediately
+ * @param expiration forwarded to the Task constructor (0 means no expiry)
+ *
+ * @throws TimedOutException            if the guard did not obtain the mutex
+ * @throws IllegalStateException        if the manager is not STARTED
+ * @throws TooManyPendingTasksException if the queue is full and the caller is a
+ *                                      worker thread or timeout is negative
+ */
+void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
+ Guard g(mutex_, timeout);
+
+ if (!g) {
+ throw TimedOutException();
+ }
+
+ if (state_ != ThreadManager::STARTED) {
+ throw IllegalStateException(
+ "ThreadManager::Impl::add ThreadManager "
+ "not started");
+ }
+
+ // Expired tasks are dropped first so they do not count against the limit.
+ removeExpiredTasks();
+ if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+ if (canSleep() && timeout >= 0) {
+ while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
+ // This is thread safe because the mutex is shared between monitors.
+ maxMonitor_.wait(timeout);
+ }
+ } else {
+ throw TooManyPendingTasksException();
+ }
+ }
+
+ tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
+
+ // If idle thread is available notify it, otherwise all worker threads are
+ // running and will get around to this task in time.
+ if (idleCount_ > 0) {
+ monitor_.notify();
+ }
+}
+
+/**
+ * Currently only validates the manager state: the task argument is ignored and
+ * nothing is removed from the queue.
+ *
+ * @throws IllegalStateException if the manager is not STARTED
+ */
+void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
+ (void)task; // unused
+ Synchronized s(monitor_);
+ if (state_ != ThreadManager::STARTED) {
+ throw IllegalStateException(
+ "ThreadManager::Impl::remove ThreadManager not "
+ "started");
+ }
+}
+
+/**
+ * Takes the task at the head of the queue out of the queue.
+ *
+ * @return the runnable wrapped by that task, or an empty pointer when the queue is empty
+ * @throws IllegalStateException if the manager is not STARTED
+ */
+boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
+  Guard g(mutex_);
+  if (state_ != ThreadManager::STARTED) {
+    throw IllegalStateException(
+        "ThreadManager::Impl::removeNextPending "
+        "ThreadManager not started");
+  }
+
+  boost::shared_ptr<Runnable> removed;
+  if (!tasks_.empty()) {
+    removed = tasks_.front()->getRunnable();
+    tasks_.pop();
+  }
+  return removed;
+}
+
+/**
+ * Pops expired tasks off the front of the queue, invoking the expire callback
+ * (when one is set) for each and counting them in expiredCount_.
+ *
+ * Takes no lock itself; the callers in this file invoke it while holding mutex_.
+ */
+void ThreadManager::Impl::removeExpiredTasks() {
+ int64_t now = 0LL; // we won't ask for the time until we need it
+
+ // note that this loop breaks at the first non-expiring task
+ while (!tasks_.empty()) {
+ shared_ptr<ThreadManager::Task> task = tasks_.front();
+ if (task->getExpireTime() == 0LL) {
+ break;
+ }
+ if (now == 0LL) {
+ now = Util::currentTime();
+ }
+ // ...and also at the first task whose expiry time is still in the future
+ if (task->getExpireTime() > now) {
+ break;
+ }
+ if (expireCallback_) {
+ expireCallback_(task->getRunnable());
+ }
+ tasks_.pop();
+ expiredCount_++;
+ }
+}
+
+// Stores the callback that removeExpiredTasks() invokes for each dropped task.
+// The assignment is made without taking any lock.
+void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
+ expireCallback_ = expireCallback;
+}
+
+/**
+ * ThreadManager::Impl preconfigured with a fixed worker count and pending-task
+ * limit, both of which are applied when start() is called.
+ */
+class SimpleThreadManager : public ThreadManager::Impl {
+
+public:
+  /**
+   * @param workerCount         number of workers added by start()
+   * @param pendingTaskCountMax pending-task limit applied by start(); 0 means no limit
+   */
+  SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
+    : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
+
+  /** Applies the pending-task limit, starts the manager, then adds the workers. */
+  void start() {
+    ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
+    ThreadManager::Impl::start();
+    addWorker(workerCount_);
+  }
+
+private:
+  // Configured values. They hide the private Impl members of the same name,
+  // which hold the manager's live state.
+  const size_t workerCount_;
+  const size_t pendingTaskCountMax_;
+};
+
+// Creates a plain manager with no workers; the caller sets a thread factory and adds workers.
+shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
+ return shared_ptr<ThreadManager>(new ThreadManager::Impl());
+}
+
+// Creates a SimpleThreadManager, which adds `count` workers and applies
+// `pendingTaskCountMax` when its start() is called.
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
+ size_t pendingTaskCountMax) {
+ return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
+}
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.h
new file mode 100644
index 0000000..2112845
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.h
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
+#define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
+
+#include <boost/shared_ptr.hpp>
+#include <thrift/cxxfunctional.h>
+#include <sys/types.h>
+#include <thrift/concurrency/Thread.h>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+/**
+ * Thread Pool Manager and related classes
+ *
+ * @version $Id:$
+ */
+class ThreadManager;
+
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * it maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times and informs the PoolPolicy
+ * object bound to instances of this manager of interesting transitions. It is
+ * then up to the PoolPolicy object to decide if the thread pool size needs to be
+ * adjusted and call this object's addWorker and removeWorker methods to make
+ * changes.
+ *
+ * This design allows different policy implementations to use this code to
+ * handle basic worker thread management and worker task execution and focus on
+ * policy issues. The simplest policy, StaticPolicy, does nothing other than
+ * create a fixed number of threads.
+ */
+class ThreadManager {
+
+protected:
+ ThreadManager() {}
+
+public:
+ /** Callback type invoked with the runnable of a task that expired without being run. */
+ typedef apache::thrift::stdcxx::function<void(boost::shared_ptr<Runnable>)> ExpireCallback;
+
+ virtual ~ThreadManager() {}
+
+ /**
+ * Starts the thread manager. Verifies all attributes have been properly
+ * initialized, then allocates necessary resources to begin operation
+ */
+ virtual void start() = 0;
+
+ /**
+ * Stops the thread manager. Aborts all remaining unprocessed tasks, shuts
+ * down all created worker threads, and releases all allocated resources.
+ * This method blocks for all worker threads to complete, thus it can
+ * potentially block forever if a worker thread is running a task that
+ * won't terminate.
+ */
+ virtual void stop() = 0;
+
+ /**
+ * Joins the thread manager. This is the same as stop, except that it will
+ * block until all the workers have finished their work. At that point
+ * the ThreadManager will transition into the STOPPED state.
+ */
+ virtual void join() = 0;
+
+ enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED };
+
+ virtual STATE state() const = 0;
+
+ virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0;
+
+ virtual void threadFactory(boost::shared_ptr<ThreadFactory> value) = 0;
+
+ virtual void addWorker(size_t value = 1) = 0;
+
+ virtual void removeWorker(size_t value = 1) = 0;
+
+ /**
+ * Gets the current number of idle worker threads
+ */
+ virtual size_t idleWorkerCount() const = 0;
+
+ /**
+ * Gets the current number of total worker threads
+ */
+ virtual size_t workerCount() const = 0;
+
+ /**
+ * Gets the current number of pending tasks
+ */
+ virtual size_t pendingTaskCount() const = 0;
+
+ /**
+ * Gets the current number of pending and executing tasks
+ */
+ virtual size_t totalTaskCount() const = 0;
+
+ /**
+ * Gets the maximum pending task count. 0 indicates no maximum
+ */
+ virtual size_t pendingTaskCountMax() const = 0;
+
+ /**
+ * Gets the number of tasks which have been expired without being run.
+ */
+ virtual size_t expiredTaskCount() = 0;
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * This method will block if pendingTaskCountMax() is not zero and pendingTaskCount()
+ * is greater than or equal to pendingTaskCountMax(). If this method is called in the
+ * context of a ThreadManager worker thread it will throw a
+ * TooManyPendingTasksException
+ *
+ * @param task The task to queue for execution
+ *
+ * @param timeout Time to wait in milliseconds to add a task when a pending-task-count
+ * is specified. Specific cases:
+ * timeout = 0 : Wait forever to queue task.
+ * timeout = -1 : Return immediately if pending task count exceeds specified max
+ * @param expiration when nonzero, the number of milliseconds the task is valid
+ * to be run; if exceeded, the task will be dropped off the queue and not run.
+ *
+ * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
+ */
+ virtual void add(boost::shared_ptr<Runnable> task,
+ int64_t timeout = 0LL,
+ int64_t expiration = 0LL) = 0;
+
+ /**
+ * Removes a pending task
+ */
+ virtual void remove(boost::shared_ptr<Runnable> task) = 0;
+
+ /**
+ * Remove the next pending task which would be run.
+ *
+ * @return the task removed.
+ */
+ virtual boost::shared_ptr<Runnable> removeNextPending() = 0;
+
+ /**
+ * Remove tasks from front of task queue that have expired.
+ */
+ virtual void removeExpiredTasks() = 0;
+
+ /**
+ * Set a callback to be called when a task is expired and not run.
+ *
+ * @param expireCallback a function called with the shared_ptr<Runnable> for
+ * the expired task.
+ */
+ virtual void setExpireCallback(ExpireCallback expireCallback) = 0;
+
+ /**
+ * Creates a thread manager with no worker threads.
+ */
+ static boost::shared_ptr<ThreadManager> newThreadManager();
+
+ /**
+ * Creates a simple thread manager that uses count number of worker threads and has
+ * a pendingTaskCountMax maximum pending tasks. The default, 0, specifies no limit
+ * on pending tasks
+ */
+ static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count = 4,
+ size_t pendingTaskCountMax = 0);
+
+ class Task;
+
+ class Worker;
+
+ class Impl;
+};
+}
+}
+} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
new file mode 100644
index 0000000..122d26e
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/concurrency/TimerManager.h>
+#include <thrift/concurrency/Exception.h>
+#include <thrift/concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+using boost::shared_ptr;
+
+/**
+ * TimerManager::Task
+ *
+ * Pairs a caller-supplied Runnable with the scheduling state that the
+ * dispatcher tracks for it.
+ *
+ * @version $Id:$
+ */
+class TimerManager::Task : public Runnable {
+
+public:
+  enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
+
+  // A freshly created task starts out WAITING.
+  Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
+
+  ~Task() {}
+
+  /**
+   * Runs the wrapped runnable and marks the task COMPLETE, but only when the
+   * task is currently EXECUTING; in any other state this is a no-op.
+   */
+  void run() {
+    if (state_ != EXECUTING) {
+      return;
+    }
+    runnable_->run();
+    state_ = COMPLETE;
+  }
+
+private:
+  shared_ptr<Runnable> runnable_;
+  // The dispatcher flips state_ from WAITING to EXECUTING before calling run().
+  friend class TimerManager::Dispatcher;
+  STATE state_;
+};
+
+/**
+ * Runnable executed on the manager's dispatcher thread. It moves the manager
+ * from STARTING to STARTED, runs tasks once their expiration time has passed,
+ * and marks the manager STOPPED after a stop has been requested.
+ */
+class TimerManager::Dispatcher : public Runnable {
+
+public:
+  Dispatcher(TimerManager* manager) : manager_(manager) {}
+
+  ~Dispatcher() {}
+
+  /**
+   * Dispatcher entry point
+   *
+   * As long as dispatcher thread is running, pull tasks off the task taskMap_
+   * and execute.
+   */
+  void run() {
+    {
+      // Tell anyone blocked in TimerManager::start() that we are up.
+      Synchronized s(manager_->monitor_);
+      if (manager_->state_ == TimerManager::STARTING) {
+        manager_->state_ = TimerManager::STARTED;
+        manager_->monitor_.notifyAll();
+      }
+    }
+
+    do {
+      // NOTE(review): a std::set of shared_ptr is ordered by pointer value, so
+      // tasks collected in one batch presumably run in pointer order rather
+      // than expiration order -- confirm whether that is intended.
+      std::set<shared_ptr<TimerManager::Task> > expiredTasks;
+      {
+        Synchronized s(manager_->monitor_);
+        task_iterator expiredTaskEnd;
+        int64_t now = Util::currentTime();
+        // Wait while no task has an expiration time <= now. upper_bound(now)
+        // equal to begin() means every key in the map is still in the future
+        // (or the map is empty).
+        while (manager_->state_ == TimerManager::STARTED
+               && (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
+                  == manager_->taskMap_.begin()) {
+          // Time until the earliest task is due. An empty map leaves this at
+          // 0, which Monitor::wait presumably treats as "no timeout".
+          int64_t timeout = 0LL;
+          if (!manager_->taskMap_.empty()) {
+            timeout = manager_->taskMap_.begin()->first - now;
+          }
+          assert((timeout != 0 && manager_->taskCount_ > 0)
+                 || (timeout == 0 && manager_->taskCount_ == 0));
+          try {
+            manager_->monitor_.wait(timeout);
+          } catch (TimedOutException&) {
+            // Timing out is the normal way to reach a task's due time.
+          }
+          now = Util::currentTime();
+        }
+
+        if (manager_->state_ == TimerManager::STARTED) {
+          // Claim every expired task while holding the monitor; they are run
+          // after the monitor has been released.
+          for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ++ix) {
+            shared_ptr<TimerManager::Task> task = ix->second;
+            expiredTasks.insert(task);
+            if (task->state_ == TimerManager::Task::WAITING) {
+              task->state_ = TimerManager::Task::EXECUTING;
+            }
+            manager_->taskCount_--;
+          }
+          manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
+        }
+      }
+
+      for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin();
+           ix != expiredTasks.end();
+           ++ix) {
+        (*ix)->run();
+      }
+
+    } while (manager_->state_ == TimerManager::STARTED);
+
+    {
+      Synchronized s(manager_->monitor_);
+      if (manager_->state_ == TimerManager::STOPPING) {
+        manager_->state_ = TimerManager::STOPPED;
+        // Wake every waiter, not just one: several threads may be blocked in
+        // TimerManager::stop() at once, and a single notify() would leave all
+        // but one of them waiting for a state change that already happened.
+        manager_->monitor_.notifyAll();
+      }
+    }
+    return;
+  }
+
+private:
+  TimerManager* manager_;
+  friend class TimerManager;
+};
+
+#if defined(_MSC_VER)
+#pragma warning(push)
+#pragma warning(disable : 4355) // 'this' used in base member initializer list
+#endif
+
+// Builds an idle manager: no pending tasks, UNINITIALIZED state, and a
+// dispatcher object bound to this instance. No thread is created here.
+TimerManager::TimerManager()
+  : taskCount_(0), state_(UNINITIALIZED), dispatcher_(new Dispatcher(this)) {
+}
+
+#if defined(_MSC_VER)
+#pragma warning(pop)
+#endif
+
+/**
+ * Stops the manager if the owner did not do so explicitly. Never throws.
+ */
+TimerManager::~TimerManager() {
+
+  // If we haven't been explicitly stopped, do so now. We don't need to grab
+  // the monitor here, since stop already takes care of reentrancy.
+
+  if (state_ != STOPPED) {
+    try {
+      stop();
+    } catch (...) {
+      // Deliberately swallowed. An exception escaping a destructor ends in
+      // std::terminate (destructors are implicitly noexcept since C++11, and
+      // before that throwing during stack unwinding terminated as well), so
+      // rethrowing here could only turn a failed stop into a process abort.
+    }
+  }
+}
+
+/**
+ * Launches the dispatcher thread (only for the caller that finds the manager
+ * UNINITIALIZED) and then blocks until the manager has left the STARTING state.
+ *
+ * @throws InvalidArgumentException if no thread factory has been set
+ */
+void TimerManager::start() {
+  bool launchDispatcher = false;
+  {
+    Synchronized guard(monitor_);
+    if (!threadFactory_) {
+      throw InvalidArgumentException();
+    }
+    // Exactly one caller can observe UNINITIALIZED and claim the launch.
+    launchDispatcher = (state_ == TimerManager::UNINITIALIZED);
+    if (launchDispatcher) {
+      state_ = TimerManager::STARTING;
+    }
+  }
+
+  if (launchDispatcher) {
+    dispatcherThread_ = threadFactory_->newThread(dispatcher_);
+    dispatcherThread_->start();
+  }
+
+  {
+    // The dispatcher leaves STARTING and notifies the monitor once it runs.
+    Synchronized guard(monitor_);
+    while (state_ == TimerManager::STARTING) {
+      monitor_.wait();
+    }
+    assert(state_ != TimerManager::STARTING);
+  }
+}
+
+/**
+ * Stops the timer manager and blocks until it has reached the STOPPED state.
+ *
+ * A manager that was never started goes straight to STOPPED. Otherwise the
+ * first caller requests the stop and, once the dispatcher has acknowledged
+ * it, discards all tasks that were still pending.
+ */
+void TimerManager::stop() {
+  bool doStop = false;
+  {
+    Synchronized s(monitor_);
+    if (state_ == TimerManager::UNINITIALIZED) {
+      state_ = TimerManager::STOPPED;
+    } else if (state_ != STOPPING && state_ != STOPPED) {
+      doStop = true;
+      state_ = STOPPING;
+      // Wake the dispatcher so it notices the state change.
+      monitor_.notifyAll();
+    }
+    while (state_ != STOPPED) {
+      monitor_.wait();
+    }
+  }
+
+  if (doStop) {
+    // Clean up any outstanding tasks, and keep the counter in step with the
+    // map so taskCount() does not keep reporting tasks that no longer exist.
+    taskMap_.clear();
+    taskCount_ = 0;
+
+    // Remove dispatcher's reference to us.
+    dispatcher_->manager_ = NULL;
+  }
+}
+
+// Returns the thread factory currently configured on this manager. The read
+// happens while holding the manager's monitor.
+shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
+  Synchronized guard(monitor_);
+  return threadFactory_;
+}
+
+// Replaces the thread factory that start() uses to create the dispatcher
+// thread. The assignment happens while holding the manager's monitor.
+void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
+  Synchronized guard(monitor_);
+  threadFactory_ = value;
+}
+
+// Returns the manager's pending-task counter.
+// NOTE(review): unlike threadFactory(), this reads the counter without taking
+// the monitor -- confirm that an unsynchronized read is acceptable to callers.
+size_t TimerManager::taskCount() const {
+ return taskCount_;
+}
+
+/**
+ * Schedules a task to run after a relative delay.
+ *
+ * @param task    the runnable to execute
+ * @param timeout delay in milliseconds, measured from the time of this call
+ *
+ * @throws IllegalStateException if the manager is not in the STARTED state
+ */
+void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
+  // The task map is keyed by absolute expiration time, so convert the delay.
+  timeout += Util::currentTime();
+
+  Synchronized guard(monitor_);
+  if (state_ != TimerManager::STARTED) {
+    throw IllegalStateException();
+  }
+
+  // Decide BEFORE inserting whether the dispatcher must be woken: it has to
+  // recompute its wait when the map was empty or when the new task becomes
+  // the earliest one.
+  bool wakeDispatcher = true;
+  if (taskCount_ != 0) {
+    wakeDispatcher = timeout < taskMap_.begin()->first;
+  }
+
+  taskCount_++;
+  taskMap_.insert(
+      std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
+
+  if (wakeDispatcher) {
+    monitor_.notify();
+  }
+}
+
+/**
+ * Schedules a task for an absolute point in time given as a THRIFT_TIMESPEC.
+ *
+ * @throws InvalidArgumentException if the requested time is already in the past
+ */
+void TimerManager::add(shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& value) {
+  int64_t deadlineMs;
+  Util::toMilliseconds(deadlineMs, value);
+
+  int64_t nowMs = Util::currentTime();
+  if (deadlineMs < nowMs) {
+    throw InvalidArgumentException();
+  }
+
+  // Delegate to the relative-delay overload.
+  add(task, deadlineMs - nowMs);
+}
+
+/**
+ * Schedules a task for an absolute point in time given as a timeval.
+ *
+ * @throws InvalidArgumentException if the requested time is already in the past
+ */
+void TimerManager::add(shared_ptr<Runnable> task, const struct timeval& value) {
+  int64_t deadlineMs;
+  Util::toMilliseconds(deadlineMs, value);
+
+  int64_t nowMs = Util::currentTime();
+  if (deadlineMs < nowMs) {
+    throw InvalidArgumentException();
+  }
+
+  // Delegate to the relative-delay overload.
+  add(task, deadlineMs - nowMs);
+}
+
+// Placeholder for removing a pending task. As written it never looks the task
+// up: the argument is ignored and the only effect is the state check below.
+//
+// @throws IllegalStateException if the manager is not in the STARTED state
+void TimerManager::remove(shared_ptr<Runnable> task) {
+ (void)task; // unused; silences the unused-parameter warning
+ Synchronized s(monitor_);
+ if (state_ != TimerManager::STARTED) {
+ throw IllegalStateException();
+ }
+}
+
+// Returns the manager's current lifecycle state.
+// NOTE(review): the value is read without taking the monitor -- confirm that
+// callers only need a momentary snapshot.
+TimerManager::STATE TimerManager::state() const {
+ return state_;
+}
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.h
new file mode 100644
index 0000000..3946827
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.h
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
+#define _THRIFT_CONCURRENCY_TIMERMANAGER_H_ 1
+
+#include <thrift/concurrency/Exception.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <time.h>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+/**
+ * Timer Manager
+ *
+ * This class dispatches timer tasks when they fall due.
+ *
+ * @version $Id:$
+ */
+class TimerManager {
+
+public:
+ TimerManager();
+
+ virtual ~TimerManager();
+
+ /**
+ * Returns the thread factory used to create the dispatcher thread.
+ */
+ virtual boost::shared_ptr<const ThreadFactory> threadFactory() const;
+
+ /**
+ * Sets the thread factory used to create the dispatcher thread. A factory
+ * must be set before start() is called.
+ */
+ virtual void threadFactory(boost::shared_ptr<const ThreadFactory> value);
+
+ /**
+ * Starts the timer manager service
+ *
+ * @throws InvalidArgumentException Missing thread factory attribute
+ */
+ virtual void start();
+
+ /**
+ * Stops the timer manager service
+ */
+ virtual void stop();
+
+ /**
+ * Returns the number of tasks currently counted as pending.
+ */
+ virtual size_t taskCount() const;
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param task The task to execute
+ * @param timeout Time in milliseconds to delay before executing task
+ *
+ * @throws IllegalStateException The timer manager is not in the STARTED state
+ */
+ virtual void add(boost::shared_ptr<Runnable> task, int64_t timeout);
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param task The task to execute
+ * @param timeout Absolute time in the future to execute task.
+ *
+ * @throws InvalidArgumentException The given time is already in the past
+ */
+ virtual void add(boost::shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& timeout);
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param task The task to execute
+ * @param timeout Absolute time in the future to execute task.
+ *
+ * @throws InvalidArgumentException The given time is already in the past
+ */
+ virtual void add(boost::shared_ptr<Runnable> task, const struct timeval& timeout);
+
+ /**
+ * Removes a pending task
+ *
+ * @throws NoSuchTaskException Specified task doesn't exist. It was either
+ * processed already or this call was made for a
+ * task that was never added to this timer
+ *
+ * @throws UncancellableTaskException Specified task is already being
+ * executed or has completed execution.
+ *
+ * NOTE(review): the two exceptions above describe the intended contract. The
+ * implementation in TimerManager.cpp accompanying this header only checks
+ * that the manager is STARTED (throwing IllegalStateException otherwise) and
+ * does not remove anything -- confirm before relying on this method.
+ */
+ virtual void remove(boost::shared_ptr<Runnable> task);
+
+ // Lifecycle states of the manager, in the order they are normally passed through.
+ enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
+
+ /**
+ * Returns the current lifecycle state.
+ */
+ virtual STATE state() const;
+
+private:
+ // Factory used by start() to create the dispatcher thread.
+ boost::shared_ptr<const ThreadFactory> threadFactory_;
+ class Task;
+ friend class Task;
+ // Pending tasks keyed by absolute expiration time in milliseconds; a
+ // multimap because several tasks may share the same expiration time.
+ std::multimap<int64_t, boost::shared_ptr<Task> > taskMap_;
+ // Number of pending tasks, maintained alongside taskMap_.
+ size_t taskCount_;
+ // Guards the state above and is used to signal state changes and new tasks.
+ Monitor monitor_;
+ STATE state_;
+ class Dispatcher;
+ friend class Dispatcher;
+ // Runnable executed by dispatcherThread_.
+ boost::shared_ptr<Dispatcher> dispatcher_;
+ boost::shared_ptr<Thread> dispatcherThread_;
+ typedef std::multimap<int64_t, boost::shared_ptr<TimerManager::Task> >::iterator task_iterator;
+ typedef std::pair<task_iterator, task_iterator> task_range;
+};
+}
+}
+} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.cpp
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.cpp
new file mode 100644
index 0000000..dd6d19f
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.cpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+
+#include <thrift/Thrift.h>
+#include <thrift/concurrency/Util.h>
+
+#if defined(HAVE_SYS_TIME_H)
+#include <sys/time.h>
+#endif
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+// Returns the current wall-clock time, obtained through THRIFT_GETTIMEOFDAY,
+// as a tick count at the requested resolution (ticksPerSec ticks per second).
+int64_t Util::currentTimeTicks(int64_t ticksPerSec) {
+  struct timeval tv;
+  int rc = THRIFT_GETTIMEOFDAY(&tv, NULL);
+  assert(rc == 0);
+  THRIFT_UNUSED_VARIABLE(rc); // rc is otherwise unused when assert is compiled out
+
+  int64_t ticks;
+  toTicks(ticks, tv, ticksPerSec);
+  return ticks;
+}
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.h
new file mode 100644
index 0000000..ba070b6
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.h
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_UTIL_H_
+#define _THRIFT_CONCURRENCY_UTIL_H_ 1
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <time.h>
+
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#endif
+
+#include <thrift/transport/PlatformSocket.h>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+/**
+ * Utility methods
+ *
+ * This class contains basic utility methods for converting time formats,
+ * and other common platform-dependent concurrency operations.
+ * It should not be included in API headers for other concurrency library
+ * headers, since it will, by definition, pull in all sorts of horrid
+ * platform dependent stuff. Rather it should be included directly in
+ * concurrency library implementation source.
+ *
+ * @version $Id:$
+ */
+class Util {
+
+  static const int64_t NS_PER_S = 1000000000LL;
+  static const int64_t US_PER_S = 1000000LL;
+  static const int64_t MS_PER_S = 1000LL;
+
+  static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S;
+  static const int64_t NS_PER_US = NS_PER_S / US_PER_S;
+  static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
+
+public:
+  /**
+   * Converts millisecond timestamp into a THRIFT_TIMESPEC struct
+   *
+   * @param struct THRIFT_TIMESPEC& result
+   * @param time or duration in milliseconds
+   */
+  static void toTimespec(struct THRIFT_TIMESPEC& result, int64_t value) {
+    result.tv_sec = value / MS_PER_S;                // ms to s
+    result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
+  }
+
+  /**
+   * Converts millisecond timestamp into a timeval struct
+   *
+   * @param struct timeval& result
+   * @param time or duration in milliseconds
+   */
+  static void toTimeval(struct timeval& result, int64_t value) {
+    result.tv_sec = static_cast<uint32_t>(value / MS_PER_S);                // ms to s
+    result.tv_usec = static_cast<uint32_t>((value % MS_PER_S) * US_PER_MS); // ms to us
+  }
+
+  /**
+   * Converts a (seconds, sub-second ticks) pair into a single tick count at a
+   * new resolution, rounding the sub-second part to the nearest new tick.
+   *
+   * @param result         receives secs and oldTicks expressed in new ticks
+   * @param secs           whole seconds
+   * @param oldTicks       sub-second part, in units of 1/oldTicksPerSec s
+   * @param oldTicksPerSec resolution of oldTicks
+   * @param newTicksPerSec resolution of the result
+   */
+  static void toTicks(int64_t& result,
+                      int64_t secs,
+                      int64_t oldTicks,
+                      int64_t oldTicksPerSec,
+                      int64_t newTicksPerSec) {
+    result = secs * newTicksPerSec;
+    result += oldTicks * newTicksPerSec / oldTicksPerSec;
+
+    // The division above truncates; round half-up when several old ticks map
+    // onto one new tick. A ratio of 1 (same resolution) or 0 (converting to a
+    // finer resolution) loses nothing, so no adjustment may be applied there:
+    // with a ratio of 1 the remainder is always 0 and an unconditional
+    // "remainder >= ratio / 2" test would bump every result by one tick.
+    int64_t oldPerNew = oldTicksPerSec / newTicksPerSec;
+    if (oldPerNew > 1 && ((oldTicks % oldPerNew) * 2 >= oldPerNew)) {
+      ++result;
+    }
+  }
+  /**
+   * Converts struct THRIFT_TIMESPEC to arbitrary-sized ticks since epoch
+   */
+  static void toTicks(int64_t& result, const struct THRIFT_TIMESPEC& value, int64_t ticksPerSec) {
+    return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec);
+  }
+
+  /**
+   * Converts struct timeval to arbitrary-sized ticks since epoch
+   */
+  static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) {
+    return toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec);
+  }
+
+  /**
+   * Converts struct THRIFT_TIMESPEC to milliseconds
+   */
+  static void toMilliseconds(int64_t& result, const struct THRIFT_TIMESPEC& value) {
+    return toTicks(result, value, MS_PER_S);
+  }
+
+  /**
+   * Converts struct timeval to milliseconds
+   */
+  static void toMilliseconds(int64_t& result, const struct timeval& value) {
+    return toTicks(result, value, MS_PER_S);
+  }
+
+  /**
+   * Converts struct THRIFT_TIMESPEC to microseconds
+   */
+  static void toUsec(int64_t& result, const struct THRIFT_TIMESPEC& value) {
+    return toTicks(result, value, US_PER_S);
+  }
+
+  /**
+   * Converts struct timeval to microseconds
+   */
+  static void toUsec(int64_t& result, const struct timeval& value) {
+    return toTicks(result, value, US_PER_S);
+  }
+
+  /**
+   * Get current time as a number of arbitrary-size ticks from epoch
+   */
+  static int64_t currentTimeTicks(int64_t ticksPerSec);
+
+  /**
+   * Get current time as milliseconds from epoch
+   */
+  static int64_t currentTime() { return currentTimeTicks(MS_PER_S); }
+
+  /**
+   * Get current time as micros from epoch
+   */
+  static int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); }
+};
+}
+}
+} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/cxxfunctional.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/cxxfunctional.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/cxxfunctional.h
new file mode 100644
index 0000000..dadaac3
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/cxxfunctional.h
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CXXFUNCTIONAL_H_
+#define _THRIFT_CXXFUNCTIONAL_H_ 1
+
+// clang-format off
+
+/**
+ * Loads <functional> from the 'right' location, depending
+ * on compiler and whether or not it's using C++03 with TR1
+ * or C++11.
+ */
+
+/*
+ * MSVC 10 and 11 have the <functional> stuff at <functional>.
+ * In MSVC 10 all of the implementations live in std::tr1.
+ * In MSVC 11 all of the implementations live in std, with aliases
+ * in std::tr1 to point to the ones in std.
+ */
+// Any build that defines _WIN32 is treated as using the Microsoft standard library.
+// NOTE(review): _WIN32 identifies the target platform rather than the library;
+// presumably other Windows toolchains define it too -- confirm they are handled.
+#ifdef _WIN32
+ #define _THRIFT_USING_MICROSOFT_STDLIB 1
+#endif
+
+#ifdef __clang__
+ /* Clang has two options, depending on standard library:
+ * - no -stdlib or -stdlib=libstdc++ set; uses GNU libstdc++.
+ * <tr1/functional>
+ * - -stdlib=libc++; uses LLVM libc++.
+ * <functional>, no 'std::tr1'.
+ *
+ * The compiler itself doesn't define anything differently
+ * depending on the value of -stdlib, but the library headers
+ * will set different preprocessor options. In order to check,
+ * though, we have to pull in some library header.
+ */
+ #include <utility>
+
+ /* With LLVM libc++, utility pulls in __config, which sets
+ _LIBCPP_VERSION. */
+ #if defined(_LIBCPP_VERSION)
+ #define _THRIFT_USING_CLANG_LIBCXX 1
+
+ /* With GNU libstdc++, utility pulls in bits/c++config.h,
+ which sets __GLIBCXX__. */
+ #elif defined(__GLIBCXX__)
+ #define _THRIFT_USING_GNU_LIBSTDCXX 1
+
+ /* No idea. */
+ #else
+ #error Unable to detect which C++ standard library is in use.
+ #endif
+// Compilers other than clang that define __GNUC__ are taken to use GNU libstdc++.
+#elif __GNUC__
+ #define _THRIFT_USING_GNU_LIBSTDCXX 1
+#endif
+
+// Each branch below imports function, bind and the placeholders _1.._6 into
+// apache::thrift::stdcxx; the branches differ only in the header included and
+// in whether the names come from std::tr1 or std. The Microsoft check comes
+// first, so it wins when more than one detection macro is set.
+#if _THRIFT_USING_MICROSOFT_STDLIB
+ #include <functional>
+
+ namespace apache { namespace thrift { namespace stdcxx {
+ using ::std::tr1::function;
+ using ::std::tr1::bind;
+
+ namespace placeholders {
+ using ::std::tr1::placeholders::_1;
+ using ::std::tr1::placeholders::_2;
+ using ::std::tr1::placeholders::_3;
+ using ::std::tr1::placeholders::_4;
+ using ::std::tr1::placeholders::_5;
+ using ::std::tr1::placeholders::_6;
+ } // apache::thrift::stdcxx::placeholders
+ }}} // apache::thrift::stdcxx
+
+// libc++: names are taken from std directly.
+#elif _THRIFT_USING_CLANG_LIBCXX
+ #include <functional>
+
+ namespace apache { namespace thrift { namespace stdcxx {
+ using ::std::function;
+ using ::std::bind;
+
+ namespace placeholders {
+ using ::std::placeholders::_1;
+ using ::std::placeholders::_2;
+ using ::std::placeholders::_3;
+ using ::std::placeholders::_4;
+ using ::std::placeholders::_5;
+ using ::std::placeholders::_6;
+ } // apache::thrift::stdcxx::placeholders
+ }}} // apache::thrift::stdcxx
+
+// GNU libstdc++: names are taken from std::tr1 via <tr1/functional>.
+#elif _THRIFT_USING_GNU_LIBSTDCXX
+ #include <tr1/functional>
+
+ namespace apache { namespace thrift { namespace stdcxx {
+ using ::std::tr1::function;
+ using ::std::tr1::bind;
+
+ namespace placeholders {
+ using ::std::tr1::placeholders::_1;
+ using ::std::tr1::placeholders::_2;
+ using ::std::tr1::placeholders::_3;
+ using ::std::tr1::placeholders::_4;
+ using ::std::tr1::placeholders::_5;
+ using ::std::tr1::placeholders::_6;
+ } // apache::thrift::stdcxx::placeholders
+ }}} // apache::thrift::stdcxx
+#endif
+
+ // Alias for thrift c++ compatibility namespace
+ // NOTE(review): this alias sits outside the #if chain, so if no branch above
+ // was taken apache::thrift::stdcxx is never declared and this line will not compile.
+ namespace tcxx = apache::thrift::stdcxx;
+
+#endif // #ifndef _THRIFT_CXXFUNCTIONAL_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.cpp
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.cpp
new file mode 100644
index 0000000..8c9a463
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.cpp
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/processor/PeekProcessor.h>
+
+using namespace apache::thrift::transport;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift;
+
+namespace apache {
+namespace thrift {
+namespace processor {
+
+// Starts out with a private TMemoryBuffer that serves both as the peek buffer
+// and as the target transport.
+PeekProcessor::PeekProcessor() {
+  boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
+  memoryBuffer_ = buffer;
+  targetTransport_ = buffer;
+}
+// Nothing to release explicitly; all members are shared_ptr handles.
+PeekProcessor::~PeekProcessor() {
+}
+
+// Wires the processor up:
+//   actualProcessor  - processor that process() forwards each message to
+//   protocolFactory  - builds the protocol that reads from the current target transport
+//   transportFactory - stored for getPipedTransport(); also told about the target transport
+void PeekProcessor::initialize(boost::shared_ptr<TProcessor> actualProcessor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<TPipedTransportFactory> transportFactory) {
+ actualProcessor_ = actualProcessor;
+ // The piped protocol is bound to whatever targetTransport_ is at this point.
+ pipedProtocol_ = protocolFactory->getProtocol(targetTransport_);
+ transportFactory_ = transportFactory;
+ transportFactory_->initializeTargetTransport(targetTransport_);
+}
+
+// Wraps the given source transport using the transport factory passed to
+// initialize(); initialize() must therefore have been called first.
+boost::shared_ptr<TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<TTransport> in) {
+ return transportFactory_->getTransport(in);
+}
+
+/**
+ * Replaces the target transport.
+ *
+ * The transport must either be a TMemoryBuffer itself or a TPipedTransport
+ * whose own target is a TMemoryBuffer; that buffer becomes the one process()
+ * peeks at and resets.
+ *
+ * @param targetTransport the new target transport
+ * @throws TException if no TMemoryBuffer can be obtained from targetTransport;
+ *         in that case the processor's state is left unchanged
+ */
+void PeekProcessor::setTargetTransport(boost::shared_ptr<TTransport> targetTransport) {
+  // Resolve the buffer into a local first. Checking the member instead would
+  // see the previous (non-null) buffer when the new transport is unsupported,
+  // so the error below would never be raised.
+  boost::shared_ptr<TMemoryBuffer> buffer
+      = boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport);
+  if (!buffer) {
+    boost::shared_ptr<TPipedTransport> piped
+        = boost::dynamic_pointer_cast<TPipedTransport>(targetTransport);
+    if (piped) {
+      buffer = boost::dynamic_pointer_cast<TMemoryBuffer>(piped->getTargetTransport());
+    }
+  }
+
+  if (!buffer) {
+    throw TException(
+        "Target transport must be a TMemoryBuffer or a TPipedTransport with TMemoryBuffer");
+  }
+
+  // Commit only after validation so a rejected transport changes nothing.
+  targetTransport_ = targetTransport;
+  memoryBuffer_ = buffer;
+}
+
+/**
+ * Reads one complete message from `in`, invoking the peek hooks along the way,
+ * then hands the buffered copy of that message to the actual processor.
+ *
+ * @param in                protocol the incoming message is read from
+ * @param out               protocol passed through to the actual processor
+ * @param connectionContext opaque context passed through to the actual processor
+ * @return whatever the actual processor's process() returns
+ * @throws TException if the message is neither a call nor a oneway call
+ */
+bool PeekProcessor::process(boost::shared_ptr<TProtocol> in,
+                            boost::shared_ptr<TProtocol> out,
+                            void* connectionContext) {
+
+  std::string name; // message name first, then reused for field names
+  TMessageType messageType;
+  int32_t seqid;
+  in->readMessageBegin(name, messageType, seqid);
+
+  if (!(messageType == T_CALL || messageType == T_ONEWAY)) {
+    throw TException("Unexpected message type");
+  }
+
+  // Hook: message name
+  peekName(name);
+
+  // Hook: every field up to the stop marker
+  TType fieldType;
+  int16_t fieldId;
+  for (;;) {
+    in->readFieldBegin(name, fieldType, fieldId);
+    if (fieldType == T_STOP) {
+      break;
+    }
+
+    peek(in, fieldType, fieldId);
+    in->readFieldEnd();
+  }
+  in->readMessageEnd();
+  in->getTransport()->readEnd();
+
+  //
+  // The whole message is expected to be in memoryBuffer_ at this point
+  //
+
+  // Hook: the raw bytes of the full message
+  uint8_t* data;
+  uint32_t length;
+  memoryBuffer_->getBuffer(&data, &length);
+  peekBuffer(data, length);
+
+  // Hook: peeking finished
+  peekEnd();
+
+  // Replay the buffered message through the real processor, then clear the
+  // buffer for the next message.
+  bool handled = actualProcessor_->process(pipedProtocol_, out, connectionContext);
+  memoryBuffer_->resetBuffer();
+  return handled;
+}
+
+// Hook called by process() with the message name; the default does nothing.
+void PeekProcessor::peekName(const std::string& fname) {
+ (void)fname;
+}
+
+// Hook called by process() with the contents of the memory buffer once the
+// message has been read; the default does nothing.
+void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) {
+ (void)buffer;
+ (void)size;
+}
+
+// Hook called by process() for each field after its header has been read. The
+// default ignores the field id and skips over the field's value on `in`.
+void PeekProcessor::peek(boost::shared_ptr<TProtocol> in, TType ftype, int16_t fid) {
+ (void)fid;
+ in->skip(ftype);
+}
+
+// Hook called by process() after all other peek hooks, just before the message
+// is handed to the actual processor; the default does nothing.
+void PeekProcessor::peekEnd() {
+}
+}
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.h
new file mode 100644
index 0000000..21c5999
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PEEKPROCESSOR_H
+#define PEEKPROCESSOR_H
+
+#include <string>
+#include <thrift/TProcessor.h>
+#include <thrift/transport/TTransport.h>
+#include <thrift/transport/TTransportUtils.h>
+#include <thrift/transport/TBufferTransports.h>
+#include <boost/shared_ptr.hpp>
+
+namespace apache {
+namespace thrift {
+namespace processor {
+
+/*
+ * Class for peeking at the raw data that is being processed by another processor
+ * and gives the derived class a chance to change behavior accordingly
+ *
+ */
+class PeekProcessor : public apache::thrift::TProcessor {
+
+public:
+ PeekProcessor();
+ virtual ~PeekProcessor();
+
+ // Input here: actualProcessor - the underlying processor
+ // protocolFactory - the protocol factory used to wrap the memory buffer
+ // transportFactory - this TPipedTransportFactory is used to wrap the source transport
+ // via a call to getPipedTransport
+ void initialize(
+ boost::shared_ptr<apache::thrift::TProcessor> actualProcessor,
+ boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocolFactory,
+ boost::shared_ptr<apache::thrift::transport::TPipedTransportFactory> transportFactory);
+
+ // Wraps the source transport `in` using the transport factory given to initialize
+ boost::shared_ptr<apache::thrift::transport::TTransport> getPipedTransport(
+ boost::shared_ptr<apache::thrift::transport::TTransport> in);
+
+ // Replaces the target transport; it must be a TMemoryBuffer or a
+ // TPipedTransport whose target is a TMemoryBuffer
+ void setTargetTransport(boost::shared_ptr<apache::thrift::transport::TTransport> targetTransport);
+
+ // Reads one message from `in`, calling the peek hooks below, then forwards the
+ // buffered message to the underlying processor
+ virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> in,
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> out,
+ void* connectionContext);
+
+ // The following four functions can be overridden by child classes to
+ // achieve desired peeking behavior
+ virtual void peekName(const std::string& fname);
+ virtual void peekBuffer(uint8_t* buffer, uint32_t size);
+ virtual void peek(boost::shared_ptr<apache::thrift::protocol::TProtocol> in,
+ apache::thrift::protocol::TType ftype,
+ int16_t fid);
+ virtual void peekEnd();
+
+private:
+ // Processor that receives each message after it has been peeked at
+ boost::shared_ptr<apache::thrift::TProcessor> actualProcessor_;
+ // Protocol created over the target transport in initialize
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> pipedProtocol_;
+ boost::shared_ptr<apache::thrift::transport::TPipedTransportFactory> transportFactory_;
+ // Buffer whose contents are handed to peekBuffer and reset after each message
+ boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> memoryBuffer_;
+ boost::shared_ptr<apache::thrift::transport::TTransport> targetTransport_;
+};
+}
+}
+} // apache::thrift::processor
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/StatsProcessor.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/StatsProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/StatsProcessor.h
new file mode 100644
index 0000000..e8ca067
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/StatsProcessor.h
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef STATSPROCESSOR_H
+#define STATSPROCESSOR_H
+
+#include <boost/shared_ptr.hpp>
+#include <thrift/transport/TTransport.h>
+#include <thrift/protocol/TProtocol.h>
+#include <TProcessor.h>
+
+namespace apache {
+namespace thrift {
+namespace processor {
+
+/*
+ * Class for keeping track of function call statistics and printing them if desired
+ *
+ */
+class StatsProcessor : public apache::thrift::TProcessor {
+public:
+  /**
+   * @param print     when true, every processed call is echoed to stdout
+   * @param frequency when true, processed calls are counted per method name
+   */
+  StatsProcessor(bool print, bool frequency) : print_(print), frequency_(frequency) {}
+  virtual ~StatsProcessor() {}
+
+  /**
+   * Reads one message header and its argument fields from piprot, printing
+   * and/or counting the call depending on the constructor flags. Nothing is
+   * written to poprot, and serverContext is not used.
+   *
+   * @return true once the argument fields have been read up to T_STOP
+   * @throws apache::thrift::TException if the message type is neither
+   *         T_CALL nor T_ONEWAY
+   */
+  virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot,
+                       boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot,
+                       void* serverContext) {
+    // Stored as a member so that printAndPassToBuffer() reads from the same protocol.
+    piprot_ = piprot;
+
+    std::string fname;
+    apache::thrift::protocol::TMessageType mtype;
+    int32_t seqid;
+
+    piprot_->readMessageBegin(fname, mtype, seqid);
+    if (mtype != apache::thrift::protocol::T_CALL
+        && mtype != apache::thrift::protocol::T_ONEWAY) {
+      if (print_) {
+        printf("Unknown message type\n");
+      }
+      throw apache::thrift::TException("Unexpected message type");
+    }
+    if (print_) {
+      printf("%s (", fname.c_str());
+    }
+    if (frequency_) {
+      // operator[] value-initializes a missing counter to 0, so a single
+      // lookup covers both the first call and every later one.
+      ++frequency_map_[fname];
+    }
+
+    apache::thrift::protocol::TType ftype;
+    int16_t fid;
+
+    // fname is reused as scratch space for the field names from here on.
+    while (true) {
+      piprot_->readFieldBegin(fname, ftype, fid);
+      if (ftype == apache::thrift::protocol::T_STOP) {
+        break;
+      }
+
+      printAndPassToBuffer(ftype);
+      if (print_) {
+        printf(", ");
+      }
+    }
+
+    if (print_) {
+      // The two backspaces step back over the separator printed after the last argument.
+      printf("\b\b)\n");
+    }
+    return true;
+  }
+
+  /** @return the per-method call counters gathered so far. */
+  const std::map<std::string, int64_t>& get_frequency_map() const { return frequency_map_; }
+
+protected:
+  /**
+   * Reads one value of type ftype from piprot_ and, when printing is enabled,
+   * writes a textual form of it to stdout. Structs and containers recurse
+   * into their members. Types not listed below are neither read nor printed.
+   */
+  void printAndPassToBuffer(apache::thrift::protocol::TType ftype) {
+    switch (ftype) {
+    case apache::thrift::protocol::T_BOOL: {
+      bool boolv;
+      piprot_->readBool(boolv);
+      if (print_) {
+        printf("%d", boolv);
+      }
+    } break;
+    case apache::thrift::protocol::T_BYTE: {
+      int8_t bytev;
+      piprot_->readByte(bytev);
+      if (print_) {
+        printf("%d", bytev);
+      }
+    } break;
+    case apache::thrift::protocol::T_I16: {
+      int16_t i16;
+      piprot_->readI16(i16);
+      if (print_) {
+        printf("%d", i16);
+      }
+    } break;
+    case apache::thrift::protocol::T_I32: {
+      int32_t i32;
+      piprot_->readI32(i32);
+      if (print_) {
+        printf("%d", i32);
+      }
+    } break;
+    case apache::thrift::protocol::T_I64: {
+      int64_t i64;
+      piprot_->readI64(i64);
+      if (print_) {
+        // int64_t is not `long` on every platform, so "%ld" is not a portable
+        // match; long long is at least 64 bits wide everywhere.
+        printf("%lld", static_cast<long long>(i64));
+      }
+    } break;
+    case apache::thrift::protocol::T_DOUBLE: {
+      double dub;
+      piprot_->readDouble(dub);
+      if (print_) {
+        printf("%f", dub);
+      }
+    } break;
+    case apache::thrift::protocol::T_STRING: {
+      std::string str;
+      piprot_->readString(str);
+      if (print_) {
+        printf("%s", str.c_str());
+      }
+    } break;
+    case apache::thrift::protocol::T_STRUCT: {
+      std::string name;
+      int16_t memberId;
+      // Named differently from the parameter so that it is not shadowed.
+      apache::thrift::protocol::TType memberType;
+      piprot_->readStructBegin(name);
+      if (print_) {
+        printf("<");
+      }
+      while (true) {
+        piprot_->readFieldBegin(name, memberType, memberId);
+        if (memberType == apache::thrift::protocol::T_STOP) {
+          break;
+        }
+        printAndPassToBuffer(memberType);
+        if (print_) {
+          printf(",");
+        }
+        piprot_->readFieldEnd();
+      }
+      piprot_->readStructEnd();
+      if (print_) {
+        // The backspace steps back over the separator printed after the last member.
+        printf("\b>");
+      }
+    } break;
+    case apache::thrift::protocol::T_MAP: {
+      apache::thrift::protocol::TType keyType;
+      apache::thrift::protocol::TType valType;
+      uint32_t size;
+      piprot_->readMapBegin(keyType, valType, size);
+      if (print_) {
+        printf("{");
+      }
+      for (uint32_t i = 0; i < size; i++) {
+        printAndPassToBuffer(keyType);
+        if (print_) {
+          printf("=>");
+        }
+        printAndPassToBuffer(valType);
+        if (print_) {
+          printf(",");
+        }
+      }
+      piprot_->readMapEnd();
+      if (print_) {
+        printf("\b}");
+      }
+    } break;
+    case apache::thrift::protocol::T_SET: {
+      apache::thrift::protocol::TType elemType;
+      uint32_t size;
+      piprot_->readSetBegin(elemType, size);
+      if (print_) {
+        printf("{");
+      }
+      for (uint32_t i = 0; i < size; i++) {
+        printAndPassToBuffer(elemType);
+        if (print_) {
+          printf(",");
+        }
+      }
+      piprot_->readSetEnd();
+      if (print_) {
+        printf("\b}");
+      }
+    } break;
+    case apache::thrift::protocol::T_LIST: {
+      apache::thrift::protocol::TType elemType;
+      uint32_t size;
+      piprot_->readListBegin(elemType, size);
+      if (print_) {
+        printf("[");
+      }
+      for (uint32_t i = 0; i < size; i++) {
+        printAndPassToBuffer(elemType);
+        if (print_) {
+          printf(",");
+        }
+      }
+      piprot_->readListEnd();
+      if (print_) {
+        printf("\b]");
+      }
+    } break;
+    default:
+      break;
+    }
+  }
+
+  // Input protocol of the message currently being processed; set by process().
+  boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot_;
+  // Number of processed calls per method name; only updated when frequency_ is true.
+  std::map<std::string, int64_t> frequency_map_;
+
+  bool print_;     // echo processed calls to stdout
+  bool frequency_; // maintain frequency_map_
+};
+}
+}
+} // apache::thrift::processor
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
new file mode 100644
index 0000000..0ef7261
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_
+#define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1
+
+#include <thrift/protocol/TProtocolDecorator.h>
+#include <thrift/TApplicationException.h>
+#include <thrift/TProcessor.h>
+#include <boost/tokenizer.hpp>
+
+namespace apache {
+namespace thrift {
+using boost::shared_ptr;
+
+namespace protocol {
+
+/**
+ * To be able to work with any protocol, we needed
+ * to allow them to call readMessageBegin() and get a TMessage in exactly
+ * the standard format, without the service name prepended to TMessage.name.
+ */
+class StoredMessageProtocol : public TProtocolDecorator {
+public:
+  /**
+   * Passes _protocol on to the TProtocolDecorator base and keeps a copy of
+   * the message header (_name, _type, _seqid) to hand out later.
+   */
+  StoredMessageProtocol(shared_ptr<protocol::TProtocol> _protocol,
+                        const std::string& _name,
+                        const TMessageType _type,
+                        const int32_t _seqid)
+    : TProtocolDecorator(_protocol),
+      name(_name),
+      type(_type),
+      seqid(_seqid) {}
+
+  /**
+   * Fills the out-parameters from the stored header.
+   *
+   * @return 0, where ordinary TProtocol read functions return the number of
+   *         bytes read
+   */
+  uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) {
+    _name = name;
+    _type = type;
+    _seqid = seqid;
+    return 0;
+  }
+
+  // Stored message header, returned verbatim by readMessageBegin_virt().
+  std::string name;
+  TMessageType type;
+  int32_t seqid;
+};
+} // namespace protocol
+
+/**
+ * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
+ * a single <code>TServer</code> to provide multiple services.
+ *
+ * <p>To do so, you instantiate the processor and then register additional
+ * processors with it, as shown in the following example:</p>
+ *
+ * <blockquote><code>
+ * shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor());
+ *
+ * processor->registerProcessor(
+ * "Calculator",
+ * shared_ptr<TProcessor>( new CalculatorProcessor(
+ * shared_ptr<CalculatorHandler>( new CalculatorHandler()))));
+ *
+ * processor->registerProcessor(
+ * "WeatherReport",
+ * shared_ptr<TProcessor>( new WeatherReportProcessor(
+ * shared_ptr<WeatherReportHandler>( new WeatherReportHandler()))));
+ *
+ * shared_ptr<TServerTransport> transport(new TServerSocket(9090));
+ * TSimpleServer server(processor, transport);
+ *
+ * server.serve();
+ * </code></blockquote>
+ */
+class TMultiplexedProcessor : public TProcessor {
+public:
+  typedef std::map<std::string, shared_ptr<TProcessor> > services_t;
+
+  /**
+   * 'Register' a service with this <code>TMultiplexedProcessor</code>. This
+   * allows us to broker requests to individual services by using the service
+   * name to select them at request time. Registering a name a second time
+   * replaces the processor stored under it.
+   *
+   * \param [in] serviceName Name of a service; process() matches it against
+   *                         the part of the message name before the ':',
+   *                         e.g. "WeatherReport".
+   * \param [in] processor   Processor that receives the calls addressed to
+   *                         serviceName.
+   */
+  void registerProcessor(const std::string& serviceName, shared_ptr<TProcessor> processor) {
+    services[serviceName] = processor;
+  }
+
+  /**
+   * This implementation of <code>process</code> performs the following steps:
+   *
+   * <ol>
+   *     <li>Read the beginning of the message.</li>
+   *     <li>Split the message name on ':' into service name and method name.</li>
+   *     <li>Use the service name to locate the appropriate processor.</li>
+   *     <li>Dispatch to the processor, with a decorated instance of TProtocol
+   *         whose readMessageBegin() returns the message header with the
+   *         service name removed from the message name.</li>
+   * </ol>
+   *
+   * In every error case the rest of the request is skipped and a
+   * PROTOCOL_ERROR TApplicationException is written to <code>out</code>
+   * before the exception is thrown.
+   *
+   * \return whatever the selected processor's process() returns.
+   * \throws TException If the message type is not T_CALL or T_ONEWAY, if
+   *         the message name does not consist of exactly a service name and
+   *         a method name, or if the service name was not found in the
+   *         service map.
+   */
+  bool process(shared_ptr<protocol::TProtocol> in,
+               shared_ptr<protocol::TProtocol> out,
+               void* connectionContext) {
+    std::string name;
+    protocol::TMessageType type;
+    int32_t seqid;
+
+    // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+    // message header. The selected processor later gets the header back from
+    // a StoredMessageProtocol, without the service name prefix.
+    in->readMessageBegin(name, type, seqid);
+
+    if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
+      const std::string msg("TMultiplexedProcessor: Unexpected message type");
+      sendProtocolError(in, out, name, seqid, msg);
+      throw TException(msg);
+    }
+
+    // Extract the service name
+    boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));
+
+    std::vector<std::string> tokens;
+    std::copy(tok.begin(), tok.end(), std::back_inserter(tokens));
+
+    // A valid message name consists of two tokens: the service name and the
+    // name of the method to call. Anything else is reported to the client
+    // rather than dropped with the request still unread.
+    if (tokens.size() != 2) {
+      std::string msg("TMultiplexedProcessor: Service name not found in message name: ");
+      msg += name;
+      sendProtocolError(in, out, name, seqid, msg);
+      throw TException(msg);
+    }
+
+    // Search for a processor associated with this service name.
+    services_t::iterator it = services.find(tokens[0]);
+    if (it == services.end()) {
+      std::string msg("TMultiplexedProcessor: Unknown service: ");
+      msg += tokens[0];
+      sendProtocolError(in, out, name, seqid, msg);
+      // The hint is only added to the local exception, not to the reply.
+      msg += ". Did you forget to call registerProcessor()?";
+      throw TException(msg);
+    }
+
+    // Let the processor registered for this service name process the message.
+    shared_ptr<TProcessor> processor = it->second;
+    return processor->process(shared_ptr<protocol::TProtocol>(
+                                  new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
+                              out,
+                              connectionContext);
+  }
+
+private:
+  /**
+   * Skips the remainder of the current request on <code>in</code> and writes
+   * a PROTOCOL_ERROR TApplicationException carrying <code>msg</code> to
+   * <code>out</code>, reusing the request's name and sequence id.
+   */
+  void sendProtocolError(const shared_ptr<protocol::TProtocol>& in,
+                         const shared_ptr<protocol::TProtocol>& out,
+                         const std::string& name,
+                         int32_t seqid,
+                         const std::string& msg) {
+    in->skip(::apache::thrift::protocol::T_STRUCT);
+    in->readMessageEnd();
+    in->getTransport()->readEnd();
+
+    ::apache::thrift::TApplicationException
+        x(::apache::thrift::TApplicationException::PROTOCOL_ERROR, msg);
+    out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
+    x.write(out.get());
+    out->writeMessageEnd();
+    out->getTransport()->writeEnd();
+    out->getTransport()->flush();
+  }
+
+  /** Map of service processor objects, indexed by service names. */
+  services_t services;
+};
+}
+}
+
+#endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/protocol/TBase64Utils.cpp
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/protocol/TBase64Utils.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/protocol/TBase64Utils.cpp
new file mode 100644
index 0000000..beb76eb
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/protocol/TBase64Utils.cpp
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/protocol/TBase64Utils.h>
+
+#include <boost/static_assert.hpp>
+
+using std::string;
+
+namespace apache {
+namespace thrift {
+namespace protocol {
+
+// Base64 alphabet: the character at position i encodes the 6-bit value i.
+static const uint8_t* kBase64EncodeTable = reinterpret_cast<const uint8_t*>(
+    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/");
+
+// Encodes the leading bytes of `in` into base64 characters stored in `buf`:
+// 3 input bytes give 4 characters, 2 give 3, and any other `len` is handled
+// as a single byte giving 2 characters. No '=' padding is written.
+void base64_encode(const uint8_t* in, uint32_t len, uint8_t* buf) {
+  // The first character always comes from the top six bits of the first byte.
+  buf[0] = kBase64EncodeTable[(in[0] >> 2) & 0x3f];
+  switch (len) {
+  case 3:
+    buf[1] = kBase64EncodeTable[((in[0] << 4) & 0x30) | ((in[1] >> 4) & 0x0f)];
+    buf[2] = kBase64EncodeTable[((in[1] << 2) & 0x3c) | ((in[2] >> 6) & 0x03)];
+    buf[3] = kBase64EncodeTable[in[2] & 0x3f];
+    break;
+  case 2:
+    buf[1] = kBase64EncodeTable[((in[0] << 4) & 0x30) | ((in[1] >> 4) & 0x0f)];
+    buf[2] = kBase64EncodeTable[(in[1] << 2) & 0x3c];
+    break;
+  default: // a single input byte
+    buf[1] = kBase64EncodeTable[(in[0] << 4) & 0x30];
+    break;
+  }
+}
+
+// Reverse of the base64 alphabet, indexed by character code: entry c holds
+// the 6-bit value encoded by character c, or 0xff when c is not part of the
+// alphabet. Sixteen entries per row; the comment gives the index range.
+static const uint8_t kBase64DecodeTable[256] = {
+  // 0x00 - 0x0f
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0x10 - 0x1f
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0x20 - 0x2f ('+' at 0x2b, '/' at 0x2f)
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x3e, 0xff, 0xff, 0xff, 0x3f,
+  // 0x30 - 0x3f ('0' - '9')
+  0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3a, 0x3b, 0x3c, 0x3d, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0x40 - 0x4f ('A' - 'O')
+  0xff, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
+  // 0x50 - 0x5f ('P' - 'Z')
+  0x0f, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0x60 - 0x6f ('a' - 'o')
+  0xff, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28,
+  // 0x70 - 0x7f ('p' - 'z')
+  0x29, 0x2a, 0x2b, 0x2c, 0x2d, 0x2e, 0x2f, 0x30, 0x31, 0x32, 0x33, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0x80 - 0x8f
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0x90 - 0x9f
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0xa0 - 0xaf
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0xb0 - 0xbf
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0xc0 - 0xcf
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0xd0 - 0xdf
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0xe0 - 0xef
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+  // 0xf0 - 0xff
+  0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+};
+
+// Decodes base64 characters in place: the leading characters of `buf` are
+// replaced by the bytes they encode. The first two characters always yield
+// one byte; a third character (len > 2) yields a second byte and a fourth
+// (len > 3) a third. Characters outside the alphabet are not rejected; they
+// simply decode through the table's 0xff entries.
+void base64_decode(uint8_t* buf, uint32_t len) {
+  const uint8_t first = kBase64DecodeTable[buf[0]];
+  const uint8_t second = kBase64DecodeTable[buf[1]];
+  buf[0] = (first << 2) | (second >> 4);
+  if (len <= 2) {
+    return;
+  }
+
+  const uint8_t third = kBase64DecodeTable[buf[2]];
+  buf[1] = ((second << 4) & 0xf0) | (third >> 2);
+  if (len <= 3) {
+    return;
+  }
+
+  buf[2] = ((third << 6) & 0xc0) | (kBase64DecodeTable[buf[3]]);
+}
+}
+}
+} // apache::thrift::protocol