You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by ca...@apache.org on 2013/06/07 15:43:09 UTC
[2/3] THRIFT-1753: Multiple C++ Windows, OSX,
and iOS portability issues Client: cpp Patch: Ben Craig
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
new file mode 100644
index 0000000..c300e0d
--- /dev/null
+++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ 1
+
+#include "Thread.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A thread factory to create std::threads.
+ *
+ * @version $Id:$
+ */
+class StdThreadFactory : public ThreadFactory {
+
+ public:
+
+ /**
+ * Std thread factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ * the Runnable tasks they host will be properly cleaned up once the last strong reference
+ * to both is given up.
+ *
+ * By default threads are not joinable.
+ */
+
+ StdThreadFactory(bool detached=true);
+
+ // From ThreadFactory;
+ boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+
+ // From ThreadFactory;
+ Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * Sets detached mode of threads
+ */
+ virtual void setDetached(bool detached);
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const;
+
+private:
+ class Impl;
+ boost::shared_ptr<Impl> impl_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/concurrency/Thread.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h
index 164df0c..6be7d1d 100755
--- a/lib/cpp/src/thrift/concurrency/Thread.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.h
@@ -28,12 +28,14 @@
#include <config.h>
#endif
-#ifdef USE_BOOST_THREAD
-#include <boost/thread.hpp>
-#endif
-
-#ifdef HAVE_PTHREAD_H
-#include <pthread.h>
+#if USE_BOOST_THREAD
+# include <boost/thread.hpp>
+#elif USE_STD_THREAD
+# include <thread>
+#else
+# ifdef HAVE_PTHREAD_H
+# include <pthread.h>
+# endif
#endif
namespace apache { namespace thrift { namespace concurrency {
@@ -80,13 +82,19 @@ class Thread {
public:
-#ifdef USE_BOOST_THREAD
+#if USE_BOOST_THREAD
typedef boost::thread::id id_t;
static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
static inline id_t get_current() { return boost::this_thread::get_id(); }
+#elif USE_STD_THREAD
+ typedef std::thread::id id_t;
+
+ static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
+ static inline id_t get_current() { return std::this_thread::get_id(); }
#else
typedef pthread_t id_t;
+
static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
static inline id_t get_current() { return pthread_self(); }
#endif
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/concurrency/TimerManager.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/lib/cpp/src/thrift/concurrency/TimerManager.cpp
index 8be8a6e..98a4b28 100644
--- a/lib/cpp/src/thrift/concurrency/TimerManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/TimerManager.cpp
@@ -263,7 +263,7 @@ void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
}
}
-void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
+void TimerManager::add(shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& value) {
int64_t expiration;
Util::toMilliseconds(expiration, value);
@@ -277,6 +277,19 @@ void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value)
add(task, expiration - now);
}
+void TimerManager::add(shared_ptr<Runnable> task, const struct timeval& value) {
+
+ int64_t expiration;
+ Util::toMilliseconds(expiration, value);
+
+ int64_t now = Util::currentTime();
+
+ if (expiration < now) {
+ throw InvalidArgumentException();
+ }
+
+ add(task, expiration - now);
+}
void TimerManager::remove(shared_ptr<Runnable> task) {
(void) task;
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/concurrency/TimerManager.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.h b/lib/cpp/src/thrift/concurrency/TimerManager.h
index d905ddb..fdda263 100644
--- a/lib/cpp/src/thrift/concurrency/TimerManager.h
+++ b/lib/cpp/src/thrift/concurrency/TimerManager.h
@@ -77,7 +77,15 @@ class TimerManager {
* @param task The task to execute
* @param timeout Absolute time in the future to execute task.
*/
- virtual void add(boost::shared_ptr<Runnable> task, const struct timespec& timeout);
+ virtual void add(boost::shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& timeout);
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param task The task to execute
+ * @param timeout Absolute time in the future to execute task.
+ */
+ virtual void add(boost::shared_ptr<Runnable> task, const struct timeval& timeout);
/**
* Removes a pending task
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/concurrency/Util.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Util.cpp b/lib/cpp/src/thrift/concurrency/Util.cpp
index 764b6f5..1f291da 100644
--- a/lib/cpp/src/thrift/concurrency/Util.cpp
+++ b/lib/cpp/src/thrift/concurrency/Util.cpp
@@ -20,35 +20,22 @@
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
-
+#include <thrift/Thrift.h>
#include "Util.h"
-#if defined(HAVE_CLOCK_GETTIME)
-#include <time.h>
-#elif defined(HAVE_SYS_TIME_H)
+#if defined(HAVE_SYS_TIME_H)
#include <sys/time.h>
-#endif // defined(HAVE_CLOCK_GETTIME)
+#endif
namespace apache { namespace thrift { namespace concurrency {
int64_t Util::currentTimeTicks(int64_t ticksPerSec) {
int64_t result;
-
-#if defined(HAVE_CLOCK_GETTIME)
- struct timespec now;
- int ret = clock_gettime(CLOCK_REALTIME, &now);
- assert(ret == 0);
- ret = ret; //squelching "unused variable" warning
- toTicks(result, now, ticksPerSec);
-#elif defined(HAVE_GETTIMEOFDAY)
struct timeval now;
- int ret = gettimeofday(&now, NULL);
+ int ret = THRIFT_GETTIMEOFDAY(&now, NULL);
assert(ret == 0);
+ THRIFT_UNUSED_VARIABLE(ret); //squelching "unused variable" warning
toTicks(result, now, ticksPerSec);
-#else
-#error "No high-precision clock is available."
-#endif // defined(HAVE_CLOCK_GETTIME)
-
return result;
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/concurrency/Util.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h
index e454227..d63776a 100644
--- a/lib/cpp/src/thrift/concurrency/Util.h
+++ b/lib/cpp/src/thrift/concurrency/Util.h
@@ -56,12 +56,12 @@ class Util {
public:
/**
- * Converts millisecond timestamp into a timespec struct
+ * Converts millisecond timestamp into a THRIFT_TIMESPEC struct
*
- * @param struct timespec& result
+ * @param struct THRIFT_TIMESPEC& result
* @param time or duration in milliseconds
*/
- static void toTimespec(struct timespec& result, int64_t value) {
+ static void toTimespec(struct THRIFT_TIMESPEC& result, int64_t value) {
result.tv_sec = value / MS_PER_S; // ms to s
result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
}
@@ -82,10 +82,10 @@ class Util {
}
}
/**
- * Converts struct timespec to arbitrary-sized ticks since epoch
+ * Converts struct THRIFT_TIMESPEC to arbitrary-sized ticks since epoch
*/
static void toTicks(int64_t& result,
- const struct timespec& value,
+ const struct THRIFT_TIMESPEC& value,
int64_t ticksPerSec) {
return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec);
}
@@ -100,10 +100,10 @@ class Util {
}
/**
- * Converts struct timespec to milliseconds
+ * Converts struct THRIFT_TIMESPEC to milliseconds
*/
static void toMilliseconds(int64_t& result,
- const struct timespec& value) {
+ const struct THRIFT_TIMESPEC& value) {
return toTicks(result, value, MS_PER_S);
}
@@ -116,9 +116,9 @@ class Util {
}
/**
- * Converts struct timespec to microseconds
+ * Converts struct THRIFT_TIMESPEC to microseconds
*/
- static void toUsec(int64_t& result, const struct timespec& value) {
+ static void toUsec(int64_t& result, const struct THRIFT_TIMESPEC& value) {
return toTicks(result, value, US_PER_S);
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp b/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp
index 8b69df4..9f55245 100644
--- a/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp
+++ b/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp
@@ -32,7 +32,7 @@ using std::string;
static string byte_to_hex(const uint8_t byte) {
char buf[3];
int ret = std::sprintf(buf, "%02x", (int)byte);
- ret = ret; //squelching "unused variable" warning
+ THRIFT_UNUSED_VARIABLE(ret);
assert(ret == 2);
assert(buf[2] == '\0');
return buf;
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 69f0e55..3277f5c 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -27,6 +27,7 @@
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/transport/PlatformSocket.h>
#include <iostream>
@@ -51,7 +52,6 @@
#include <fcntl.h>
#endif
-#include <errno.h>
#include <assert.h>
#ifdef HAVE_SCHED_H
@@ -62,7 +62,7 @@
#define AF_LOCAL AF_UNIX
#endif
-#ifdef _MSC_VER
+#if !defined(PRIu32)
#define PRIu32 "I32u"
#define PRIu64 "I64u"
#endif
@@ -221,7 +221,7 @@ class TNonblockingServer::TConnection {
class Task;
/// Constructor
- TConnection(int socket, TNonblockingIOThread* ioThread,
+ TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
const sockaddr* addr, socklen_t addrLen) {
readBuffer_ = NULL;
readBufferSize_ = 0;
@@ -232,8 +232,8 @@ class TNonblockingServer::TConnection {
// Allocate input and output transports these only need to be allocated
// once per TConnection (they don't need to be reallocated on init() call)
inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
- outputTransport_.reset(new TMemoryBuffer(
- server_->getWriteBufferDefaultSize()));
+ outputTransport_.reset(
+ new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
tSocket_.reset(new TSocket());
init(socket, ioThread, addr, addrLen);
}
@@ -254,7 +254,7 @@ class TNonblockingServer::TConnection {
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
- void init(int socket, TNonblockingIOThread* ioThread,
+ void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
const sockaddr* addr, socklen_t addrLen);
/**
@@ -284,7 +284,7 @@ class TNonblockingServer::TConnection {
*
* Don't call this from the IO thread itself.
*
- * @return true if successful, false if unable to notify (check errno).
+ * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
*/
bool notifyIOThread() {
return ioThread_->notify(this);
@@ -389,7 +389,7 @@ class TNonblockingServer::TConnection::Task: public Runnable {
void* connectionContext_;
};
-void TNonblockingServer::TConnection::init(int socket,
+void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
TNonblockingIOThread* ioThread,
const sockaddr* addr,
socklen_t addrLen) {
@@ -827,7 +827,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
void TNonblockingServer::TConnection::close() {
// Delete the registered libevent
if (event_del(&event_) == -1) {
- GlobalOutput.perror("TConnection::close() event_del", errno);
+ GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
}
if (serverEventHandler_ != NULL) {
@@ -857,7 +857,7 @@ void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
// just start over
- outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
+ outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
largestWriteBufferSize_ = 0;
}
}
@@ -888,7 +888,7 @@ TNonblockingServer::~TNonblockingServer() {
* by allocating a new one entirely
*/
TNonblockingServer::TConnection* TNonblockingServer::createConnection(
- int socket, const sockaddr* addr, socklen_t addrLen) {
+ THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
// Check the stack
Guard g(connMutex_);
@@ -935,7 +935,7 @@ void TNonblockingServer::returnConnection(TConnection* connection) {
* Server socket had something happen. We accept all waiting client
* connections on fd and assign TConnection objects to handle those requests.
*/
-void TNonblockingServer::handleEvent(int fd, short which) {
+void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
(void) which;
// Make sure that libevent didn't mess up the socket handles
assert(fd == serverSocket_);
@@ -947,7 +947,7 @@ void TNonblockingServer::handleEvent(int fd, short which) {
addrLen = sizeof(addrStorage);
// Going to accept a new client socket
- int clientSocket;
+ THRIFT_SOCKET clientSocket;
// Accept as many new clients as possible, even though libevent signaled only
// one, this helps us to avoid having to go back into the libevent engine so
@@ -959,12 +959,12 @@ void TNonblockingServer::handleEvent(int fd, short which) {
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
- ::close(clientSocket);
+ ::THRIFT_CLOSESOCKET(clientSocket);
return;
} else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
if (!drainPendingTask()) {
// Nothing left to discard, so we drop connection instead.
- ::close(clientSocket);
+ ::THRIFT_CLOSESOCKET(clientSocket);
return;
}
}
@@ -972,10 +972,10 @@ void TNonblockingServer::handleEvent(int fd, short which) {
// Explicitly set this socket to NONBLOCK mode
int flags;
- if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
- fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
- GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
- ::close(clientSocket);
+ if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
+ THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
+ ::THRIFT_CLOSESOCKET(clientSocket);
return;
}
@@ -986,7 +986,7 @@ void TNonblockingServer::handleEvent(int fd, short which) {
// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
- ::close(clientSocket);
+ ::THRIFT_CLOSESOCKET(clientSocket);
return;
}
@@ -1015,8 +1015,8 @@ void TNonblockingServer::handleEvent(int fd, short which) {
// Done looping accept, now we have to make sure the error is due to
// blocking. Any other error is a problem
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
+ if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
+ GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
}
}
@@ -1024,7 +1024,7 @@ void TNonblockingServer::handleEvent(int fd, short which) {
* Creates a socket to listen on and binds it to the local port.
*/
void TNonblockingServer::createAndListenOnSocket() {
- int s;
+ THRIFT_SOCKET s;
struct addrinfo hints, *res, *res0;
int error;
@@ -1040,7 +1040,7 @@ void TNonblockingServer::createAndListenOnSocket() {
error = getaddrinfo(NULL, port, &hints, &res0);
if (error) {
throw TException("TNonblockingServer::serve() getaddrinfo " +
- string(gai_strerror(error)));
+ string(THRIFT_GAI_STRERROR(error)));
}
// Pick the ipv6 address first since ipv4 addresses can be mapped
@@ -1069,15 +1069,15 @@ void TNonblockingServer::createAndListenOnSocket() {
int one = 1;
- // Set reuseaddr to avoid 2MSL delay on server restart
- setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));
+ // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
+ setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));
- if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
- ::close(s);
+ if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
+ ::THRIFT_CLOSESOCKET(s);
freeaddrinfo(res0);
throw TTransportException(TTransportException::NOT_OPEN,
"TNonblockingServer::serve() bind",
- errno);
+ THRIFT_GET_SOCKET_ERROR);
}
// Done with the addr info
@@ -1091,13 +1091,13 @@ void TNonblockingServer::createAndListenOnSocket() {
* Takes a socket created by listenSocket() and sets various options on it
* to prepare for use in the server.
*/
-void TNonblockingServer::listenSocket(int s) {
+void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
// Set socket to nonblocking mode
int flags;
- if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
- fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
- ::close(s);
- throw TException("TNonblockingServer::serve() O_NONBLOCK");
+ if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 ||
+ THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ ::THRIFT_CLOSESOCKET(s);
+ throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
}
int one = 1;
@@ -1122,7 +1122,7 @@ void TNonblockingServer::listenSocket(int s) {
#endif
if (listen(s, LISTEN_BACKLOG) == -1) {
- ::close(s);
+ ::THRIFT_CLOSESOCKET(s);
throw TException("TNonblockingServer::serve() listen");
}
@@ -1209,7 +1209,7 @@ void TNonblockingServer::serve() {
for (uint32_t id = 0; id < numIOThreads_; ++id) {
// the first IO thread also does the listening on server socket
- int listenFd = (id == 0 ? serverSocket_ : -1);
+ THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : -1);
shared_ptr<TNonblockingIOThread> thread(
new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
@@ -1232,7 +1232,7 @@ void TNonblockingServer::serve() {
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
ioThreadFactory_.reset(new PlatformThreadFactory(
-#ifndef USE_BOOST_THREAD
+#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
PlatformThreadFactory::OTHER, // scheduler
PlatformThreadFactory::NORMAL, // priority
1, // stack size (MB)
@@ -1263,7 +1263,7 @@ void TNonblockingServer::serve() {
TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
int number,
- int listenSocket,
+ THRIFT_SOCKET listenSocket,
bool useHighPriority)
: server_(server)
, number_(number)
@@ -1283,18 +1283,18 @@ TNonblockingIOThread::~TNonblockingIOThread() {
}
if (listenSocket_ >= 0) {
- if (0 != ::close(listenSocket_)) {
+ if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
- errno);
+ THRIFT_GET_SOCKET_ERROR);
}
listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
}
for (int i = 0; i < 2; ++i) {
if (notificationPipeFDs_[i] >= 0) {
- if (0 != ::close(notificationPipeFDs_[i])) {
+ if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
- errno);
+ THRIFT_GET_SOCKET_ERROR);
}
notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
}
@@ -1308,20 +1308,20 @@ void TNonblockingIOThread::createNotificationPipe() {
}
if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
- ::close(notificationPipeFDs_[0]);
- ::close(notificationPipeFDs_[1]);
- throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
+ ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
+ ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
+ throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
}
for (int i = 0; i < 2; ++i) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
int flags;
- if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
- fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+ if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
+ THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
#endif
- ::close(notificationPipeFDs_[0]);
- ::close(notificationPipeFDs_[1]);
+ ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
+ ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() "
"FD_CLOEXEC");
}
@@ -1372,7 +1372,7 @@ void TNonblockingIOThread::registerEvents() {
}
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
- int fd = getNotificationSendFD();
+ THRIFT_SOCKET fd = getNotificationSendFD();
if (fd < 0) {
return false;
}
@@ -1412,9 +1412,9 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
// exit the loop
break;
} else { // nBytes < 0
- if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
GlobalOutput.perror(
- "TNonblocking: notifyHandler read() failed: ", errno);
+ "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
ioThread->breakLoop(true);
return;
}
@@ -1474,7 +1474,7 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
GlobalOutput.printf(
"TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
} else {
- GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
+ GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
}
#endif
}
@@ -1520,7 +1520,7 @@ void TNonblockingIOThread::cleanupEvents() {
// stop the listen socket, if any
if (listenSocket_ >= 0) {
if (event_del(&serverEvent_) == -1) {
- GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
+ GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
}
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/server/TNonblockingServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h
index e7bbdc5..9e6ba17 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.h
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.h
@@ -22,6 +22,7 @@
#include <thrift/Thrift.h>
#include <thrift/server/TServer.h>
+#include <thrift/transport/PlatformSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadManager.h>
@@ -32,7 +33,6 @@
#include <stack>
#include <vector>
#include <string>
-#include <errno.h>
#include <cstdlib>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
@@ -67,7 +67,7 @@ using apache::thrift::concurrency::Guard;
#endif
#if LIBEVENT_VERSION_NUMBER < 0x02000000
- typedef int evutil_socket_t;
+ typedef THRIFT_SOCKET evutil_socket_t;
#endif
#ifndef SOCKOPT_CAST_T
@@ -146,7 +146,7 @@ class TNonblockingServer : public TServer {
static const int DEFAULT_IO_THREADS = 1;
/// File descriptor of an invalid socket
- static const int INVALID_SOCKET_VALUE = -1;
+ static const THRIFT_SOCKET INVALID_SOCKET_VALUE = -1;
/// # of IO threads this server will use
size_t numIOThreads_;
@@ -155,7 +155,7 @@ class TNonblockingServer : public TServer {
bool useHighPriorityIOThreads_;
/// Server socket file descriptor
- int serverSocket_;
+ THRIFT_SOCKET serverSocket_;
/// Port server runs on
int port_;
@@ -271,7 +271,7 @@ class TNonblockingServer : public TServer {
* @param fd the listen socket.
* @param which the event flag that triggered the handler.
*/
- void handleEvent(int fd, short which);
+ void handleEvent(THRIFT_SOCKET fd, short which);
void init(int port) {
serverSocket_ = -1;
@@ -774,7 +774,7 @@ class TNonblockingServer : public TServer {
*
* @param fd descriptor of socket to be initialized/
*/
- void listenSocket(int fd);
+ void listenSocket(THRIFT_SOCKET fd);
/**
* Return an initialized connection object. Creates or recovers from
* pool a TConnection and initializes it with the provided socket FD
@@ -785,7 +785,7 @@ class TNonblockingServer : public TServer {
* @param addrLen the length of addr
* @return pointer to initialized TConnection object.
*/
- TConnection* createConnection(int socket, const sockaddr* addr,
+ TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr,
socklen_t addrLen);
/**
@@ -805,7 +805,7 @@ class TNonblockingIOThread : public Runnable {
// listenSocket is < 0, accepting will not be done.
TNonblockingIOThread(TNonblockingServer* server,
int number,
- int listenSocket,
+ THRIFT_SOCKET listenSocket,
bool useHighPriority);
~TNonblockingIOThread();
@@ -896,7 +896,7 @@ class TNonblockingIOThread : public Runnable {
Thread::id_t threadId_;
/// If listenSocket_ >= 0, adds an event on the event_base to accept conns
- int listenSocket_;
+ THRIFT_SOCKET listenSocket_;
/// Sets a high scheduling priority when running
bool useHighPriority_;
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/PlatformSocket.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/PlatformSocket.h b/lib/cpp/src/thrift/transport/PlatformSocket.h
new file mode 100644
index 0000000..40a4246
--- /dev/null
+++ b/lib/cpp/src/thrift/transport/PlatformSocket.h
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_PLATFORM_SOCKET_H_
+# define _THRIFT_TRANSPORT_PLATFORM_SOCKET_H_
+
+#ifdef _WIN32
+# define THRIFT_GET_SOCKET_ERROR ::WSAGetLastError()
+# define THRIFT_EINPROGRESS WSAEINPROGRESS
+# define THRIFT_EAGAIN WSAEWOULDBLOCK
+# define THRIFT_EINTR WSAEINTR
+# define THRIFT_ECONNRESET WSAECONNRESET
+# define THRIFT_ENOTCONN WSAENOTCONN
+# define THRIFT_ETIMEDOUT WSAETIMEDOUT
+# define THRIFT_EWOULDBLOCK WSAEWOULDBLOCK
+# define THRIFT_EPIPE WSAECONNRESET
+# define THRIFT_NO_SOCKET_CACHING SO_EXCLUSIVEADDRUSE
+# define THRIFT_SOCKET SOCKET
+# define THRIFT_SOCKETPAIR thrift_socketpair
+# define THRIFT_FCNTL thrift_fcntl
+# define THRIFT_O_NONBLOCK 1
+# define THRIFT_F_GETFL 0
+# define THRIFT_F_SETFL 1
+# define THRIFT_GETTIMEOFDAY thrift_gettimeofday
+# define THRIFT_CLOSESOCKET closesocket
+# define THRIFT_GAI_STRERROR gai_strerrorA
+# define THRIFT_SSIZET ptrdiff_t
+# define THRIFT_SNPRINTF _snprintf
+# define THRIFT_SLEEP_SEC thrift_sleep
+# define THRIFT_SLEEP_USEC thrift_usleep
+# define THRIFT_TIMESPEC thrift_timespec
+# define THRIFT_CTIME_R thrift_ctime_r
+# define THRIFT_POLL thrift_poll
+# if WINVER <= 0x0502 //XP, Server2003
+# define THRIFT_POLLFD thrift_pollfd
+# define THRIFT_POLLIN 0x0300
+# define THRIFT_POLLOUT 0x0010
+# else //Vista, Win7...
+# define THRIFT_POLLFD pollfd
+# define THRIFT_POLLIN POLLIN
+# define THRIFT_POLLOUT POLLOUT
+# endif //WINVER
+# define THRIFT_SHUT_RDWR SD_BOTH
+#else //not _WIN32
+# include <errno.h>
+# define THRIFT_GET_SOCKET_ERROR errno
+# define THRIFT_EINTR EINTR
+# define THRIFT_EINPROGRESS EINPROGRESS
+# define THRIFT_ECONNRESET ECONNRESET
+# define THRIFT_ENOTCONN ENOTCONN
+# define THRIFT_ETIMEDOUT ETIMEDOUT
+# define THRIFT_EWOULDBLOCK EWOULDBLOCK
+# define THRIFT_EAGAIN EAGAIN
+# define THRIFT_EPIPE EPIPE
+# define THRIFT_NO_SOCKET_CACHING SO_REUSEADDR
+# define THRIFT_SOCKET int
+# define THRIFT_SOCKETPAIR socketpair
+# define THRIFT_FCNTL fcntl
+# define THRIFT_O_NONBLOCK O_NONBLOCK
+# define THRIFT_F_GETFL F_GETFL
+# define THRIFT_F_SETFL F_SETFL
+# define THRIFT_GETTIMEOFDAY gettimeofday
+# define THRIFT_CLOSESOCKET close
+# define THRIFT_GAI_STRERROR gai_strerror
+# define THRIFT_SSIZET ssize_t
+# define THRIFT_SNPRINTF snprintf
+# define THRIFT_SLEEP_SEC sleep
+# define THRIFT_SLEEP_USEC usleep
+# define THRIFT_TIMESPEC timespec
+# define THRIFT_CTIME_R ctime_r
+# define THRIFT_POLL poll
+# define THRIFT_POLLFD pollfd
+# define THRIFT_POLLIN POLLIN
+# define THRIFT_POLLOUT POLLOUT
+# define THRIFT_SHUT_RDWR SHUT_RDWR
+#endif
+
+#endif // _THRIFT_TRANSPORT_PLATFORM_SOCKET_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/TFDTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFDTransport.cpp b/lib/cpp/src/thrift/transport/TFDTransport.cpp
index 176e7bf..3b72de5 100644
--- a/lib/cpp/src/thrift/transport/TFDTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TFDTransport.cpp
@@ -21,6 +21,7 @@
#include <exception>
#include <thrift/transport/TFDTransport.h>
+#include <thrift/transport/PlatformSocket.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
@@ -39,8 +40,8 @@ void TFDTransport::close() {
return;
}
- int rv = ::close(fd_);
- int errno_copy = errno;
+ int rv = ::THRIFT_CLOSESOCKET(fd_);
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
fd_ = -1;
// Have to check uncaught_exception because this is called in the destructor.
if (rv < 0 && !std::uncaught_exception()) {
@@ -54,14 +55,14 @@ uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
unsigned int maxRetries = 5; // same as the TSocket default
unsigned int retries = 0;
while (true) {
- ssize_t rv = ::read(fd_, buf, len);
+ THRIFT_SSIZET rv = ::read(fd_, buf, len);
if (rv < 0) {
- if (errno == EINTR && retries < maxRetries) {
+ if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && retries < maxRetries) {
// If interrupted, try again
++retries;
continue;
}
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
throw TTransportException(TTransportException::UNKNOWN,
"TFDTransport::read()",
errno_copy);
@@ -74,10 +75,10 @@ uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
void TFDTransport::write(const uint8_t* buf, uint32_t len) {
while (len > 0) {
- ssize_t rv = ::write(fd_, buf, len);
+ THRIFT_SSIZET rv = ::write(fd_, buf, len);
if (rv < 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
throw TTransportException(TTransportException::UNKNOWN,
"TFDTransport::write()",
errno_copy);
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/TFileTransport.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.cpp b/lib/cpp/src/thrift/transport/TFileTransport.cpp
index 4b6ea47..137e47d 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TFileTransport.cpp
@@ -23,17 +23,15 @@
#include "TFileTransport.h"
#include "TTransportUtils.h"
+#include "PlatformSocket.h"
+#include <thrift/concurrency/FunctionRunner.h>
-#ifdef HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#else
#include <time.h>
#endif
#include <fcntl.h>
-#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
@@ -60,27 +58,6 @@ using namespace std;
using namespace apache::thrift::protocol;
using namespace apache::thrift::concurrency;
-#ifndef HAVE_CLOCK_GETTIME
-
-/**
- * Fake clock_gettime for systems like darwin
- *
- */
-#define CLOCK_REALTIME 0
-static int clock_gettime(int clk_id /*ignored*/, struct timespec *tp) {
- struct timeval now;
-
- int rv = gettimeofday(&now, NULL);
- if (rv != 0) {
- return rv;
- }
-
- tp->tv_sec = now.tv_sec;
- tp->tv_nsec = now.tv_usec * 1000;
- return 0;
-}
-#endif
-
TFileTransport::TFileTransport(string path, bool readOnly)
: readState_()
, readBuff_(NULL)
@@ -96,7 +73,6 @@ TFileTransport::TFileTransport(string path, bool readOnly)
, eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
, corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
, writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US)
- , writerThreadId_(0)
, dequeueBuffer_(NULL)
, enqueueBuffer_(NULL)
, notFull_(&mutex_)
@@ -112,6 +88,7 @@ TFileTransport::TFileTransport(string path, bool readOnly)
, numCorruptedEventsInChunk_(0)
, readOnly_(readOnly)
{
+ threadFactory_.setDetached(false);
openLogFile();
}
@@ -124,8 +101,8 @@ void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
// flush any events in the queue
flush();
GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
- if (-1 == ::close(fd_)) {
- int errno_copy = errno;
+ if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
} else {
@@ -145,25 +122,16 @@ void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
TFileTransport::~TFileTransport() {
// flush the buffer if a writer thread is active
-#ifdef USE_BOOST_THREAD
- if(writerThreadId_.get()) {
-#else
- if (writerThreadId_ > 0) {
-#endif
+ if(writerThread_.get()) {
// set state to closing
closing_ = true;
// wake up the writer thread
// Since closing_ is true, it will attempt to flush all data, then exit.
- notEmpty_.notify();
+ notEmpty_.notify();
-#ifdef USE_BOOST_THREAD
- writerThreadId_->join();
- writerThreadId_.reset();
-#else
- pthread_join(writerThreadId_, NULL);
- writerThreadId_ = 0;
-#endif
+ writerThread_->join();
+ writerThread_.reset();
}
if (dequeueBuffer_) {
@@ -188,8 +156,8 @@ TFileTransport::~TFileTransport() {
// close logfile
if (fd_ > 0) {
- if(-1 == ::close(fd_)) {
- GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", errno);
+ if(-1 == ::THRIFT_CLOSESOCKET(fd_)) {
+ GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_GET_SOCKET_ERROR);
} else {
//successfully closed fd
fd_ = 0;
@@ -203,18 +171,11 @@ bool TFileTransport::initBufferAndWriteThread() {
return false;
}
-#ifdef USE_BOOST_THREAD
- if(!writerThreadId_.get()) {
- writerThreadId_ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(startWriterThread, (void *)this)));
- }
-#else
- if (writerThreadId_ == 0) {
- if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
- T_ERROR("%s", "Could not create writer thread");
- return false;
- }
+ if(!writerThread_.get()) {
+ writerThread_ = threadFactory_.newThread(
+ apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
+ writerThread_->start();
}
-#endif
dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
@@ -295,7 +256,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
// it is probably a non-factor for the time being
}
-bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
+bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
bool swap;
Guard g(mutex_);
@@ -341,7 +302,7 @@ void TFileTransport::writerThread() {
try {
openLogFile();
} catch (...) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
fd_ = 0;
hasIOError = true;
@@ -361,14 +322,14 @@ void TFileTransport::writerThread() {
#endif
readState_.resetAllValues();
} catch (...) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
hasIOError = true;
}
}
// Figure out the next time by which a flush must take place
- struct timespec ts_next_flush;
+ struct timeval ts_next_flush;
getNextFlushTime(&ts_next_flush);
uint32_t unflushed = 0;
@@ -376,11 +337,7 @@ void TFileTransport::writerThread() {
// this will only be true when the destructor is being invoked
if (closing_) {
if (hasIOError) {
-#ifndef USE_BOOST_THREAD
- pthread_exit(NULL);
-#else
- return;
-#endif
+ return;
}
// Try to empty buffers before exit
@@ -388,19 +345,15 @@ void TFileTransport::writerThread() {
#ifndef _WIN32
fsync(fd_);
#endif
- if (-1 == ::close(fd_)) {
- int errno_copy = errno;
+ if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
} else {
//fd successfully closed
fd_ = 0;
}
-#ifndef USE_BOOST_THREAD
- pthread_exit(NULL);
-#else
return;
-#endif
- }
+ }
}
if (swapEventBuffers(&ts_next_flush)) {
@@ -413,16 +366,12 @@ void TFileTransport::writerThread() {
while (hasIOError) {
T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
- usleep(writerThreadIOErrorSleepTime_);
+ THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
if (closing_) {
-#ifndef USE_BOOST_THREAD
- pthread_exit(NULL);
-#else
return;
-#endif
}
if (!fd_) {
- ::close(fd_);
+ ::THRIFT_CLOSESOCKET(fd_);
fd_ = 0;
}
try {
@@ -463,7 +412,7 @@ void TFileTransport::writerThread() {
memset(zeros, '\0', padding);
boost::scoped_array<uint8_t> array(zeros);
if (-1 == ::write(fd_, zeros, padding)) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
hasIOError = true;
continue;
@@ -476,7 +425,7 @@ void TFileTransport::writerThread() {
// write the dequeued event to the file
if (outEvent->eventSize_ > 0) {
if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
hasIOError = true;
continue;
@@ -523,11 +472,11 @@ void TFileTransport::writerThread() {
if (forced_flush || unflushed > flushMaxBytes_) {
flush = true;
} else {
- struct timespec current_time;
- clock_gettime(CLOCK_REALTIME, ¤t_time);
+ struct timeval current_time;
+ THRIFT_GETTIMEOFDAY(¤t_time, NULL);
if (current_time.tv_sec > ts_next_flush.tv_sec ||
(current_time.tv_sec == ts_next_flush.tv_sec &&
- current_time.tv_nsec > ts_next_flush.tv_nsec)) {
+ current_time.tv_usec > ts_next_flush.tv_usec)) {
if (unflushed > 0) {
flush = true;
} else {
@@ -560,15 +509,9 @@ void TFileTransport::writerThread() {
void TFileTransport::flush() {
// file must be open for writing for any flushing to take place
-#ifdef USE_BOOST_THREAD
- if (!writerThreadId_.get()) {
- return;
- }
-#else
- if (writerThreadId_ <= 0) {
+ if (!writerThread_.get()) {
return;
}
-#endif
// wait for flush to take place
Guard g(mutex_);
@@ -674,7 +617,7 @@ eventInfo* TFileTransport::readEvent() {
} else if (readState_.bufferLen_ == 0) { // EOF
// wait indefinitely if there is no timeout
if (readTimeout_ == TAIL_READ_TIMEOUT) {
- usleep(eofSleepTime_);
+ THRIFT_SLEEP_USEC(eofSleepTime_);
continue;
} else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
// reset state
@@ -686,7 +629,7 @@ eventInfo* TFileTransport::readEvent() {
readState_.resetState(0);
return NULL;
} else {
- usleep(readTimeout_ * 1000);
+ THRIFT_SLEEP_USEC(readTimeout_ * 1000);
readTries++;
continue;
}
@@ -818,7 +761,7 @@ void TFileTransport::performRecovery() {
// if tailing the file, wait until there is enough data to start
// the next chunk
while(curChunk == (getNumChunks() - 1)) {
- usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US);
+ THRIFT_SLEEP_USEC(DEFAULT_CORRUPTED_SLEEP_TIME_US);
}
seekToChunk(curChunk + 1);
} else {
@@ -910,7 +853,7 @@ uint32_t TFileTransport::getNumChunks() {
int rv = fstat(fd_, &f_info);
if (rv < 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
throw TTransportException(TTransportException::UNKNOWN,
"TFileTransport::getNumChunks() (fstat)",
errno_copy);
@@ -946,21 +889,22 @@ void TFileTransport::openLogFile() {
// make sure open call was successful
if(fd_ == -1) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
}
}
-void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) {
- clock_gettime(CLOCK_REALTIME, ts_next_flush);
- ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000;
- if (ts_next_flush->tv_nsec > 1000000000) {
- ts_next_flush->tv_nsec -= 1000000000;
- ts_next_flush->tv_sec += 1;
+void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
+ THRIFT_GETTIMEOFDAY(ts_next_flush, NULL);
+
+ ts_next_flush->tv_usec += flushMaxUs_;
+ if (ts_next_flush->tv_usec > 1000000) {
+ long extra_secs = ts_next_flush->tv_usec / 1000000;
+ ts_next_flush->tv_usec %= 1000000;
+ ts_next_flush->tv_sec += extra_secs;
}
- ts_next_flush->tv_sec += flushMaxUs_ / 1000000;
}
TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/TFileTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h
index 267305d..75941cf 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.h
+++ b/lib/cpp/src/thrift/transport/TFileTransport.h
@@ -27,19 +27,13 @@
#include <string>
#include <stdio.h>
-#ifdef HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
-
-#ifdef USE_BOOST_THREAD
-#include <boost/thread.hpp>
-#endif
-
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/Thread.h>
namespace apache { namespace thrift { namespace transport {
@@ -307,13 +301,13 @@ class TFileTransport : public TFileReaderTransport,
private:
// helper functions for writing to a file
void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
- bool swapEventBuffers(struct timespec* deadline);
+ bool swapEventBuffers(struct timeval* deadline);
bool initBufferAndWriteThread();
// control for writer thread
static void* startWriterThread(void* ptr) {
- (((TFileTransport*)ptr)->writerThread());
- return 0;
+ static_cast<TFileTransport*>(ptr)->writerThread();
+ return NULL;
}
void writerThread();
@@ -326,7 +320,7 @@ class TFileTransport : public TFileReaderTransport,
// Utility functions
void openLogFile();
- void getNextFlushTime(struct timespec* ts_next_flush);
+ void getNextFlushTime(struct timeval* ts_next_flush);
// Class variables
readState readState_;
@@ -375,12 +369,9 @@ class TFileTransport : public TFileReaderTransport,
uint32_t writerThreadIOErrorSleepTime_;
static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
- // writer thread id
-#ifdef USE_BOOST_THREAD
- std::auto_ptr<boost::thread> writerThreadId_;
-#else
- pthread_t writerThreadId_;
-#endif
+ // writer thread
+ apache::thrift::concurrency::PlatformThreadFactory threadFactory_;
+ boost::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
// buffers to hold data before it is flushed. Each element of the buffer stores a msg that
// needs to be written to the file. The buffers are swapped by the writer thread.
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/TPipe.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp
index 464272d..0b5080b 100644
--- a/lib/cpp/src/thrift/transport/TPipe.cpp
+++ b/lib/cpp/src/thrift/transport/TPipe.cpp
@@ -108,7 +108,7 @@ void TPipe::open() {
NULL); // no template file
if (hPipe_ == INVALID_HANDLE_VALUE)
- sleep(SleepInterval);
+ ::Sleep(SleepInterval);
else
break;
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
index 14c13dc..bf29c41 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
@@ -37,6 +37,7 @@
#include <openssl/x509v3.h>
#include <thrift/concurrency/Mutex.h>
#include "TSSLSocket.h"
+#include "PlatformSocket.h"
#define OPENSSL_VERSION_NO_THREAD_ID 0x10000000L
@@ -105,8 +106,9 @@ bool TSSLSocket::isOpen() {
return false;
}
int shutdown = SSL_get_shutdown(ssl_);
- bool shutdownReceived = (shutdown & SSL_RECEIVED_SHUTDOWN);
- bool shutdownSent = (shutdown & SSL_SENT_SHUTDOWN);
+ // "!!" is squelching C4800 "forcing bool -> true or false" perfomance warning
+ bool shutdownReceived = !!(shutdown & SSL_RECEIVED_SHUTDOWN);
+ bool shutdownSent = !!(shutdown & SSL_SENT_SHUTDOWN);
if (shutdownReceived && shutdownSent) {
return false;
}
@@ -122,7 +124,7 @@ bool TSSLSocket::peek() {
uint8_t byte;
rc = SSL_peek(ssl_, &byte, 1);
if (rc < 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
string errors;
buildErrors(errors, errno_copy);
throw TSSLException("SSL_peek: " + errors);
@@ -147,7 +149,7 @@ void TSSLSocket::close() {
rc = SSL_shutdown(ssl_);
}
if (rc < 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
string errors;
buildErrors(errors, errno_copy);
GlobalOutput(("SSL_shutdown: " + errors).c_str());
@@ -166,9 +168,9 @@ uint32_t TSSLSocket::read(uint8_t* buf, uint32_t len) {
bytes = SSL_read(ssl_, buf, len);
if (bytes >= 0)
break;
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
if (SSL_get_error(ssl_, bytes) == SSL_ERROR_SYSCALL) {
- if (ERR_get_error() == 0 && errno_copy == EINTR) {
+ if (ERR_get_error() == 0 && errno_copy == THRIFT_EINTR) {
continue;
}
}
@@ -186,7 +188,7 @@ void TSSLSocket::write(const uint8_t* buf, uint32_t len) {
while (written < len) {
int32_t bytes = SSL_write(ssl_, &buf[written], len - written);
if (bytes <= 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
string errors;
buildErrors(errors, errno_copy);
throw TSSLException("SSL_write: " + errors);
@@ -206,7 +208,7 @@ void TSSLSocket::flush() {
throw TSSLException("SSL_get_wbio returns NULL");
}
if (BIO_flush(bio) != 1) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
string errors;
buildErrors(errors, errno_copy);
throw TSSLException("BIO_flush: " + errors);
@@ -229,7 +231,7 @@ void TSSLSocket::checkHandshake() {
rc = SSL_connect(ssl_);
}
if (rc <= 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
string fname(server() ? "SSL_accept" : "SSL_connect");
string errors;
buildErrors(errors, errno_copy);
@@ -426,7 +428,7 @@ void TSSLSocketFactory::loadCertificate(const char* path, const char* format) {
}
if (strcmp(format, "PEM") == 0) {
if (SSL_CTX_use_certificate_chain_file(ctx_->get(), path) == 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
string errors;
buildErrors(errors, errno_copy);
throw TSSLException("SSL_CTX_use_certificate_chain_file: " + errors);
@@ -443,7 +445,7 @@ void TSSLSocketFactory::loadPrivateKey(const char* path, const char* format) {
}
if (strcmp(format, "PEM") == 0) {
if (SSL_CTX_use_PrivateKey_file(ctx_->get(), path, SSL_FILETYPE_PEM) == 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
string errors;
buildErrors(errors, errno_copy);
throw TSSLException("SSL_CTX_use_PrivateKey_file: " + errors);
@@ -457,7 +459,7 @@ void TSSLSocketFactory::loadTrustedCertificates(const char* path) {
"loadTrustedCertificates: <path> is NULL");
}
if (SSL_CTX_load_verify_locations(ctx_->get(), path, NULL) == 0) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
string errors;
buildErrors(errors, errno_copy);
throw TSSLException("SSL_CTX_load_verify_locations: " + errors);
@@ -579,7 +581,7 @@ void buildErrors(string& errors, int errno_copy) {
}
const char* reason = ERR_reason_error_string(errorCode);
if (reason == NULL) {
- snprintf(message, sizeof(message) - 1, "SSL error # %lu", errorCode);
+ THRIFT_SNPRINTF(message, sizeof(message) - 1, "SSL error # %lu", errorCode);
reason = message;
}
errors += reason;
@@ -598,7 +600,7 @@ void buildErrors(string& errors, int errno_copy) {
* Default implementation of AccessManager
*/
Decision DefaultClientAccessManager::verify(const sockaddr_storage& sa)
- throw() {
+ throw() {
(void) sa;
return SKIP;
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/TServerSocket.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp
index 1000367..cb3833e 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp
@@ -39,13 +39,13 @@
#include <netdb.h>
#endif
#include <fcntl.h>
-#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "TSocket.h"
#include "TServerSocket.h"
+#include "PlatformSocket.h"
#include <boost/shared_ptr.hpp>
#ifndef AF_LOCAL
@@ -155,9 +155,9 @@ void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
}
void TServerSocket::listen() {
- SOCKET sv[2];
- if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
- GlobalOutput.perror("TServerSocket::listen() socketpair() ", errno);
+ THRIFT_SOCKET sv[2];
+ if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR);
intSock1_ = -1;
intSock2_ = -1;
} else {
@@ -177,7 +177,7 @@ void TServerSocket::listen() {
// Wildcard address
error = getaddrinfo(NULL, port, &hints, &res0);
if (error) {
- GlobalOutput.printf("getaddrinfo %d: %s", error, gai_strerror(error));
+ GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error));
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket.");
}
@@ -196,27 +196,32 @@ void TServerSocket::listen() {
}
if (serverSocket_ == -1) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy);
}
- // Set reusaddress to prevent 2MSL delay on accept
+ // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
int one = 1;
- if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
+ if (-1 == setsockopt(serverSocket_, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING,
cast_sockopt(&one), sizeof(one))) {
- int errno_copy = errno;
- GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_REUSEADDR ", errno_copy);
+ //ignore errors coming out of this setsockopt on Windows. This is because
+ //SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
+ //want to force servers to be an admin.
+#ifndef _WIN32
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ", errno_copy);
close();
- throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR", errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set THRIFT_NO_SOCKET_CACHING", errno_copy);
+#endif
}
// Set TCP buffer sizes
if (tcpSendBuffer_ > 0) {
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF,
cast_sockopt(&tcpSendBuffer_), sizeof(tcpSendBuffer_))) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy);
@@ -226,7 +231,7 @@ void TServerSocket::listen() {
if (tcpRecvBuffer_ > 0) {
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF,
cast_sockopt(&tcpRecvBuffer_), sizeof(tcpRecvBuffer_))) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy);
@@ -237,7 +242,7 @@ void TServerSocket::listen() {
#ifdef TCP_DEFER_ACCEPT
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
&one, sizeof(one))) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy);
@@ -249,7 +254,7 @@ void TServerSocket::listen() {
int zero = 0;
if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
cast_sockopt(&zero), sizeof(zero))) {
- GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", errno);
+ GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
}
}
#endif // #ifdef IPV6_V6ONLY
@@ -258,7 +263,7 @@ void TServerSocket::listen() {
struct linger ling = {0, 0};
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
cast_sockopt(&ling), sizeof(ling))) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
@@ -269,7 +274,7 @@ void TServerSocket::listen() {
// TCP Nodelay, speed over bandwidth
if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
cast_sockopt(&one), sizeof(one))) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
@@ -277,21 +282,21 @@ void TServerSocket::listen() {
}
// Set NONBLOCK on the accept socket
- int flags = fcntl(serverSocket_, F_GETFL, 0);
+ int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
if (flags == -1) {
- int errno_copy = errno;
- GlobalOutput.perror("TServerSocket::listen() fcntl() F_GETFL ", errno_copy);
- throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
}
- if (-1 == fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK)) {
- int errno_copy = errno;
- GlobalOutput.perror("TServerSocket::listen() fcntl() O_NONBLOCK ", errno_copy);
- throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+ if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
}
// prepare the port information
- // we may want to try to bind more than once, since SO_REUSEADDR doesn't
+ // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
// always seem to work. The client can configure the retry variables.
int retries = 0;
@@ -304,13 +309,13 @@ void TServerSocket::listen() {
socklen_t len;
if (path_.length() > sizeof(address.sun_path)) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
}
address.sun_family = AF_UNIX;
- snprintf(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());
+ THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());
len = sizeof(address);
do {
@@ -318,7 +323,7 @@ void TServerSocket::listen() {
break;
}
// use short circuit evaluation here to only sleep if we need to
- } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+ } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
#else
GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path not supported");
@@ -329,7 +334,7 @@ void TServerSocket::listen() {
break;
}
// use short circuit evaluation here to only sleep if we need to
- } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+ } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
// free addrinfo
freeaddrinfo(res0);
@@ -347,12 +352,12 @@ void TServerSocket::listen() {
GlobalOutput(errbuf);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not bind",
- errno);
+ THRIFT_GET_SOCKET_ERROR);
}
// Call listen
if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
@@ -366,7 +371,7 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
}
- struct pollfd fds[2];
+ struct THRIFT_POLLFD fds[2];
int maxEintrs = 5;
int numEintrs = 0;
@@ -374,71 +379,71 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
while (true) {
std::memset(fds, 0 , sizeof(fds));
fds[0].fd = serverSocket_;
- fds[0].events = POLLIN;
+ fds[0].events = THRIFT_POLLIN;
if (intSock2_ != -1) {
fds[1].fd = intSock2_;
- fds[1].events = POLLIN;
+ fds[1].events = THRIFT_POLLIN;
}
/*
- TODO: if EINTR is received, we'll restart the timeout.
+ TODO: if THRIFT_EINTR is received, we'll restart the timeout.
To be accurate, we need to fix this in the future.
*/
- int ret = poll(fds, 2, accTimeout_);
+ int ret = THRIFT_POLL(fds, 2, accTimeout_);
if (ret < 0) {
// error cases
- if (errno == EINTR && (numEintrs++ < maxEintrs)) {
- // EINTR needs to be handled manually and we can tolerate
+ if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && (numEintrs++ < maxEintrs)) {
+ // THRIFT_EINTR needs to be handled manually and we can tolerate
// a certain number
continue;
}
- int errno_copy = errno;
- GlobalOutput.perror("TServerSocket::acceptImpl() poll() ", errno_copy);
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_POLL() ", errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
} else if (ret > 0) {
// Check for an interrupt signal
- if (intSock2_ != -1 && (fds[1].revents & POLLIN)) {
+ if (intSock2_ != -1 && (fds[1].revents & THRIFT_POLLIN)) {
int8_t buf;
if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
- GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", errno);
+ GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", THRIFT_GET_SOCKET_ERROR);
}
throw TTransportException(TTransportException::INTERRUPTED);
}
// Check for the actual server socket being ready
- if (fds[0].revents & POLLIN) {
+ if (fds[0].revents & THRIFT_POLLIN) {
break;
}
} else {
- GlobalOutput("TServerSocket::acceptImpl() poll 0");
+ GlobalOutput("TServerSocket::acceptImpl() THRIFT_POLL 0");
throw TTransportException(TTransportException::UNKNOWN);
}
}
struct sockaddr_storage clientAddress;
int size = sizeof(clientAddress);
- SOCKET clientSocket = ::accept(serverSocket_,
+ THRIFT_SOCKET clientSocket = ::accept(serverSocket_,
(struct sockaddr *) &clientAddress,
(socklen_t *) &size);
if (clientSocket == -1) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
}
// Make sure client socket is blocking
- int flags = fcntl(clientSocket, F_GETFL, 0);
+ int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
if (flags == -1) {
- int errno_copy = errno;
- GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_GETFL ", errno_copy);
- throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_GETFL)", errno_copy);
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_GETFL)", errno_copy);
}
- if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
- int errno_copy = errno;
- GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_SETFL ~O_NONBLOCK ", errno_copy);
- throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_SETFL)", errno_copy);
+ if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_SETFL)", errno_copy);
}
shared_ptr<TSocket> client = createSocket(clientSocket);
@@ -453,7 +458,7 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
return client;
}
-shared_ptr<TSocket> TServerSocket::createSocket(SOCKET clientSocket) {
+shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
return shared_ptr<TSocket>(new TSocket(clientSocket));
}
@@ -461,28 +466,21 @@ void TServerSocket::interrupt() {
if (intSock1_ != -1) {
int8_t byte = 0;
if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) {
- GlobalOutput.perror("TServerSocket::interrupt() send() ", errno);
+ GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR);
}
}
}
void TServerSocket::close() {
if (serverSocket_ != -1) {
-
-#ifdef _WIN32
- shutdown(serverSocket_, SD_BOTH);
- ::closesocket(serverSocket_);
-#else
- shutdown(serverSocket_, SHUT_RDWR);
- ::close(serverSocket_);
-#endif
-
+ shutdown(serverSocket_, THRIFT_SHUT_RDWR);
+ ::THRIFT_CLOSESOCKET(serverSocket_);
}
if (intSock1_ != -1) {
- ::close(intSock1_);
+ ::THRIFT_CLOSESOCKET(intSock1_);
}
if (intSock2_ != -1) {
- ::close(intSock2_);
+ ::THRIFT_CLOSESOCKET(intSock2_);
}
serverSocket_ = -1;
intSock1_ = -1;
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/TServerSocket.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h
index e562a19..17a00b6 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.h
+++ b/lib/cpp/src/thrift/transport/TServerSocket.h
@@ -21,10 +21,8 @@
#define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1
#include "TServerTransport.h"
+#include "PlatformSocket.h"
#include <boost/shared_ptr.hpp>
-#ifndef _WIN32
- typedef int SOCKET;
-#endif
namespace apache { namespace thrift { namespace transport {
@@ -64,12 +62,12 @@ class TServerSocket : public TServerTransport {
protected:
boost::shared_ptr<TTransport> acceptImpl();
- virtual boost::shared_ptr<TSocket> createSocket(SOCKET client);
+ virtual boost::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client);
private:
int port_;
std::string path_;
- SOCKET serverSocket_;
+ THRIFT_SOCKET serverSocket_;
int acceptBacklog_;
int sendTimeout_;
int recvTimeout_;
@@ -79,8 +77,8 @@ class TServerSocket : public TServerTransport {
int tcpSendBuffer_;
int tcpRecvBuffer_;
- SOCKET intSock1_;
- SOCKET intSock2_;
+ THRIFT_SOCKET intSock1_;
+ THRIFT_SOCKET intSock2_;
};
}}} // apache::thrift::transport
http://git-wip-us.apache.org/repos/asf/thrift/blob/7cb7fc8a/lib/cpp/src/thrift/transport/TSocket.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index e59f4a1..ba0b1e3 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -39,12 +39,12 @@
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
-#include <errno.h>
#include <fcntl.h>
#include <thrift/concurrency/Monitor.h>
#include "TSocket.h"
#include "TTransportException.h"
+#include "PlatformSocket.h"
#ifndef SOCKOPT_CAST_T
# ifndef _WIN32
@@ -126,7 +126,7 @@ TSocket::TSocket() :
cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
}
-TSocket::TSocket(SOCKET socket) :
+TSocket::TSocket(THRIFT_SOCKET socket) :
host_(""),
port_(0),
path_(""),
@@ -158,13 +158,13 @@ bool TSocket::peek() {
uint8_t buf;
int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
if (r == -1) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
#if defined __FreeBSD__ || defined __MACH__
/* shigin:
- * freebsd returns -1 and ECONNRESET if socket was closed by
+ * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by
* the other side
*/
- if (errno_copy == ECONNRESET)
+ if (errno_copy == THRIFT_ECONNRESET)
{
close();
return false;
@@ -189,7 +189,7 @@ void TSocket::openConnection(struct addrinfo *res) {
}
if (socket_ == -1) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
}
@@ -220,18 +220,18 @@ void TSocket::openConnection(struct addrinfo *res) {
// Set the socket to be non blocking for connect if a timeout exists
- int flags = fcntl(socket_, F_GETFL, 0);
+ int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0);
if (connTimeout_ > 0) {
- if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {
- int errno_copy = errno;
- GlobalOutput.perror("TSocket::open() fcntl() " + getSocketInfo(), errno_copy);
- throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+ if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
}
} else {
- if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {
- int errno_copy = errno;
- GlobalOutput.perror("TSocket::open() fcntl " + getSocketInfo(), errno_copy);
- throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+ if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
}
}
@@ -245,13 +245,13 @@ void TSocket::openConnection(struct addrinfo *res) {
socklen_t len;
if (path_.length() > sizeof(address.sun_path)) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
}
address.sun_family = AF_UNIX;
- snprintf(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());
+ THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());
len = sizeof(address);
ret = connect(socket_, (struct sockaddr *) &address, len);
@@ -269,18 +269,18 @@ void TSocket::openConnection(struct addrinfo *res) {
goto done;
}
- if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK)) {
- int errno_copy = errno;
+ if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS) && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK)) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
}
- struct pollfd fds[1];
+ struct THRIFT_POLLFD fds[1];
std::memset(fds, 0 , sizeof(fds));
fds[0].fd = socket_;
- fds[0].events = POLLOUT;
- ret = poll(fds, 1, connTimeout_);
+ fds[0].events = THRIFT_POLLOUT;
+ ret = THRIFT_POLL(fds, 1, connTimeout_);
if (ret > 0) {
// Ensure the socket is connected and that there are no errors set
@@ -289,7 +289,7 @@ void TSocket::openConnection(struct addrinfo *res) {
lon = sizeof(int);
int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);
if (ret2 == -1) {
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
}
@@ -297,7 +297,7 @@ void TSocket::openConnection(struct addrinfo *res) {
if (val == 0) {
goto done;
}
- GlobalOutput.perror("TSocket::open() error on socket (after poll) " + getSocketInfo(), val);
+ GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(), val);
throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
} else if (ret == 0) {
// socket timed out
@@ -305,15 +305,15 @@ void TSocket::openConnection(struct addrinfo *res) {
GlobalOutput(errStr.c_str());
throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
} else {
- // error on poll()
- int errno_copy = errno;
- GlobalOutput.perror("TSocket::open() poll() " + getSocketInfo(), errno_copy);
- throw TTransportException(TTransportException::NOT_OPEN, "poll() failed", errno_copy);
+ // error on THRIFT_POLL()
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy);
}
done:
// Set socket back to normal mode (blocking)
- fcntl(socket_, F_SETFL, flags);
+ THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags);
if (path_.empty()) {
setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen));
@@ -367,7 +367,7 @@ void TSocket::local_open(){
error = getaddrinfo(host_.c_str(), port, &hints, &res0);
if (error) {
- string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(gai_strerror(error));
+ string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(THRIFT_GAI_STRERROR(error));
GlobalOutput(errStr.c_str());
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for client socket.");
@@ -396,20 +396,13 @@ void TSocket::local_open(){
void TSocket::close() {
if (socket_ != -1) {
-
-#ifdef _WIN32
- shutdown(socket_, SD_BOTH);
- ::closesocket(socket_);
-#else
- shutdown(socket_, SHUT_RDWR);
- ::close(socket_);
-#endif
-
+ shutdown(socket_, THRIFT_SHUT_RDWR);
+ ::THRIFT_CLOSESOCKET(socket_);
}
socket_ = -1;
}
-void TSocket::setSocketFD(int socket) {
+void TSocket::setSocketFD(THRIFT_SOCKET socket) {
if (socket_ != -1) {
close();
}
@@ -423,10 +416,10 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
int32_t retries = 0;
- // EAGAIN can be signalled both when a timeout has occurred and when
+ // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
// the system is out of resources (an awesome undocumented feature).
// The following is an approximation of the time interval under which
- // EAGAIN is taken to indicate an out of resources error.
+ // THRIFT_EAGAIN is taken to indicate an out of resources error.
uint32_t eagainThresholdMicros = 0;
if (recvTimeout_) {
// if a readTimeout is specified along with a max number of recv retries, then
@@ -439,55 +432,55 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
// Read from the socket
struct timeval begin;
if (recvTimeout_ > 0) {
- gettimeofday(&begin, NULL);
+ THRIFT_GETTIMEOFDAY(&begin, NULL);
} else {
// if there is no read timeout we don't need the TOD to determine whether
- // an EAGAIN is due to a timeout or an out-of-resource condition.
+ // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
begin.tv_sec = begin.tv_usec = 0;
}
int got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
- int errno_copy = errno; //gettimeofday can change errno
+ int errno_copy = THRIFT_GET_SOCKET_ERROR; //THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
++g_socket_syscalls;
// Check for error on read
if (got < 0) {
- if (errno_copy == EAGAIN) {
+ if (errno_copy == THRIFT_EAGAIN) {
// if no timeout we can assume that resource exhaustion has occurred.
if (recvTimeout_ == 0) {
throw TTransportException(TTransportException::TIMED_OUT,
- "EAGAIN (unavailable resources)");
+ "THRIFT_EAGAIN (unavailable resources)");
}
// check if this is the lack of resources or timeout case
struct timeval end;
- gettimeofday(&end, NULL);
+ THRIFT_GETTIMEOFDAY(&end, NULL);
uint32_t readElapsedMicros = static_cast<uint32_t>(
((end.tv_sec - begin.tv_sec) * 1000 * 1000)
+ (((uint64_t)(end.tv_usec - begin.tv_usec))));
if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
if (retries++ < maxRecvRetries_) {
- usleep(50);
+ THRIFT_SLEEP_USEC(50);
goto try_again;
} else {
throw TTransportException(TTransportException::TIMED_OUT,
- "EAGAIN (unavailable resources)");
+ "THRIFT_EAGAIN (unavailable resources)");
}
} else {
// infer that timeout has been hit
throw TTransportException(TTransportException::TIMED_OUT,
- "EAGAIN (timed out)");
+ "THRIFT_EAGAIN (timed out)");
}
}
// If interrupted, try again
- if (errno_copy == EINTR && retries++ < maxRecvRetries_) {
+ if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_) {
goto try_again;
}
#if defined __FreeBSD__ || defined __MACH__
- if (errno_copy == ECONNRESET) {
+ if (errno_copy == THRIFT_ECONNRESET) {
/* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
- * ECONNRESET if peer performed shutdown
+ * THRIFT_ECONNRESET if peer performed shutdown
* edhall: eliminated close() since we do that in the destructor.
*/
return 0;
@@ -504,18 +497,18 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
// If we disconnect with no linger time
- if (errno_copy == ECONNRESET) {
- throw TTransportException(TTransportException::NOT_OPEN, "ECONNRESET");
+ if (errno_copy == THRIFT_ECONNRESET) {
+ throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ECONNRESET");
}
// This ish isn't open
- if (errno_copy == ENOTCONN) {
- throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN");
+ if (errno_copy == THRIFT_ENOTCONN) {
+ throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN");
}
// Timed out!
- if (errno_copy == ETIMEDOUT) {
- throw TTransportException(TTransportException::TIMED_OUT, "ETIMEDOUT");
+ if (errno_copy == THRIFT_ETIMEDOUT) {
+ throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
}
// Some other error, whatevz
@@ -558,7 +551,7 @@ uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
int flags = 0;
#ifdef MSG_NOSIGNAL
// Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
- // check for the EPIPE return condition and close the socket in that case
+ // check for the THRIFT_EPIPE return condition and close the socket in that case
flags |= MSG_NOSIGNAL;
#endif // ifdef MSG_NOSIGNAL
@@ -566,14 +559,14 @@ uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
++g_socket_syscalls;
if (b < 0) {
- if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN) {
return 0;
}
// Fail on a send error
- int errno_copy = errno;
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy);
- if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) {
+ if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET || errno_copy == THRIFT_ENOTCONN) {
close();
throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
}
@@ -614,7 +607,7 @@ void TSocket::setLinger(bool on, int linger) {
struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l));
if (ret == -1) {
- int errno_copy = errno; // Copy errno because we're allocating memory.
+ int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
}
}
@@ -629,7 +622,7 @@ void TSocket::setNoDelay(bool noDelay) {
int v = noDelay_ ? 1 : 0;
int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v));
if (ret == -1) {
- int errno_copy = errno; // Copy errno because we're allocating memory.
+ int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
}
}
@@ -654,11 +647,11 @@ void TSocket::setRecvTimeout(int ms) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
- // Copy because poll may modify
+ // Copy because THRIFT_POLL may modify
struct timeval r = recvTimeval_;
int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, cast_sockopt(&r), sizeof(r));
if (ret == -1) {
- int errno_copy = errno; // Copy errno because we're allocating memory.
+ int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
GlobalOutput.perror("TSocket::setRecvTimeout() setsockopt() " + getSocketInfo(), errno_copy);
}
}
@@ -680,7 +673,7 @@ void TSocket::setSendTimeout(int ms) {
(int)((sendTimeout_%1000)*1000)};
int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, cast_sockopt(&s), sizeof(s));
if (ret == -1) {
- int errno_copy = errno; // Copy errno because we're allocating memory.
+ int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
GlobalOutput.perror("TSocket::setSendTimeout() setsockopt() " + getSocketInfo(), errno_copy);
}
}