You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2008/12/08 05:25:51 UTC
svn commit: r724247 [3/6] - in /hadoop/hive/trunk/service: ./ if/ include/
include/thrift/ include/thrift/concurrency/ include/thrift/fb303/
include/thrift/fb303/if/ include/thrift/if/ include/thrift/processor/
include/thrift/protocol/ include/thrift/s...
Added: hadoop/hive/trunk/service/include/thrift/protocol/TOneWayProtocol.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/protocol/TOneWayProtocol.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/protocol/TOneWayProtocol.h (added)
+++ hadoop/hive/trunk/service/include/thrift/protocol/TOneWayProtocol.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,147 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_PROTOCOL_TONEWAYPROTOCOL_H_
+#define _THRIFT_PROTOCOL_TONEWAYPROTOCOL_H_ 1
+
+#include "TProtocol.h"
+
+namespace facebook { namespace thrift { namespace protocol {
+
+/**
+ * Abstract class for implementing a protocol that can only be written,
+ * not read.
+ *
+ * @author David Reiss <dr...@facebook.com>
+ */
+class TWriteOnlyProtocol : public TProtocol {
+ public:
+  /**
+   * Builds a write-only protocol on top of the given transport.
+   *
+   * @param trans         Transport forwarded to the TProtocol base class.
+   * @param subclass_name The name of the concrete subclass; it is embedded
+   *                      in the message of every read-side exception.
+   */
+  TWriteOnlyProtocol(boost::shared_ptr<TTransport> trans,
+                     const std::string& subclass_name)
+    : TProtocol(trans)
+    , subclass_(subclass_name)
+  {}
+
+  // The write methods are not overridden here, so they stay pure virtual.
+
+  /**
+   * Read methods. Each one forwards to rejectRead(), which always throws
+   * TProtocolException(NOT_IMPLEMENTED); none of them ever returns a value.
+   */
+
+  uint32_t readMessageBegin(std::string& name,
+                            TMessageType& messageType,
+                            int32_t& seqid) {
+    return rejectRead();
+  }
+
+  uint32_t readMessageEnd() { return rejectRead(); }
+
+  uint32_t readStructBegin(std::string& name) { return rejectRead(); }
+
+  uint32_t readStructEnd() { return rejectRead(); }
+
+  uint32_t readFieldBegin(std::string& name,
+                          TType& fieldType,
+                          int16_t& fieldId) {
+    return rejectRead();
+  }
+
+  uint32_t readFieldEnd() { return rejectRead(); }
+
+  uint32_t readMapBegin(TType& keyType,
+                        TType& valType,
+                        uint32_t& size) {
+    return rejectRead();
+  }
+
+  uint32_t readMapEnd() { return rejectRead(); }
+
+  uint32_t readListBegin(TType& elemType,
+                         uint32_t& size) {
+    return rejectRead();
+  }
+
+  uint32_t readListEnd() { return rejectRead(); }
+
+  uint32_t readSetBegin(TType& elemType,
+                        uint32_t& size) {
+    return rejectRead();
+  }
+
+  uint32_t readSetEnd() { return rejectRead(); }
+
+  uint32_t readBool(bool& value) { return rejectRead(); }
+
+  uint32_t readByte(int8_t& byte) { return rejectRead(); }
+
+  uint32_t readI16(int16_t& i16) { return rejectRead(); }
+
+  uint32_t readI32(int32_t& i32) { return rejectRead(); }
+
+  uint32_t readI64(int64_t& i64) { return rejectRead(); }
+
+  uint32_t readDouble(double& dub) { return rejectRead(); }
+
+  uint32_t readString(std::string& str) { return rejectRead(); }
+
+ private:
+  /**
+   * The single throw site shared by all read methods. Declared to return
+   * uint32_t only so callers can write "return rejectRead();".
+   */
+  uint32_t rejectRead() const {
+    throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
+                             subclass_ + " does not support reading (yet).");
+  }
+
+  // Name of the concrete subclass, used only to build the error message.
+  std::string subclass_;
+};
+
+}}} // facebook::thrift::protocol
+
+#endif // #ifndef _THRIFT_PROTOCOL_TONEWAYPROTOCOL_H_
Added: hadoop/hive/trunk/service/include/thrift/protocol/TProtocol.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/protocol/TProtocol.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/protocol/TProtocol.h (added)
+++ hadoop/hive/trunk/service/include/thrift/protocol/TProtocol.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,353 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_PROTOCOL_TPROTOCOL_H_
+#define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1
+
+#include <transport/TTransport.h>
+#include <protocol/TProtocolException.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include <netinet/in.h>
+#include <sys/types.h>
+#include <string>
+#include <map>
+
+namespace facebook { namespace thrift { namespace protocol {
+
+using facebook::thrift::transport::TTransport;
+
+// Pull in <endian.h> when the build system says it exists; it is one source
+// of the __BYTE_ORDER / __LITTLE_ENDIAN / __BIG_ENDIAN macros tested below.
+#ifdef HAVE_ENDIAN_H
+#include <endian.h>
+#endif
+
+// Fall back to the un-prefixed BSD-style names when the double-underscore
+// ones are missing; give up at compile time if neither set is available.
+#ifndef __BYTE_ORDER
+# if defined(BYTE_ORDER) && defined(LITTLE_ENDIAN) && defined(BIG_ENDIAN)
+# define __BYTE_ORDER BYTE_ORDER
+# define __LITTLE_ENDIAN LITTLE_ENDIAN
+# define __BIG_ENDIAN BIG_ENDIAN
+# else
+# error "Cannot determine endianness"
+# endif
+#endif
+
+// 64-bit counterparts of ntohl()/htonl(): identity on big-endian hosts,
+// a byte swap on little-endian hosts.
+#if __BYTE_ORDER == __BIG_ENDIAN
+# define ntohll(n) (n)
+# define htonll(n) (n)
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+# if defined(__GNUC__) && defined(__GLIBC__)
+# include <byteswap.h>
+# define ntohll(n) bswap_64(n)
+# define htonll(n) bswap_64(n)
+# else /* GNUC & GLIBC */
+// Portable fallback: swap each 32-bit half and exchange the halves. The
+// argument is parenthesised before the shift so that an expression with a
+// lower-precedence operator (e.g. a | b) is shifted as a whole.
+# define ntohll(n) ( (((unsigned long long)ntohl(n)) << 32) + ntohl((n) >> 32) )
+# define htonll(n) ( (((unsigned long long)htonl(n)) << 32) + htonl((n) >> 32) )
+# endif /* GNUC & GLIBC */
+#else /* __BYTE_ORDER */
+# error "Can't define htonll or ntohll!"
+#endif
+
+/**
+ * Enumerated definition of the types that the Thrift protocol supports.
+ * Take special note of the T_STOP type which is used specifically to mark
+ * the end of a sequence of fields.
+ */
+enum TType {
+ // Enumerator values are explicit and not listed in numeric order
+ // (T_DOUBLE is 4). Two pairs of names share one value: T_BYTE/T_I08 (3)
+ // and T_STRING/T_UTF7 (11), so each pair is indistinguishable in a switch.
+ T_STOP = 0, // TProtocol::skip() ends a struct's field loop on this value
+ T_VOID = 1,
+ T_BOOL = 2,
+ T_BYTE = 3,
+ T_I08 = 3, // same value as T_BYTE
+ T_I16 = 6,
+ T_I32 = 8,
+ T_U64 = 9, // no case for this in TProtocol::skip(); falls to its default
+ T_I64 = 10,
+ T_DOUBLE = 4,
+ T_STRING = 11,
+ T_UTF7 = 11, // same value as T_STRING
+ T_STRUCT = 12,
+ T_MAP = 13,
+ T_SET = 14,
+ T_LIST = 15,
+ T_UTF8 = 16,
+ T_UTF16 = 17
+};
+
+/**
+ * Enumerated definition of the message types that the Thrift protocol
+ * supports.
+ */
+enum TMessageType {
+ // Message kind passed to writeMessageBegin() and filled in by
+ // readMessageBegin(). Values are explicit and start at 1.
+ T_CALL = 1,
+ T_REPLY = 2,
+ T_EXCEPTION = 3
+};
+
+/**
+ * Abstract class for a thrift protocol driver. These are all the methods that
+ * a protocol must implement. Essentially, there must be some way of reading
+ * and writing all the base types, plus a mechanism for writing out structs
+ * with indexed fields.
+ *
+ * TProtocol objects should not be shared across multiple encoding contexts,
+ * as they may need to maintain internal state in some protocols (i.e. XML).
+ * Note that it is acceptable for the TProtocol module to do its own internal
+ * buffered reads/writes to the underlying TTransport where appropriate (i.e.
+ * when parsing an input XML stream, reading should be batched rather than
+ * looking ahead character by character for a close tag).
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TProtocol {
+ // Abstract base: declares one pure-virtual write/read pair per Thrift
+ // construct, provides a generic skip(), and holds the transport handed to
+ // the (protected) constructor.
+ public:
+ virtual ~TProtocol() {}
+
+ /**
+ * Writing functions.
+ *
+ * All are pure virtual and return uint32_t.
+ * NOTE(review): the return value is presumably the number of bytes
+ * written -- confirm against a concrete protocol implementation.
+ */
+
+ virtual uint32_t writeMessageBegin(const std::string& name,
+ const TMessageType messageType,
+ const int32_t seqid) = 0;
+
+ virtual uint32_t writeMessageEnd() = 0;
+
+
+ virtual uint32_t writeStructBegin(const std::string& name) = 0;
+
+ virtual uint32_t writeStructEnd() = 0;
+
+ virtual uint32_t writeFieldBegin(const std::string& name,
+ const TType fieldType,
+ const int16_t fieldId) = 0;
+
+ virtual uint32_t writeFieldEnd() = 0;
+
+ virtual uint32_t writeFieldStop() = 0;
+
+ virtual uint32_t writeMapBegin(const TType keyType,
+ const TType valType,
+ const uint32_t size) = 0;
+
+ virtual uint32_t writeMapEnd() = 0;
+
+ virtual uint32_t writeListBegin(const TType elemType,
+ const uint32_t size) = 0;
+
+ virtual uint32_t writeListEnd() = 0;
+
+ virtual uint32_t writeSetBegin(const TType elemType,
+ const uint32_t size) = 0;
+
+ virtual uint32_t writeSetEnd() = 0;
+
+ virtual uint32_t writeBool(const bool value) = 0;
+
+ virtual uint32_t writeByte(const int8_t byte) = 0;
+
+ virtual uint32_t writeI16(const int16_t i16) = 0;
+
+ virtual uint32_t writeI32(const int32_t i32) = 0;
+
+ virtual uint32_t writeI64(const int64_t i64) = 0;
+
+ virtual uint32_t writeDouble(const double dub) = 0;
+
+ virtual uint32_t writeString(const std::string& str) = 0;
+
+ /**
+ * Reading functions
+ *
+ * All are pure virtual; results come back through the reference
+ * parameters and the uint32_t return value is what skip() accumulates.
+ * NOTE(review): presumably the number of bytes consumed -- confirm.
+ */
+
+ virtual uint32_t readMessageBegin(std::string& name,
+ TMessageType& messageType,
+ int32_t& seqid) = 0;
+
+ virtual uint32_t readMessageEnd() = 0;
+
+ virtual uint32_t readStructBegin(std::string& name) = 0;
+
+ virtual uint32_t readStructEnd() = 0;
+
+ virtual uint32_t readFieldBegin(std::string& name,
+ TType& fieldType,
+ int16_t& fieldId) = 0;
+
+ virtual uint32_t readFieldEnd() = 0;
+
+ virtual uint32_t readMapBegin(TType& keyType,
+ TType& valType,
+ uint32_t& size) = 0;
+
+ virtual uint32_t readMapEnd() = 0;
+
+ virtual uint32_t readListBegin(TType& elemType,
+ uint32_t& size) = 0;
+
+ virtual uint32_t readListEnd() = 0;
+
+ virtual uint32_t readSetBegin(TType& elemType,
+ uint32_t& size) = 0;
+
+ virtual uint32_t readSetEnd() = 0;
+
+ virtual uint32_t readBool(bool& value) = 0;
+
+ virtual uint32_t readByte(int8_t& byte) = 0;
+
+ virtual uint32_t readI16(int16_t& i16) = 0;
+
+ virtual uint32_t readI32(int32_t& i32) = 0;
+
+ virtual uint32_t readI64(int64_t& i64) = 0;
+
+ virtual uint32_t readDouble(double& dub) = 0;
+
+ virtual uint32_t readString(std::string& str) = 0;
+
+ /**
+ * Method to arbitrarily skip over data.
+ *
+ * Reads a value of the given type into a throwaway local and discards it.
+ * Structs, maps, sets and lists are walked recursively, one nested skip()
+ * per contained field or element.
+ *
+ * @param type Type of the value to skip.
+ * @return Sum of the return values of every read call made; 0 for any
+ * type without a case below (nothing is read in that case).
+ */
+ uint32_t skip(TType type) {
+ switch (type) {
+ case T_BOOL:
+ {
+ bool boolv;
+ return readBool(boolv);
+ }
+ // T_I08 has the same value as T_BYTE, so it lands here too.
+ case T_BYTE:
+ {
+ int8_t bytev;
+ return readByte(bytev);
+ }
+ case T_I16:
+ {
+ int16_t i16;
+ return readI16(i16);
+ }
+ case T_I32:
+ {
+ int32_t i32;
+ return readI32(i32);
+ }
+ case T_I64:
+ {
+ int64_t i64;
+ return readI64(i64);
+ }
+ case T_DOUBLE:
+ {
+ double dub;
+ return readDouble(dub);
+ }
+ // T_UTF7 has the same value as T_STRING, so it lands here too.
+ case T_STRING:
+ {
+ std::string str;
+ return readString(str);
+ }
+ case T_STRUCT:
+ {
+ uint32_t result = 0;
+ std::string name;
+ int16_t fid;
+ TType ftype;
+ result += readStructBegin(name);
+ // Skip fields one by one until the T_STOP marker. Note that
+ // readFieldEnd() is not called for the T_STOP field itself.
+ while (true) {
+ result += readFieldBegin(name, ftype, fid);
+ if (ftype == T_STOP) {
+ break;
+ }
+ result += skip(ftype);
+ result += readFieldEnd();
+ }
+ result += readStructEnd();
+ return result;
+ }
+ case T_MAP:
+ {
+ uint32_t result = 0;
+ TType keyType;
+ TType valType;
+ uint32_t i, size;
+ result += readMapBegin(keyType, valType, size);
+ // Each entry is a key followed by a value.
+ for (i = 0; i < size; i++) {
+ result += skip(keyType);
+ result += skip(valType);
+ }
+ result += readMapEnd();
+ return result;
+ }
+ case T_SET:
+ {
+ uint32_t result = 0;
+ TType elemType;
+ uint32_t i, size;
+ result += readSetBegin(elemType, size);
+ for (i = 0; i < size; i++) {
+ result += skip(elemType);
+ }
+ result += readSetEnd();
+ return result;
+ }
+ case T_LIST:
+ {
+ uint32_t result = 0;
+ TType elemType;
+ uint32_t i, size;
+ result += readListBegin(elemType, size);
+ for (i = 0; i < size; i++) {
+ result += skip(elemType);
+ }
+ result += readListEnd();
+ return result;
+ }
+ // NOTE(review): any other type (T_STOP, T_VOID, T_U64, T_UTF8, T_UTF16
+ // or an out-of-range value) reads nothing and reports 0, so the caller
+ // cannot tell "nothing to skip" from "unknown type".
+ default:
+ return 0;
+ }
+ }
+
+ // Returns the shared transport passed to the constructor.
+ inline boost::shared_ptr<TTransport> getTransport() {
+ return ptrans_;
+ }
+
+ // TODO: remove these two calls, they are for backwards
+ // compatibility
+ // Both return the same ptrans_ as getTransport().
+ inline boost::shared_ptr<TTransport> getInputTransport() {
+ return ptrans_;
+ }
+ inline boost::shared_ptr<TTransport> getOutputTransport() {
+ return ptrans_;
+ }
+
+ protected:
+ // Only subclasses can construct a TProtocol. Stores the shared pointer
+ // and caches the raw pointer it wraps.
+ TProtocol(boost::shared_ptr<TTransport> ptrans):
+ ptrans_(ptrans) {
+ trans_ = ptrans.get();
+ }
+
+ // Shared handle to the transport.
+ boost::shared_ptr<TTransport> ptrans_;
+ // Raw alias of ptrans_.get(), set once in the constructor above.
+ TTransport* trans_;
+
+ private:
+ // Private, so a TProtocol cannot be built without a transport. This
+ // constructor leaves trans_ uninitialized.
+ TProtocol() {}
+};
+
+/**
+ * Constructs input and output protocol objects given transports.
+ */
+class TProtocolFactory {
+ // Abstract factory: subclasses implement getProtocol() to produce a
+ // TProtocol for a given transport.
+ public:
+ TProtocolFactory() {}
+
+ virtual ~TProtocolFactory() {}
+
+ // Returns a shared TProtocol for the supplied transport. Pure virtual.
+ virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) = 0;
+};
+
+}}} // facebook::thrift::protocol
+
+#endif // #ifndef _THRIFT_PROTOCOL_TPROTOCOL_H_
Added: hadoop/hive/trunk/service/include/thrift/protocol/TProtocolException.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/protocol/TProtocolException.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/protocol/TProtocolException.h (added)
+++ hadoop/hive/trunk/service/include/thrift/protocol/TProtocolException.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,93 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_PROTOCOL_TPROTOCOLEXCEPTION_H_
+#define _THRIFT_PROTOCOL_TPROTOCOLEXCEPTION_H_ 1
+
+#include <boost/lexical_cast.hpp>
+#include <string>
+
+namespace facebook { namespace thrift { namespace protocol {
+
+/**
+ * Class to encapsulate all the possible types of protocol errors that may
+ * occur in various protocol systems. This provides a sort of generic
+ * wrapper around the raw UNIX E_ error codes that lets a common code
+ * base of error handling be used for various types of protocols, i.e.
+ * pipes etc.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TProtocolException : public facebook::thrift::TException {
+ public:
+
+  /**
+   * Error codes for the various types of exceptions.
+   */
+  enum TProtocolExceptionType {
+    UNKNOWN = 0,
+    INVALID_DATA = 1,
+    NEGATIVE_SIZE = 2,
+    SIZE_LIMIT = 3,
+    BAD_VERSION = 4,
+    NOT_IMPLEMENTED = 5  // no trailing comma: pre-C++11 pedantic builds reject it
+  };
+
+  /** Unknown error, no message. */
+  TProtocolException() :
+    facebook::thrift::TException(),
+    type_(UNKNOWN) {}
+
+  /** Typed error, no message; what() then returns the canned text for the type. */
+  TProtocolException(TProtocolExceptionType type) :
+    facebook::thrift::TException(),
+    type_(type) {}
+
+  /** Unknown error with a caller-supplied message. */
+  TProtocolException(const std::string& message) :
+    facebook::thrift::TException(message),
+    type_(UNKNOWN) {}
+
+  /** Typed error with a caller-supplied message. */
+  TProtocolException(TProtocolExceptionType type, const std::string& message) :
+    facebook::thrift::TException(message),
+    type_(type) {}
+
+  virtual ~TProtocolException() throw() {}
+
+  /**
+   * Returns an error code that provides information about the type of error
+   * that has occurred.
+   *
+   * Const-qualified so it can be called on an exception caught by const
+   * reference.
+   *
+   * @return Error code
+   */
+  TProtocolExceptionType getType() const {
+    return type_;
+  }
+
+  /**
+   * Returns the stored message when there is one, otherwise a fixed
+   * description chosen by the error code.
+   *
+   * NOTE(review): message_ is not declared in this class; presumably it is
+   * the message member inherited from TException -- confirm.
+   */
+  virtual const char* what() const throw() {
+    if (message_.empty()) {
+      switch (type_) {
+        case UNKNOWN         : return "TProtocolException: Unknown protocol exception";
+        case INVALID_DATA    : return "TProtocolException: Invalid data";
+        case NEGATIVE_SIZE   : return "TProtocolException: Negative size";
+        case SIZE_LIMIT      : return "TProtocolException: Exceeded size limit";
+        case BAD_VERSION     : return "TProtocolException: Invalid version";
+        case NOT_IMPLEMENTED : return "TProtocolException: Not implemented";
+        default              : return "TProtocolException: (Invalid exception type)";
+      }
+    } else {
+      return message_.c_str();
+    }
+  }
+
+ protected:
+  /**
+   * Error code
+   */
+  TProtocolExceptionType type_;
+
+};
+
+}}} // facebook::thrift::protocol
+
+#endif // #ifndef _THRIFT_PROTOCOL_TPROTOCOLEXCEPTION_H_
Added: hadoop/hive/trunk/service/include/thrift/reflection_limited_types.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/reflection_limited_types.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/reflection_limited_types.h (added)
+++ hadoop/hive/trunk/service/include/thrift/reflection_limited_types.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,285 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+#ifndef reflection_limited_TYPES_H
+#define reflection_limited_TYPES_H
+
+#include <Thrift.h>
+#include <reflection_limited_types.h>
+#include <protocol/TProtocol.h>
+#include <transport/TTransport.h>
+
+
+
+namespace facebook { namespace thrift { namespace reflection { namespace limited {
+
+enum TTypeTag {
+ // T_VOID through T_LIST use the same numeric values as the like-named
+ // enumerators of protocol::TType; T_ENUM and T_NOT_REFLECTED exist only
+ // in this enum.
+ T_VOID = 1,
+ T_BOOL = 2,
+ T_BYTE = 3,
+ T_I16 = 6,
+ T_I32 = 8,
+ T_I64 = 10,
+ T_DOUBLE = 4,
+ T_STRING = 11,
+ T_STRUCT = 12,
+ T_MAP = 13,
+ T_SET = 14,
+ T_LIST = 15,
+ T_ENUM = 101,
+ T_NOT_REFLECTED = 102
+};
+
+class SimpleType {
+ // Generated struct holding a type tag and a name.
+ public:
+
+ // Fingerprints are only declared here; the trailing comments show the
+ // values the generator recorded.
+ static char* ascii_fingerprint; // = "19B5240589E680301A7E32DF3971EFBE";
+ static char binary_fingerprint[16]; // = {0x19,0xB5,0x24,0x05,0x89,0xE6,0x80,0x30,0x1A,0x7E,0x32,0xDF,0x39,0x71,0xEF,0xBE};
+
+ // NOTE(review): ttype is not initialized here, yet operator== reads it;
+ // comparing objects whose ttype was never assigned reads an indeterminate
+ // value.
+ SimpleType() : name("") {
+ }
+
+ virtual ~SimpleType() throw() {}
+
+ TTypeTag ttype;
+ std::string name;
+
+ // One flag per field, all false on construction. operator== below does
+ // not consult them.
+ struct __isset {
+ __isset() : ttype(false), name(false) {}
+ bool ttype;
+ bool name;
+ } __isset;
+
+ // Equal when both ttype and name match.
+ bool operator == (const SimpleType & rhs) const
+ {
+ if (!(ttype == rhs.ttype))
+ return false;
+ if (!(name == rhs.name))
+ return false;
+ return true;
+ }
+ bool operator != (const SimpleType &rhs) const {
+ return !(*this == rhs);
+ }
+
+ // Declared only; defined outside this header.
+ uint32_t read(facebook::thrift::protocol::TProtocol* iprot);
+ uint32_t write(facebook::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+class ContainerType {
+ // Generated struct holding a type tag and up to two SimpleType subtypes.
+ public:
+
+ // Fingerprints are only declared here; the trailing comments show the
+ // values the generator recorded.
+ static char* ascii_fingerprint; // = "654FA6EFFF8242F4C2A604B970686634";
+ static char binary_fingerprint[16]; // = {0x65,0x4F,0xA6,0xEF,0xFF,0x82,0x42,0xF4,0xC2,0xA6,0x04,0xB9,0x70,0x68,0x66,0x34};
+
+ // NOTE(review): ttype is not initialized here, yet operator== reads it.
+ ContainerType() {
+ }
+
+ virtual ~ContainerType() throw() {}
+
+ TTypeTag ttype;
+ SimpleType subtype1;
+ SimpleType subtype2;
+
+ // One flag per field, all false on construction.
+ struct __isset {
+ __isset() : ttype(false), subtype1(false), subtype2(false) {}
+ bool ttype;
+ bool subtype1;
+ bool subtype2;
+ } __isset;
+
+ // ttype and subtype1 are always compared. subtype2 is compared only when
+ // both sides have __isset.subtype2 set; if exactly one side has it set
+ // the objects are unequal.
+ bool operator == (const ContainerType & rhs) const
+ {
+ if (!(ttype == rhs.ttype))
+ return false;
+ if (!(subtype1 == rhs.subtype1))
+ return false;
+ if (__isset.subtype2 != rhs.__isset.subtype2)
+ return false;
+ else if (__isset.subtype2 && !(subtype2 == rhs.subtype2))
+ return false;
+ return true;
+ }
+ bool operator != (const ContainerType &rhs) const {
+ return !(*this == rhs);
+ }
+
+ // Declared only; defined outside this header.
+ uint32_t read(facebook::thrift::protocol::TProtocol* iprot);
+ uint32_t write(facebook::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+class ThriftType {
+ // Generated struct holding an is_container flag plus a SimpleType and a
+ // ContainerType member.
+ public:
+
+ // Fingerprints are only declared here; the trailing comments show the
+ // values the generator recorded.
+ static char* ascii_fingerprint; // = "76BC1CC759001D7D85FEE75C4F183062";
+ static char binary_fingerprint[16]; // = {0x76,0xBC,0x1C,0xC7,0x59,0x00,0x1D,0x7D,0x85,0xFE,0xE7,0x5C,0x4F,0x18,0x30,0x62};
+
+ // is_container starts out false (initialized from 0).
+ ThriftType() : is_container(0) {
+ }
+
+ virtual ~ThriftType() throw() {}
+
+ bool is_container;
+ SimpleType simple_type;
+ ContainerType container_type;
+
+ // One flag per field, all false on construction.
+ struct __isset {
+ __isset() : is_container(false), simple_type(false), container_type(false) {}
+ bool is_container;
+ bool simple_type;
+ bool container_type;
+ } __isset;
+
+ // is_container is always compared. simple_type and container_type are
+ // each compared only when set on both sides; a set/unset mismatch on
+ // either makes the objects unequal.
+ bool operator == (const ThriftType & rhs) const
+ {
+ if (!(is_container == rhs.is_container))
+ return false;
+ if (__isset.simple_type != rhs.__isset.simple_type)
+ return false;
+ else if (__isset.simple_type && !(simple_type == rhs.simple_type))
+ return false;
+ if (__isset.container_type != rhs.__isset.container_type)
+ return false;
+ else if (__isset.container_type && !(container_type == rhs.container_type))
+ return false;
+ return true;
+ }
+ bool operator != (const ThriftType &rhs) const {
+ return !(*this == rhs);
+ }
+
+ // Declared only; defined outside this header.
+ uint32_t read(facebook::thrift::protocol::TProtocol* iprot);
+ uint32_t write(facebook::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+class Argument {
+ // Generated struct holding a 16-bit key, a name and a ThriftType.
+ public:
+
+ // Fingerprints are only declared here; the trailing comments show the
+ // values the generator recorded.
+ static char* ascii_fingerprint; // = "8C45506BE0EFBB22FB19FA40DDCECB3F";
+ static char binary_fingerprint[16]; // = {0x8C,0x45,0x50,0x6B,0xE0,0xEF,0xBB,0x22,0xFB,0x19,0xFA,0x40,0xDD,0xCE,0xCB,0x3F};
+
+ // key starts at 0 and name empty.
+ Argument() : key(0), name("") {
+ }
+
+ virtual ~Argument() throw() {}
+
+ int16_t key;
+ std::string name;
+ ThriftType type;
+
+ // One flag per field, all false on construction. operator== below does
+ // not consult them.
+ struct __isset {
+ __isset() : key(false), name(false), type(false) {}
+ bool key;
+ bool name;
+ bool type;
+ } __isset;
+
+ // Equal when key, name and type all match.
+ bool operator == (const Argument & rhs) const
+ {
+ if (!(key == rhs.key))
+ return false;
+ if (!(name == rhs.name))
+ return false;
+ if (!(type == rhs.type))
+ return false;
+ return true;
+ }
+ bool operator != (const Argument &rhs) const {
+ return !(*this == rhs);
+ }
+
+ // Declared only; defined outside this header.
+ uint32_t read(facebook::thrift::protocol::TProtocol* iprot);
+ uint32_t write(facebook::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+class Method {
+ // Generated struct holding a name, a return ThriftType and a vector of
+ // Argument.
+ public:
+
+ // Fingerprints are only declared here; the trailing comments show the
+ // values the generator recorded.
+ static char* ascii_fingerprint; // = "E6573428C492D24C84A19432D39A17B0";
+ static char binary_fingerprint[16]; // = {0xE6,0x57,0x34,0x28,0xC4,0x92,0xD2,0x4C,0x84,0xA1,0x94,0x32,0xD3,0x9A,0x17,0xB0};
+
+ Method() : name("") {
+ }
+
+ virtual ~Method() throw() {}
+
+ std::string name;
+ ThriftType return_type;
+ // NOTE(review): this header does not include <vector> itself; presumably
+ // it arrives through one of the includes above -- confirm.
+ std::vector<Argument> arguments;
+
+ // One flag per field, all false on construction. operator== below does
+ // not consult them.
+ struct __isset {
+ __isset() : name(false), return_type(false), arguments(false) {}
+ bool name;
+ bool return_type;
+ bool arguments;
+ } __isset;
+
+ // Equal when name, return_type and arguments all match.
+ bool operator == (const Method & rhs) const
+ {
+ if (!(name == rhs.name))
+ return false;
+ if (!(return_type == rhs.return_type))
+ return false;
+ if (!(arguments == rhs.arguments))
+ return false;
+ return true;
+ }
+ bool operator != (const Method &rhs) const {
+ return !(*this == rhs);
+ }
+
+ // Declared only; defined outside this header.
+ uint32_t read(facebook::thrift::protocol::TProtocol* iprot);
+ uint32_t write(facebook::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+class Service {
+ // Generated struct holding a name, a vector of Method and a
+ // fully_reflected flag.
+ public:
+
+ // Fingerprints are only declared here; the trailing comments show the
+ // values the generator recorded.
+ static char* ascii_fingerprint; // = "4673B0777B701D9B02A7A74CEC7908A7";
+ static char binary_fingerprint[16]; // = {0x46,0x73,0xB0,0x77,0x7B,0x70,0x1D,0x9B,0x02,0xA7,0xA7,0x4C,0xEC,0x79,0x08,0xA7};
+
+ // name starts empty and fully_reflected false (initialized from 0).
+ Service() : name(""), fully_reflected(0) {
+ }
+
+ virtual ~Service() throw() {}
+
+ std::string name;
+ std::vector<Method> methods;
+ bool fully_reflected;
+
+ // One flag per field, all false on construction. operator== below does
+ // not consult them.
+ struct __isset {
+ __isset() : name(false), methods(false), fully_reflected(false) {}
+ bool name;
+ bool methods;
+ bool fully_reflected;
+ } __isset;
+
+ // Equal when name, methods and fully_reflected all match.
+ bool operator == (const Service & rhs) const
+ {
+ if (!(name == rhs.name))
+ return false;
+ if (!(methods == rhs.methods))
+ return false;
+ if (!(fully_reflected == rhs.fully_reflected))
+ return false;
+ return true;
+ }
+ bool operator != (const Service &rhs) const {
+ return !(*this == rhs);
+ }
+
+ // Declared only; defined outside this header.
+ uint32_t read(facebook::thrift::protocol::TProtocol* iprot);
+ uint32_t write(facebook::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+}}}} // namespace
+
+#endif
Added: hadoop/hive/trunk/service/include/thrift/server/TNonblockingServer.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/server/TNonblockingServer.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/server/TNonblockingServer.h (added)
+++ hadoop/hive/trunk/service/include/thrift/server/TNonblockingServer.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,331 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
+#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
+
+#include <Thrift.h>
+#include <server/TServer.h>
+#include <transport/TTransportUtils.h>
+#include <concurrency/ThreadManager.h>
+#include <stack>
+#include <event.h>
+
+namespace facebook { namespace thrift { namespace server {
+
+using facebook::thrift::transport::TMemoryBuffer;
+using facebook::thrift::protocol::TProtocol;
+using facebook::thrift::concurrency::Runnable;
+using facebook::thrift::concurrency::ThreadManager;
+
+// Forward declaration of class
+class TConnection;
+
+/**
+ * This is a non-blocking server in C++ for high performance that operates a
+ * single IO thread. It assumes that all incoming requests are framed with a
+ * 4 byte length indicator and writes out responses using the same framing.
+ *
+ * It does not use the TServerTransport framework, but rather has socket
+ * operations hardcoded for use with libevent.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TNonblockingServer : public TServer {
+ private:
+
+  // Listen backlog
+  static const int LISTEN_BACKLOG = 1024;
+
+  // Server socket file descriptor; -1 until a socket has been assigned
+  int serverSocket_;
+
+  // Port server runs on
+  int port_;
+
+  // Whether to frame responses
+  bool frameResponses_;
+
+  // For processing via thread pool, may be NULL
+  boost::shared_ptr<ThreadManager> threadManager_;
+
+  // Is thread pool processing?
+  bool threadPoolProcessing_;
+
+  // The event base for libevent
+  event_base* eventBase_;
+
+  // Event struct, for use with eventBase_
+  struct event serverEvent_;
+
+  /**
+   * This is a stack of all the objects that have been created but that
+   * are NOT currently in use. When we close a connection, we place it on this
+   * stack so that the object can be reused later, rather than freeing the
+   * memory and reallocating a new object later.
+   */
+  std::stack<TConnection*> connectionStack_;
+
+  // Instance-side handler that the static eventHandler() forwards to.
+  void handleEvent(int fd, short which);
+
+ public:
+  /**
+   * Minimal constructor: no thread manager, so thread pool processing is
+   * off, and no transport/protocol factories are set here.
+   */
+  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+                     int port) :
+    TServer(processor),
+    serverSocket_(-1),
+    port_(port),
+    frameResponses_(true),
+    threadPoolProcessing_(false),
+    eventBase_(NULL) {}
+
+  /**
+   * Uses one protocol factory for both directions and default-constructed
+   * TTransportFactory objects for both transports.
+   *
+   * @param threadManager Optional; thread pool processing is enabled only
+   *                      when this is non-NULL (see setThreadManager()).
+   */
+  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+                     boost::shared_ptr<TProtocolFactory> protocolFactory,
+                     int port,
+                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
+    TServer(processor),
+    serverSocket_(-1),
+    port_(port),
+    frameResponses_(true),
+    threadManager_(threadManager),
+    eventBase_(NULL) {
+    setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setInputProtocolFactory(protocolFactory);
+    setOutputProtocolFactory(protocolFactory);
+    // Also sets threadPoolProcessing_, which the initializer list leaves out.
+    setThreadManager(threadManager);
+  }
+
+  /**
+   * Fully explicit constructor taking separate input/output transport and
+   * protocol factories.
+   *
+   * @param threadManager Optional; thread pool processing is enabled only
+   *                      when this is non-NULL (see setThreadManager()).
+   */
+  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+                     boost::shared_ptr<TTransportFactory> inputTransportFactory,
+                     boost::shared_ptr<TTransportFactory> outputTransportFactory,
+                     boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                     boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+                     int port,
+                     boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
+    TServer(processor),
+    // -1 like the other constructors: 0 is a valid descriptor and must not
+    // stand in for "no socket yet".
+    serverSocket_(-1),
+    port_(port),
+    frameResponses_(true),
+    threadManager_(threadManager),
+    eventBase_(NULL) {
+    setInputTransportFactory(inputTransportFactory);
+    setOutputTransportFactory(outputTransportFactory);
+    setInputProtocolFactory(inputProtocolFactory);
+    setOutputProtocolFactory(outputProtocolFactory);
+    // Also sets threadPoolProcessing_, which the initializer list leaves out.
+    setThreadManager(threadManager);
+  }
+
+  ~TNonblockingServer() {}
+
+  // Stores the manager and turns thread pool processing on iff it is non-NULL.
+  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+    threadManager_ = threadManager;
+    threadPoolProcessing_ = (threadManager != NULL);
+  }
+
+  bool isThreadPoolProcessing() const {
+    return threadPoolProcessing_;
+  }
+
+  // Hands the task to the thread manager. threadManager_ is dereferenced
+  // without a check, so this is only valid while a manager is set.
+  void addTask(boost::shared_ptr<Runnable> task) {
+    threadManager_->add(task);
+  }
+
+  void setFrameResponses(bool frameResponses) {
+    frameResponses_ = frameResponses;
+  }
+
+  bool getFrameResponses() const {
+    return frameResponses_;
+  }
+
+  event_base* getEventBase() const {
+    return eventBase_;
+  }
+
+  // Connection pooling entry points; declared only, defined elsewhere.
+  TConnection* createConnection(int socket, short flags);
+
+  void returnConnection(TConnection* connection);
+
+  // C-style callback: v is the TNonblockingServer* to dispatch to.
+  static void eventHandler(int fd, short which, void* v) {
+    static_cast<TNonblockingServer*>(v)->handleEvent(fd, which);
+  }
+
+  // Declared only; defined elsewhere.
+  void listenSocket();
+
+  void listenSocket(int fd);
+
+  void registerEvents(event_base* base);
+
+  void serve();
+
+};
+
+/**
+ * Two states for sockets, recv and send mode
+ */
+enum TSocketState {
+ // No explicit values: SOCKET_RECV is 0 and SOCKET_SEND is 1.
+ SOCKET_RECV,
+ SOCKET_SEND
+};
+
+/**
+ * Six states for the nonblocking server:
+ * 1) initialize
+ * 2) read 4 byte frame size
+ * 3) read frame of data
+ * 4) wait for task
+ * 5) send frame size
+ * 6) send back data (if any)
+ */
+enum TAppState {
+ // No explicit values, so the six states number 0..5 in the order listed.
+ APP_INIT,
+ APP_READ_FRAME_SIZE,
+ APP_READ_REQUEST,
+ APP_WAIT_TASK,
+ APP_SEND_FRAME_SIZE,
+ APP_SEND_RESULT
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TConnection {
+ private:
+
+  class Task;
+
+  // Server handle
+  TNonblockingServer* server_;
+
+  // Socket handle
+  int socket_;
+
+  // Libevent object
+  struct event event_;
+
+  // Libevent flags
+  short eventFlags_;
+
+  // Socket mode
+  TSocketState socketState_;
+
+  // Application state
+  TAppState appState_;
+
+  // How much data needed to read
+  uint32_t readWant_;
+
+  // Where in the read buffer are we
+  uint32_t readBufferPos_;
+
+  // Read buffer (malloc'ed in the constructor)
+  uint8_t* readBuffer_;
+
+  // Read buffer size
+  uint32_t readBufferSize_;
+
+  // Write buffer
+  uint8_t* writeBuffer_;
+
+  // Write buffer size
+  uint32_t writeBufferSize_;
+
+  // How far through writing are we?
+  uint32_t writeBufferPos_;
+
+  // Frame size
+  int32_t frameSize_;
+
+  // Task handle
+  int taskHandle_;
+
+  // Task event
+  struct event taskEvent_;
+
+  // Transport to read from
+  boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+  // Transport that processor writes to
+  boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+  // extra transport generated by transport factory (e.g. BufferedRouterTransport)
+  boost::shared_ptr<TTransport> factoryInputTransport_;
+  boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+  // Protocol decoder
+  boost::shared_ptr<TProtocol> inputProtocol_;
+
+  // Protocol encoder
+  boost::shared_ptr<TProtocol> outputProtocol_;
+
+  // Go into read mode
+  void setRead() {
+    setFlags(EV_READ | EV_PERSIST);
+  }
+
+  // Go into write mode
+  void setWrite() {
+    setFlags(EV_WRITE | EV_PERSIST);
+  }
+
+  // Set socket idle
+  void setIdle() {
+    setFlags(0);
+  }
+
+  // Set event flags
+  void setFlags(short eventFlags);
+
+  // Libevent handlers
+  void workSocket();
+
+  // Close this client and reset
+  void close();
+
+ public:
+
+  /**
+   * Allocates the 1024-byte read buffer and the two memory transports, then
+   * delegates the per-connection setup to init().
+   *
+   * @throws facebook::thrift::TException (by value) when the read buffer
+   *         cannot be allocated.
+   */
+  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
+    readBuffer_ = (uint8_t*)malloc(1024);
+    if (readBuffer_ == NULL) {
+      // Thrown by value: a "throw new" would leak the exception object and
+      // would not be matched by handlers that catch TException by reference.
+      throw facebook::thrift::TException("Out of memory.");
+    }
+    readBufferSize_ = 1024;
+
+    // Allocate input and output transports
+    // these only need to be allocated once per TConnection (they don't need to be
+    // reallocated on init() call)
+    inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
+    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+
+    init(socket, eventFlags, s);
+  }
+
+  // Initialize
+  void init(int socket, short eventFlags, TNonblockingServer *s);
+
+  // Transition into a new state
+  void transition();
+
+  // Handler wrapper: v is the TConnection* whose socket became ready.
+  static void eventHandler(int fd, short which, void* v) {
+    TConnection* connection = static_cast<TConnection*>(v);
+    assert(fd == connection->socket_);
+    connection->workSocket();
+  }
+
+  // Handler wrapper for task block: closes the task handle, then moves the
+  // connection on to its next state. A failed close is only logged.
+  static void taskHandler(int fd, short which, void* v) {
+    TConnection* connection = static_cast<TConnection*>(v);
+    assert(fd == connection->taskHandle_);
+    if (-1 == ::close(connection->taskHandle_)) {
+      GlobalOutput("TConnection::taskHandler close handle failed, resource leak");
+    }
+    connection->transition();
+  }
+
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
Added: hadoop/hive/trunk/service/include/thrift/server/TServer.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/server/TServer.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/server/TServer.h (added)
+++ hadoop/hive/trunk/service/include/thrift/server/TServer.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,193 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_SERVER_TSERVER_H_
+#define _THRIFT_SERVER_TSERVER_H_ 1
+
+#include <TProcessor.h>
+#include <transport/TServerTransport.h>
+#include <protocol/TBinaryProtocol.h>
+#include <concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace server {
+
+using facebook::thrift::TProcessor;
+using facebook::thrift::protocol::TBinaryProtocolFactory;
+using facebook::thrift::protocol::TProtocol;
+using facebook::thrift::protocol::TProtocolFactory;
+using facebook::thrift::transport::TServerTransport;
+using facebook::thrift::transport::TTransport;
+using facebook::thrift::transport::TTransportFactory;
+
+/**
+ * Virtual interface class that can handle events from the server core. To
+ * use this you should subclass it and implement the methods that you care
+ * about. Your subclass can also store local data that you may care about,
+ * such as additional "arguments" to these methods (stored in the object
+ * instance's state).
+ */
+class TServerEventHandler {
+ public:
+
+ virtual ~TServerEventHandler() {} // virtual: this class is meant to be subclassed
+
+ /**
+ * Called before the server begins.
+ * The default implementation does nothing.
+ */
+ virtual void preServe() {}
+
+ /**
+ * Called when a new client has connected and is about to begin processing.
+ * The default implementation does nothing.
+ *
+ * @param input input protocol for this client
+ * @param output output protocol for this client
+ */
+ virtual void clientBegin(boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output) {}
+
+ /**
+ * Called when a client has finished making requests.
+ * The default implementation does nothing.
+ *
+ * @param input input protocol for this client
+ * @param output output protocol for this client
+ */
+ virtual void clientEnd(boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output) {}
+
+ protected:
+
+ /**
+ * Prevent direct instantiation: the constructor is protected, so only
+ * subclasses can create instances.
+ */
+ TServerEventHandler() {}
+
+};
+
+/**
+ * Thrift server.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TServer : public concurrency::Runnable {
+ public:
+
+  virtual ~TServer() {}
+
+  /** Runs the server; each concrete server supplies the implementation. */
+  virtual void serve() = 0;
+
+  /** Asks the server to stop. The base implementation does nothing. */
+  virtual void stop() {}
+
+  /** Runnable entry point, so a server can be run as a thread: calls serve(). */
+  virtual void run() { serve(); }
+
+  // Accessors: each returns a copy of the corresponding shared_ptr member.
+  boost::shared_ptr<TProcessor> getProcessor() { return processor_; }
+
+  boost::shared_ptr<TServerTransport> getServerTransport() { return serverTransport_; }
+
+  boost::shared_ptr<TTransportFactory> getInputTransportFactory() { return inputTransportFactory_; }
+
+  boost::shared_ptr<TTransportFactory> getOutputTransportFactory() { return outputTransportFactory_; }
+
+  boost::shared_ptr<TProtocolFactory> getInputProtocolFactory() { return inputProtocolFactory_; }
+
+  boost::shared_ptr<TProtocolFactory> getOutputProtocolFactory() { return outputProtocolFactory_; }
+
+  boost::shared_ptr<TServerEventHandler> getEventHandler() { return eventHandler_; }
+
+  // Mutators: each replaces the corresponding shared_ptr member.
+  void setInputTransportFactory(boost::shared_ptr<TTransportFactory> inputTransportFactory) {
+    inputTransportFactory_ = inputTransportFactory;
+  }
+
+  void setOutputTransportFactory(boost::shared_ptr<TTransportFactory> outputTransportFactory) {
+    outputTransportFactory_ = outputTransportFactory;
+  }
+
+  void setInputProtocolFactory(boost::shared_ptr<TProtocolFactory> inputProtocolFactory) {
+    inputProtocolFactory_ = inputProtocolFactory;
+  }
+
+  void setOutputProtocolFactory(boost::shared_ptr<TProtocolFactory> outputProtocolFactory) {
+    outputProtocolFactory_ = outputProtocolFactory;
+  }
+
+  void setServerEventHandler(boost::shared_ptr<TServerEventHandler> eventHandler) {
+    eventHandler_ = eventHandler;
+  }
+
+ protected:
+  /**
+   * Processor only. The server transport is left empty; both transport
+   * factories default to TTransportFactory and both protocol factories to
+   * TBinaryProtocolFactory.
+   */
+  TServer(boost::shared_ptr<TProcessor> processor):
+    processor_(processor),
+    inputTransportFactory_(new TTransportFactory()),
+    outputTransportFactory_(new TTransportFactory()),
+    inputProtocolFactory_(new TBinaryProtocolFactory()),
+    outputProtocolFactory_(new TBinaryProtocolFactory()) {}
+
+  /**
+   * Processor and server transport, with the same default factories as the
+   * processor-only constructor.
+   */
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(new TTransportFactory()),
+    outputTransportFactory_(new TTransportFactory()),
+    inputProtocolFactory_(new TBinaryProtocolFactory()),
+    outputProtocolFactory_(new TBinaryProtocolFactory()) {}
+
+  /**
+   * One transport factory and one protocol factory, each shared by the input
+   * and the output side.
+   */
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport,
+          boost::shared_ptr<TTransportFactory> transportFactory,
+          boost::shared_ptr<TProtocolFactory> protocolFactory):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(transportFactory),
+    outputTransportFactory_(transportFactory),
+    inputProtocolFactory_(protocolFactory),
+    outputProtocolFactory_(protocolFactory) {}
+
+  /** Separate factories for the input and the output side. */
+  TServer(boost::shared_ptr<TProcessor> processor,
+          boost::shared_ptr<TServerTransport> serverTransport,
+          boost::shared_ptr<TTransportFactory> inputTransportFactory,
+          boost::shared_ptr<TTransportFactory> outputTransportFactory,
+          boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+          boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
+    processor_(processor),
+    serverTransport_(serverTransport),
+    inputTransportFactory_(inputTransportFactory),
+    outputTransportFactory_(outputTransportFactory),
+    inputProtocolFactory_(inputProtocolFactory),
+    outputProtocolFactory_(outputProtocolFactory) {}
+
+  // Class variables (declaration order is also their initialization order)
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TServerTransport> serverTransport_;
+
+  boost::shared_ptr<TTransportFactory> inputTransportFactory_;
+  boost::shared_ptr<TTransportFactory> outputTransportFactory_;
+
+  boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
+  boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
+
+  // Empty until setServerEventHandler() is called; no constructor sets it.
+  boost::shared_ptr<TServerEventHandler> eventHandler_;
+
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TSERVER_H_
Added: hadoop/hive/trunk/service/include/thrift/server/TSimpleServer.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/server/TSimpleServer.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/server/TSimpleServer.h (added)
+++ hadoop/hive/trunk/service/include/thrift/server/TSimpleServer.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,58 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
+#define _THRIFT_SERVER_TSIMPLESERVER_H_ 1
+
+#include "server/TServer.h"
+#include "transport/TServerTransport.h"
+
+namespace facebook { namespace thrift { namespace server {
+
+/**
+ * This is the most basic simple server. It is single-threaded and runs a
+ * continuous loop of accepting a single connection, processing requests on
+ * that connection until it closes, and then repeating. It is a good example
+ * of how to extend the TServer interface.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TSimpleServer : public TServer {
+ public:
+  /**
+   * Builds a server that uses one transport factory and one protocol factory
+   * for both the input and the output side. The stop flag starts out false.
+   */
+  TSimpleServer(boost::shared_ptr<TProcessor> processor,
+                boost::shared_ptr<TServerTransport> serverTransport,
+                boost::shared_ptr<TTransportFactory> transportFactory,
+                boost::shared_ptr<TProtocolFactory> protocolFactory) :
+    TServer(processor, serverTransport, transportFactory, protocolFactory),
+    stop_(false) {}
+
+  /**
+   * Builds a server with separate factories for the input and the output
+   * side. The stop flag starts out false.
+   */
+  TSimpleServer(boost::shared_ptr<TProcessor> processor,
+                boost::shared_ptr<TServerTransport> serverTransport,
+                boost::shared_ptr<TTransportFactory> inputTransportFactory,
+                boost::shared_ptr<TTransportFactory> outputTransportFactory,
+                boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+                boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
+    TServer(processor, serverTransport,
+            inputTransportFactory, outputTransportFactory,
+            inputProtocolFactory, outputProtocolFactory),
+    stop_(false) {}
+
+  ~TSimpleServer() {}
+
+  void serve();
+
+  /**
+   * Requests that the server stop by raising the stop flag.
+   *
+   * NOTE(review): unlike TThreadPoolServer::stop() and
+   * TThreadedServer::stop(), this does not call serverTransport_->interrupt();
+   * confirm how serve() notices the flag while it is waiting for a connection.
+   */
+  void stop() {
+    stop_ = true;
+  }
+
+ protected:
+  // Stop flag written by stop(). Declared volatile, like the stop_ flags of
+  // TThreadPoolServer and TThreadedServer, so that a write made while serve()
+  // is running is re-read from memory rather than kept in a register.
+  volatile bool stop_;
+
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
Added: hadoop/hive/trunk/service/include/thrift/server/TThreadPoolServer.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/server/TThreadPoolServer.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/server/TThreadPoolServer.h (added)
+++ hadoop/hive/trunk/service/include/thrift/server/TThreadPoolServer.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,66 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
+#define _THRIFT_SERVER_TTHREADPOOLSERVER_H_ 1
+
+#include <concurrency/ThreadManager.h>
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace server {
+
+using facebook::thrift::concurrency::ThreadManager;
+using facebook::thrift::protocol::TProtocolFactory;
+using facebook::thrift::transport::TServerTransport;
+using facebook::thrift::transport::TTransportFactory;
+
+class TThreadPoolServer : public TServer { // TServer variant that is constructed with a ThreadManager
+ public:
+ class Task; // forward declaration only; the definition is not in this header
+
+ TThreadPoolServer(boost::shared_ptr<TProcessor> processor, // single transport/protocol factory form
+ boost::shared_ptr<TServerTransport> serverTransport,
+ boost::shared_ptr<TTransportFactory> transportFactory,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<ThreadManager> threadManager);
+
+ TThreadPoolServer(boost::shared_ptr<TProcessor> processor, // separate input/output factory form
+ boost::shared_ptr<TServerTransport> serverTransport,
+ boost::shared_ptr<TTransportFactory> inputTransportFactory,
+ boost::shared_ptr<TTransportFactory> outputTransportFactory,
+ boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
+ boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+ boost::shared_ptr<ThreadManager> threadManager);
+
+ virtual ~TThreadPoolServer();
+
+ virtual void serve();
+
+ virtual int64_t getTimeout() const; // NOTE(review): units are not stated in this header - confirm in the implementation
+
+ virtual void setTimeout(int64_t value);
+
+ virtual void stop() { // request shutdown: raise the stop flag, then interrupt the server transport
+ stop_ = true;
+ serverTransport_->interrupt(); // dereferences serverTransport_ unconditionally, so it must be non-empty here
+ }
+
+ protected:
+
+ boost::shared_ptr<ThreadManager> threadManager_; // presumably the threadManager given to the constructor - confirm in the implementation
+
+ volatile bool stop_; // set to true by stop()
+
+ volatile int64_t timeout_; // presumably the value behind getTimeout()/setTimeout() - confirm in the implementation
+
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_
Added: hadoop/hive/trunk/service/include/thrift/server/TThreadedServer.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/server/TThreadedServer.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/server/TThreadedServer.h (added)
+++ hadoop/hive/trunk/service/include/thrift/server/TThreadedServer.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,55 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
+#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
+
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace server {
+
+using facebook::thrift::TProcessor;
+using facebook::thrift::transport::TServerTransport;
+using facebook::thrift::transport::TTransportFactory;
+using facebook::thrift::concurrency::Monitor;
+using facebook::thrift::concurrency::ThreadFactory;
+
+class TThreadedServer : public TServer { // TServer variant that keeps a ThreadFactory and a set of Task pointers
+
+ public:
+ class Task; // forward declaration only; the definition is not in this header
+
+ TThreadedServer(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TServerTransport> serverTransport,
+ boost::shared_ptr<TTransportFactory> transportFactory,
+ boost::shared_ptr<TProtocolFactory> protocolFactory);
+
+ virtual ~TThreadedServer();
+
+ virtual void serve();
+
+ void stop() { // request shutdown: raise the stop flag, then interrupt the server transport
+ stop_ = true;
+ serverTransport_->interrupt(); // dereferences serverTransport_ unconditionally, so it must be non-empty here
+ }
+
+ protected:
+ boost::shared_ptr<ThreadFactory> threadFactory_;
+ volatile bool stop_; // set to true by stop()
+
+ Monitor tasksMonitor_; // presumably guards tasks_ - confirm in the implementation
+ std::set<Task*> tasks_; // NOTE(review): <set> is not included directly by this header - confirm it arrives transitively
+
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
Added: hadoop/hive/trunk/service/include/thrift/transport/TFileTransport.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/transport/TFileTransport.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/transport/TFileTransport.h (added)
+++ hadoop/hive/trunk/service/include/thrift/transport/TFileTransport.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,429 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
+
+#include "TTransport.h"
+#include "Thrift.h"
+#include "TProcessor.h"
+
+#include <string>
+#include <stdio.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport {
+
+using facebook::thrift::TProcessor;
+using facebook::thrift::protocol::TProtocolFactory;
+
+// Data pertaining to a single event: a byte buffer, its size and a position
+// within it. The buffer is released with delete[] when the object is destroyed.
+typedef struct eventInfo {
+  uint8_t* eventBuff_;
+  uint32_t eventSize_;
+  uint32_t eventBuffPos_;
+
+  // Starts with no buffer, zero size and zero position
+  eventInfo() : eventBuff_(NULL), eventSize_(0), eventBuffPos_(0) {}
+
+  ~eventInfo() {
+    // delete[] on a null pointer is a no-op, so no explicit check is needed
+    delete[] eventBuff_;
+  }
+} eventInfo;
+
+// information about current read state
+typedef struct readState {
+  // event currently held by this state; deleted by resetAllValues() and the destructor
+  eventInfo* event_;
+
+  // keep track of event size
+  uint8_t eventSizeBuff_[4];
+  uint8_t eventSizeBuffPos_;
+  bool readingSize_;
+
+  // read buffer variables
+  int32_t bufferPtr_;
+  int32_t bufferLen_;
+
+  // last successful dispatch point
+  int32_t lastDispatchPtr_;
+
+  // Starts with no event and every counter reset
+  readState() : event_(0) {
+    resetAllValues();
+  }
+
+  // delete on a null pointer is a no-op, so no explicit check is needed
+  ~readState() {
+    delete event_;
+  }
+
+  // Go back to reading an event size and record the given dispatch point
+  void resetState(uint32_t lastDispatchPtr) {
+    lastDispatchPtr_ = lastDispatchPtr;
+    eventSizeBuffPos_ = 0;
+    readingSize_ = true;
+  }
+
+  // resetState(0), clear the read buffer bookkeeping and drop the held event
+  void resetAllValues() {
+    resetState(0);
+    bufferLen_ = 0;
+    bufferPtr_ = 0;
+    delete event_;
+    event_ = 0;
+  }
+
+} readState;
+
+/**
+ * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
+ * to be written to disk. Should be used in the following way:
+ * 1) Buffer created
+ * 2) Buffer written to (addEvent)
+ * 3) Buffer read from (getNext)
+ * 4) Buffer reset (reset)
+ * 5) Go back to 2, or destroy buffer
+ *
+ * The buffer should never be written to after it is read from, unless it is reset first.
+ * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
+ * which uses the buffer in this way.
+ *
+ * @author James Wang <jw...@facebook.com>
+ */
+class TFileTransportBuffer {
+ public:
+ TFileTransportBuffer(uint32_t size);
+ ~TFileTransportBuffer();
+
+ bool addEvent(eventInfo *event); // the "write" step of the usage protocol described above the class
+ eventInfo* getNext(); // the "read" step of the usage protocol
+ void reset(); // must be called before writing again once the buffer has been read from
+ bool isFull();
+ bool isEmpty();
+
+ private:
+ TFileTransportBuffer(); // should not be used
+
+ enum mode { // the two phases of the usage protocol
+ WRITE,
+ READ
+ };
+ mode bufferMode_; // phase the buffer is currently in
+
+ uint32_t writePoint_;
+ uint32_t readPoint_;
+ uint32_t size_;
+ eventInfo** buffer_; // array of event pointers; NOTE(review): ownership of the events is decided in the implementation - confirm there
+};
+
+/**
+ * Abstract interface for transports used to read files.
+ *
+ * Every member is pure virtual. TTransport is inherited virtually, which lets
+ * a class such as TFileTransport derive from both this interface and
+ * TFileWriterTransport while sharing a single TTransport base.
+ */
+class TFileReaderTransport : virtual public TTransport {
+ public:
+ virtual int32_t getReadTimeout() = 0;
+ virtual void setReadTimeout(int32_t readTimeout) = 0;
+
+ virtual uint32_t getNumChunks() = 0;
+ virtual uint32_t getCurChunk() = 0;
+ virtual void seekToChunk(int32_t chunk) = 0; // note: chunk is signed here although the chunk getters return uint32_t
+ virtual void seekToEnd() = 0;
+};
+
+/**
+ * Abstract interface for transports used to write files.
+ *
+ * Every member is pure virtual. TTransport is inherited virtually, which lets
+ * a class such as TFileTransport derive from both this interface and
+ * TFileReaderTransport while sharing a single TTransport base.
+ */
+class TFileWriterTransport : virtual public TTransport {
+ public:
+ virtual uint32_t getChunkSize() = 0;
+ virtual void setChunkSize(uint32_t chunkSize) = 0;
+};
+
+/**
+ * File implementation of a transport. Reads and writes are done to a
+ * file on disk.
+ *
+ * @author Aditya Agarwal <ad...@facebook.com>
+ */
+class TFileTransport : public TFileReaderTransport,
+                       public TFileWriterTransport {
+ public:
+  TFileTransport(std::string path, bool readOnly=false);
+  ~TFileTransport();
+
+  // TODO: what is the correct behaviour for this?
+  // the log file is generally always open
+  bool isOpen() { return true; }
+
+  void write(const uint8_t* buf, uint32_t len);
+  void flush();
+
+  uint32_t readAll(uint8_t* buf, uint32_t len);
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  // log-file specific functions
+  void seekToChunk(int32_t chunk);
+  void seekToEnd();
+  uint32_t getNumChunks();
+  uint32_t getCurChunk();
+
+  // for changing the output file
+  void resetOutputFile(int fd, std::string filename, int64_t offset);
+
+  // Setter/Getter functions for user-controllable options.
+  // Setters marked "0 is ignored" keep the current value when given 0.
+
+  // read buffer size (0 is ignored)
+  void setReadBuffSize(uint32_t readBuffSize) {
+    if (readBuffSize == 0) {
+      return;
+    }
+    readBuffSize_ = readBuffSize;
+  }
+  uint32_t getReadBuffSize() { return readBuffSize_; }
+
+  // read timeout; the two named constants are special values for it
+  static const int32_t TAIL_READ_TIMEOUT = -1;
+  static const int32_t NO_TAIL_READ_TIMEOUT = 0;
+  void setReadTimeout(int32_t readTimeout) { readTimeout_ = readTimeout; }
+  int32_t getReadTimeout() { return readTimeout_; }
+
+  // chunk size (0 is ignored)
+  void setChunkSize(uint32_t chunkSize) {
+    if (chunkSize == 0) {
+      return;
+    }
+    chunkSize_ = chunkSize;
+  }
+  uint32_t getChunkSize() { return chunkSize_; }
+
+  // event buffer size; may only be changed before the buffers and the writer
+  // thread have been initialized, otherwise the call is reported and ignored
+  void setEventBufferSize(uint32_t bufferSize) {
+    if (!bufferAndThreadInitialized_) {
+      eventBufferSize_ = bufferSize;
+      return;
+    }
+    GlobalOutput("Cannot change the buffer size after writer thread started");
+  }
+  uint32_t getEventBufferSize() { return eventBufferSize_; }
+
+  // max microseconds between flushes (0 is ignored)
+  void setFlushMaxUs(uint32_t flushMaxUs) {
+    if (flushMaxUs == 0) {
+      return;
+    }
+    flushMaxUs_ = flushMaxUs;
+  }
+  uint32_t getFlushMaxUs() { return flushMaxUs_; }
+
+  // max bytes written between flushes (0 is ignored)
+  void setFlushMaxBytes(uint32_t flushMaxBytes) {
+    if (flushMaxBytes == 0) {
+      return;
+    }
+    flushMaxBytes_ = flushMaxBytes;
+  }
+  uint32_t getFlushMaxBytes() { return flushMaxBytes_; }
+
+  // max event size (0 is accepted and stored)
+  void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; }
+  uint32_t getMaxEventSize() { return maxEventSize_; }
+
+  // max corrupted events per chunk (0 is accepted and stored)
+  void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
+    maxCorruptedEvents_ = maxCorruptedEvents;
+  }
+  uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; }
+
+  // sleep duration at EOF, in microseconds (0 is ignored)
+  void setEofSleepTimeUs(uint32_t eofSleepTime) {
+    if (eofSleepTime == 0) {
+      return;
+    }
+    eofSleepTime_ = eofSleepTime;
+  }
+  uint32_t getEofSleepTimeUs() { return eofSleepTime_; }
+
+ private:
+  // helper functions for writing to a file
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+  bool swapEventBuffers(struct timespec* deadline);
+  bool initBufferAndWriteThread();
+
+  // control for writer thread: thread entry point that forwards to
+  // writerThread() on the TFileTransport passed in as ptr
+  static void* startWriterThread(void* ptr) {
+    TFileTransport* self = static_cast<TFileTransport*>(ptr);
+    self->writerThread();
+    return 0;
+  }
+  void writerThread();
+
+  // helper functions for reading from a file
+  eventInfo* readEvent();
+
+  // event corruption-related functions
+  bool isEventCorrupted();
+  void performRecovery();
+
+  // Utility functions
+  void openLogFile();
+  void getNextFlushTime(struct timespec* ts_next_flush);
+
+  // Class variables
+  readState readState_;
+  uint8_t* readBuff_;
+  eventInfo* currentEvent_;
+
+  uint32_t readBuffSize_;
+  static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
+
+  int32_t readTimeout_;
+  static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
+
+  // size of chunks that file will be split up into
+  uint32_t chunkSize_;
+  static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+
+  // size of event buffers
+  uint32_t eventBufferSize_;
+  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
+
+  // max number of microseconds that can pass without flushing
+  uint32_t flushMaxUs_;
+  static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
+
+  // max number of bytes that can be written without flushing
+  uint32_t flushMaxBytes_;
+  static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
+
+  // max event size
+  uint32_t maxEventSize_;
+  static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
+
+  // max number of corrupted events per chunk
+  uint32_t maxCorruptedEvents_;
+  static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
+
+  // sleep duration when EOF is hit
+  uint32_t eofSleepTime_;
+  static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+
+  // sleep duration when a corrupted event is encountered
+  uint32_t corruptedEventSleepTime_;
+  static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
+
+  // writer thread id
+  pthread_t writerThreadId_;
+
+  // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
+  // needs to be written to the file. The buffers are swapped by the writer thread.
+  TFileTransportBuffer *dequeueBuffer_;
+  TFileTransportBuffer *enqueueBuffer_;
+
+  // conditions used to block when the buffer is full or empty
+  pthread_cond_t notFull_;
+  pthread_cond_t notEmpty_;
+  volatile bool closing_;
+
+  // To keep track of whether the buffer has been flushed
+  pthread_cond_t flushed_;
+  volatile bool forceFlush_;
+
+  // Mutex that is grabbed when enqueueing and swapping the read/write buffers
+  pthread_mutex_t mutex_;
+
+  // File information
+  std::string filename_;
+  int fd_;
+
+  // Whether the writer thread and buffers have been initialized
+  bool bufferAndThreadInitialized_;
+
+  // Offset within the file
+  off_t offset_;
+
+  // event corruption information
+  uint32_t lastBadChunk_;
+  uint32_t numCorruptedEventsInChunk_;
+
+  bool readOnly_;
+};
+
+// Exception thrown when EOF is hit: a TTransportException constructed with
+// the END_OF_FILE type
+class TEOFException : public TTransportException {
+ public:
+  TEOFException()
+    : TTransportException(TTransportException::END_OF_FILE) {}
+};
+
+
+// wrapper class to process events from a file containing thrift events
+class TFileProcessor {
+ public:
+ /**
+ * Constructor that defaults output transport to null transport
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport file transport
+ */
+ TFileProcessor(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<TFileReaderTransport> inputTransport);
+
+ TFileProcessor(boost::shared_ptr<TProcessor> processor, // variant taking separate protocol factories
+ boost::shared_ptr<TProtocolFactory> inputProtocolFactory, // protocol factory for the input side
+ boost::shared_ptr<TProtocolFactory> outputProtocolFactory, // protocol factory for the output side
+ boost::shared_ptr<TFileReaderTransport> inputTransport); // NOTE(review): no output transport parameter - confirm what the implementation uses
+
+ /**
+ * Constructor
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport input file transport
+ * @param outputTransport output transport
+ */
+ TFileProcessor(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocolFactory> protocolFactory,
+ boost::shared_ptr<TFileReaderTransport> inputTransport,
+ boost::shared_ptr<TTransport> outputTransport);
+
+ /**
+ * processes events from the file
+ *
+ * @param numEvents number of events to process (0 for unlimited)
+ * @param tail tails the file if true
+ */
+ void process(uint32_t numEvents, bool tail);
+
+ /**
+ * process events until the end of the chunk
+ *
+ */
+ void processChunk();
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
+ boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
+ boost::shared_ptr<TFileReaderTransport> inputTransport_; // typed as the reader interface, not the concrete TFileTransport
+ boost::shared_ptr<TTransport> outputTransport_;
+};
+
+
+}}} // facebook::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
Added: hadoop/hive/trunk/service/include/thrift/transport/THttpClient.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/transport/THttpClient.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/transport/THttpClient.h (added)
+++ hadoop/hive/trunk/service/include/thrift/transport/THttpClient.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,99 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
+#define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1
+
+#include <transport/TTransportUtils.h>
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * HTTP client implementation of the thrift transport. This was irritating
+ * to write, but the alternatives in C++ land are daunting. Linking CURL
+ * requires 23 dynamic libraries last time I checked (WTF?!?). All we have
+ * here is a VERY basic HTTP/1.1 client which supports HTTP 100 Continue,
+ * chunked transfer encoding, keepalive, etc. Tested against Apache.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class THttpClient : public TTransport {
+ public:
+  THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path="");
+
+  THttpClient(std::string host, int port, std::string path="");
+
+  virtual ~THttpClient();
+
+  // Connection management is forwarded unchanged to the wrapped transport.
+  void open() { transport_->open(); }
+
+  bool isOpen() { return transport_->isOpen(); }
+
+  bool peek() { return transport_->peek(); }
+
+  void close() { transport_->close(); }
+
+  // Data transfer, implemented out of line.
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void readEnd();
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+ private:
+  void init();
+
+ protected:
+
+  // Underlying transport that open/isOpen/peek/close are forwarded to
+  boost::shared_ptr<TTransport> transport_;
+
+  TMemoryBuffer writeBuffer_;
+  TMemoryBuffer readBuffer_;
+
+  std::string host_;
+  std::string path_;
+
+  // Response parsing state
+  bool readHeaders_;
+  bool chunked_;
+  bool chunkedDone_;
+  uint32_t chunkSize_;
+  uint32_t contentLength_;
+
+  // Raw HTTP buffer and its bookkeeping
+  char* httpBuf_;
+  uint32_t httpPos_;
+  uint32_t httpBufLen_;
+  uint32_t httpBufSize_;
+
+  uint32_t readMoreData();
+  char* readLine();
+
+  void readHeaders();
+  void parseHeader(char* header);
+  bool parseStatusLine(char* status);
+
+  uint32_t readChunked();
+  void readChunkedFooters();
+  uint32_t parseChunkSize(char* line);
+
+  uint32_t readContent(uint32_t size);
+
+  void refill();
+  void shift();
+
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
Added: hadoop/hive/trunk/service/include/thrift/transport/TServerSocket.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/transport/TServerSocket.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/transport/TServerSocket.h (added)
+++ hadoop/hive/trunk/service/include/thrift/transport/TServerSocket.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,59 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
+#define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1
+
+#include "TServerTransport.h"
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport {
+
+class TSocket;
+
+/**
+ * Server socket implementation of TServerTransport. Wrapper around a unix
+ * socket listen and accept calls.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TServerSocket : public TServerTransport {
+ public:
+ TServerSocket(int port);
+ TServerSocket(int port, int sendTimeout, int recvTimeout);
+
+ ~TServerSocket();
+
+ void setSendTimeout(int sendTimeout); // NOTE(review): timeout units are not stated in this header - confirm in the implementation
+ void setRecvTimeout(int recvTimeout);
+
+ void setRetryLimit(int retryLimit);
+ void setRetryDelay(int retryDelay);
+
+ void listen(); // overrides TServerTransport::listen()
+ void close(); // implements the pure virtual TServerTransport::close()
+
+ void interrupt(); // overrides TServerTransport::interrupt()
+
+ protected:
+ boost::shared_ptr<TTransport> acceptImpl(); // implements the pure virtual TServerTransport::acceptImpl()
+
+ private:
+ int port_;
+ int serverSocket_;
+ int acceptBacklog_;
+ int sendTimeout_;
+ int recvTimeout_;
+ int retryLimit_;
+ int retryDelay_;
+
+ int intSock1_; // presumably one end of a socket pair used by interrupt() - confirm in the implementation
+ int intSock2_; // presumably the other end of that pair - confirm in the implementation
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
Added: hadoop/hive/trunk/service/include/thrift/transport/TServerTransport.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/transport/TServerTransport.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/transport/TServerTransport.h (added)
+++ hadoop/hive/trunk/service/include/thrift/transport/TServerTransport.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,80 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ 1
+
+#include "TTransport.h"
+#include "TTransportException.h"
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * Server transport framework. A server needs to have some facility for
+ * creating base transports to read/write from.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TServerTransport {
+ public:
+  virtual ~TServerTransport() {}
+
+  /**
+   * Starts the server transport listening for new connections. Prior to this
+   * call most transports will not return anything when accept is called.
+   * The base implementation does nothing.
+   *
+   * @throws TTransportException if we were unable to listen
+   */
+  virtual void listen() {}
+
+  /**
+   * Gets a new transport object from acceptImpl() and passes it to the
+   * caller. The object is handed over in a shared_ptr, so the caller does not
+   * free it explicitly. The returned TTransport object must always be in the
+   * opened state. An empty pointer is never returned; if acceptImpl() yields
+   * one, an exception is thrown instead.
+   *
+   * @return A new TTransport object
+   * @throws TTransportException if there is an error, or if acceptImpl()
+   *         returned an empty pointer
+   */
+  boost::shared_ptr<TTransport> accept() {
+    boost::shared_ptr<TTransport> result = acceptImpl();
+    // Test the pointer through operator! rather than `== NULL`: comparing a
+    // shared_ptr with NULL depends on an implicit conversion that is not
+    // available in every Boost/compiler combination.
+    if (!result) {
+      throw TTransportException("accept() may not return NULL");
+    }
+    return result;
+  }
+
+  /**
+   * For "smart" TServerTransport implementations that work in a multi
+   * threaded context this can be used to break out of an accept() call.
+   * It is expected that the transport will throw a TTransportException
+   * with the interrupted error code.
+   * The base implementation does nothing.
+   */
+  virtual void interrupt() {}
+
+  /**
+   * Closes this transport such that future calls to accept will do nothing.
+   */
+  virtual void close() = 0;
+
+ protected:
+  TServerTransport() {}
+
+  /**
+   * Subclasses should implement this function for accept.
+   *
+   * @return A newly allocated TTransport object; must not be empty, because
+   *         accept() turns an empty result into an exception
+   * @throw TTransportException If an error occurs
+   */
+  virtual boost::shared_ptr<TTransport> acceptImpl() = 0;
+
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_
Added: hadoop/hive/trunk/service/include/thrift/transport/TSocket.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/transport/TSocket.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/transport/TSocket.h (added)
+++ hadoop/hive/trunk/service/include/thrift/transport/TSocket.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,232 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_TSOCKET_H_
+#define _THRIFT_TRANSPORT_TSOCKET_H_ 1
+
+#include <string>
+#include <sys/time.h>
+
+#include "TTransport.h"
+#include "TServerSocket.h"
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * TCP Socket implementation of the TTransport interface.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ * @author Aditya Agarwal <ad...@facebook.com>
+ */
+class TSocket : public TTransport {
+ /**
+ * We allow the TServerSocket acceptImpl() method to access the non-public
+ * members of a socket so that it can call the protected TSocket(int socket)
+ * constructor, which creates a socket object from the raw UNIX socket
+ * handle.
+ */
+ friend class TServerSocket;
+
+ public:
+ /**
+ * Constructs a new socket. Note that this does NOT actually connect the
+ * socket.
+ *
+ */
+ TSocket();
+
+ /**
+ * Constructs a new socket. Note that this does NOT actually connect the
+ * socket.
+ *
+ * @param host An IP address or hostname to connect to
+ * @param port The port to connect on
+ */
+ TSocket(std::string host, int port);
+
+ /**
+ * Destroys the socket object, closing it if necessary.
+ */
+ virtual ~TSocket();
+
+ /**
+ * Whether the socket is alive.
+ *
+ * @return Is the socket alive?
+ */
+ bool isOpen();
+
+ /**
+ * Calls select on the socket to see if there is more data available.
+ */
+ bool peek();
+
+ /**
+ * Creates and opens the UNIX socket.
+ *
+ * @throws TTransportException If the socket could not connect
+ */
+ virtual void open();
+
+ /**
+ * Shuts down communications on the socket.
+ */
+ void close();
+
+ /**
+ * Reads from the underlying socket.
+ */
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ /**
+ * Writes to the underlying socket.
+ */
+ void write(const uint8_t* buf, uint32_t len);
+
+ /**
+ * Get the host that the socket is connected to
+ *
+ * @return string host identifier
+ */
+ std::string getHost();
+
+ /**
+ * Get the port that the socket is connected to
+ *
+ * @return int port number
+ */
+ int getPort();
+
+ /**
+ * Set the host that socket will connect to
+ *
+ * @param host host identifier
+ */
+ void setHost(std::string host);
+
+ /**
+ * Set the port that socket will connect to
+ *
+ * @param port port number
+ */
+ void setPort(int port);
+
+ /**
+ * Controls whether the linger option is set on the socket.
+ *
+ * @param on Whether SO_LINGER is on
+ * @param linger If linger is active, the number of seconds to linger for
+ */
+ void setLinger(bool on, int linger);
+
+ /**
+ * Whether to enable/disable Nagle's algorithm.
+ *
+ * @param noDelay Whether or not to disable the algorithm, i.e. true turns
+ *                Nagle's algorithm off for this socket
+ */
+ void setNoDelay(bool noDelay);
+
+ /**
+ * Set the connect timeout, in milliseconds
+ */
+ void setConnTimeout(int ms);
+
+ /**
+ * Set the receive timeout, in milliseconds
+ */
+ void setRecvTimeout(int ms);
+
+ /**
+ * Set the send timeout, in milliseconds
+ */
+ void setSendTimeout(int ms);
+
+ /**
+ * Set the max number of recv retries in case of an EAGAIN
+ * error
+ */
+ void setMaxRecvRetries(int maxRecvRetries);
+
+ /**
+ * Get socket information formatted as a string <Host: x Port: x>
+ */
+ std::string getSocketInfo();
+
+ /**
+ * Returns the DNS name of the host to which the socket is connected
+ */
+ std::string getPeerHost();
+
+ /**
+ * Returns the address of the host to which the socket is connected
+ */
+ std::string getPeerAddress();
+
+ /**
+ * Returns the port of the host to which the socket is connected
+ */
+ int getPeerPort();
+
+
+ protected:
+ /**
+ * Constructor to create socket from raw UNIX handle. Never called directly
+ * but used by the TServerSocket class.
+ */
+ TSocket(int socket);
+
+ /** connect, called by open */
+ void openConnection(struct addrinfo *res);
+
+ /** Host to connect to */
+ std::string host_;
+
+ /** Peer hostname */
+ std::string peerHost_;
+
+ /** Peer address */
+ std::string peerAddress_;
+
+ /** Peer port */
+ int peerPort_;
+
+ /** Port number to connect on */
+ int port_;
+
+ /** Underlying UNIX socket handle */
+ int socket_;
+
+ /** Connect timeout in ms */
+ int connTimeout_;
+
+ /** Send timeout in ms */
+ int sendTimeout_;
+
+ /** Recv timeout in ms */
+ int recvTimeout_;
+
+ /** Linger on */
+ bool lingerOn_;
+
+ /** Linger val */
+ int lingerVal_;
+
+ /** Nodelay */
+ bool noDelay_;
+
+ /** Max recv retries on EAGAIN */
+ int maxRecvRetries_;
+
+ /** Recv timeout timeval */
+ struct timeval recvTimeval_;
+
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSOCKET_H_
+
Added: hadoop/hive/trunk/service/include/thrift/transport/TSocketPool.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/transport/TSocketPool.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/transport/TSocketPool.h (added)
+++ hadoop/hive/trunk/service/include/thrift/transport/TSocketPool.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,113 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_TSOCKETPOOL_H_
+#define _THRIFT_TRANSPORT_TSOCKETPOOL_H_ 1
+
+#include <vector>
+#include "TSocket.h"
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * TSocket that is configured with a pool of servers (host/port pairs).
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TSocketPool : public TSocket {
+
+ public:
+ /**
+ * Socket pool constructor
+ *
+ * @param hosts list of host names
+ * @param ports list of port numbers
+ */
+ TSocketPool(const std::vector<std::string> &hosts,
+ const std::vector<int> &ports);
+
+ /**
+ * Socket pool constructor
+ *
+ * @param servers list of pairs of host name and port
+ */
+ TSocketPool(const std::vector<std::pair<std::string, int> > servers);
+
+ /**
+ * Socket pool constructor
+ *
+ * @param host single host
+ * @param port single port
+ */
+ TSocketPool(const std::string& host, int port);
+
+ /**
+ * Destroys the socket object, closing it if necessary.
+ */
+ virtual ~TSocketPool();
+
+ /**
+ * Add a server to the pool
+ */
+ void addServer(const std::string& host, int port);
+
+ /**
+ * Sets how many times to keep retrying a host in the connect function.
+ */
+ void setNumRetries(int numRetries);
+
+ /**
+ * Sets how long (in seconds) to wait before retrying a host marked down
+ */
+ void setRetryInterval(int retryInterval);
+
+ /**
+ * Sets how many times to keep retrying a host before marking it as down.
+ */
+ void setMaxConsecutiveFailures(int maxConsecutiveFailures);
+
+ /**
+ * Turns randomization in connect order on or off.
+ */
+ void setRandomize(bool randomize);
+
+ /**
+ * Whether to always try the last server.
+ */
+ void setAlwaysTryLast(bool alwaysTryLast);
+
+ /**
+ * Creates and opens the UNIX socket.
+ */
+ void open();
+
+ protected:
+
+ /** List of servers to connect to */
+ std::vector<std::pair<std::string, int> > servers_;
+
+ /** How many times to retry each host in connect */
+ int numRetries_;
+
+ /** Retry interval in seconds, how long to not try a host if it has been
+ * marked as down.
+ */
+ int retryInterval_;
+
+ /** Max consecutive failures before marking a host down. */
+ int maxConsecutiveFailures_;
+
+ /** Try hosts in order? or Randomized? */
+ bool randomize_;
+
+ /** Always try last host, even if marked down? */
+ bool alwaysTryLast_;
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSOCKETPOOL_H_
+
Added: hadoop/hive/trunk/service/include/thrift/transport/TTransport.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/transport/TTransport.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/transport/TTransport.h (added)
+++ hadoop/hive/trunk/service/include/thrift/transport/TTransport.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,203 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
+
+#include <Thrift.h>
+#include <boost/shared_ptr.hpp>
+#include <transport/TTransportException.h>
+#include <string>
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * Generic interface for a method of transporting data. A TTransport may be
+ * capable of either reading or writing, but not necessarily both.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TTransport {
+ public:
+ /**
+ * Virtual destructor.
+ */
+ virtual ~TTransport() {}
+
+ /**
+ * Whether this transport is open. The base implementation returns false.
+ */
+ virtual bool isOpen() {
+ return false;
+ }
+
+ /**
+ * Tests whether there is more data to read or if the remote side is
+ * still open. By default this is true whenever the transport is open,
+ * but implementations should add logic to test for this condition where
+ * possible (e.g. on a socket).
+ * This is used by a server to check if it should listen for another
+ * request.
+ */
+ virtual bool peek() {
+ return isOpen();
+ }
+
+ /**
+ * Opens the transport for communications.
+ *
+ * The base implementation always throws; subclasses must override it.
+ * @throws TTransportException if opening failed
+ */
+ virtual void open() {
+ throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport.");
+ }
+
+ /**
+ * Closes the transport. The base implementation always throws.
+ */
+ virtual void close() {
+ throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
+ }
+
+ /**
+ * Attempt to read up to the specified number of bytes into the buffer.
+ *
+ * @param buf Pointer to the location to write the data
+ * @param len How many bytes to read
+ * @return How many bytes were actually read
+ * @throws TTransportException If an error occurs
+ */
+ virtual uint32_t read(uint8_t* buf, uint32_t len) {
+ throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot read.");
+ }
+
+ /**
+ * Reads exactly len bytes into buf, calling read() as often as needed.
+ *
+ * @param buf Location to store the data read; must hold at least len bytes
+ * @param len How many bytes to read
+ * @return How many bytes were read, which is always equal to len
+ * @throws TTransportException END_OF_FILE if read() returns 0 too early
+ */
+ virtual uint32_t readAll(uint8_t* buf, uint32_t len) {
+ uint32_t have = 0;
+ uint32_t get = 0;
+
+ while (have < len) {
+ get = read(buf+have, len-have);
+ if (get == 0) { // unsigned result: 0 is the only "no more data" value
+ throw TTransportException(TTransportException::END_OF_FILE, "No more data to read.");
+ }
+ have += get;
+ }
+
+ return have;
+ }
+
+ /**
+ * Called when read is completed.
+ * This can be over-ridden to perform a transport-specific action
+ * e.g. logging the request to a file
+ *
+ */
+ virtual void readEnd() {
+ // default behaviour is to do nothing
+ return;
+ }
+
+ /**
+ * Writes len bytes from buf to the transport in their entirety.
+ *
+ * @param buf The data to write out
+ * @throws TTransportException if an error occurs
+ */
+ virtual void write(const uint8_t* buf, uint32_t len) {
+ throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot write.");
+ }
+
+ /**
+ * Called when write is completed.
+ * This can be over-ridden to perform a transport-specific action
+ * at the end of a request.
+ *
+ */
+ virtual void writeEnd() {
+ // default behaviour is to do nothing
+ return;
+ }
+
+ /**
+ * Flushes any pending data to be written. Typically used with buffered
+ * transport mechanisms.
+ *
+ * @throws TTransportException if an error occurs
+ */
+ virtual void flush() {}
+
+ /**
+ * Attempts to copy len bytes from the transport into buf. Does not consume
+ * the bytes read (i.e.: a later read will return the same data). This
+ * method is meant to support protocols that need to read variable-length
+ * fields. They can attempt to borrow the maximum amount of data that they
+ * will need, then consume (see next method) what they actually use. Some
+ * transports will not support this method and others will fail occasionally,
+ * so protocols must be prepared to use read if borrow fails.
+ *
+ * @param buf The buffer to store the data
+ * @param len How much data to borrow
+ * @return true if the requested data has been borrowed, false otherwise
+ * @throws TTransportException if an error occurs
+ */
+ virtual bool borrow(uint8_t* buf, uint32_t len) {
+ return false;
+ }
+
+ /**
+ * Remove len bytes from the transport. This should always follow a borrow
+ * of at least len bytes, and should always succeed.
+ * TODO(dreiss): Is there any transport that could borrow but fail to
+ * consume, or that would require a buffer to dump the consumed data?
+ *
+ * @param len How many bytes to consume
+ * @throws TTransportException If an error occurs
+ */
+ virtual void consume(uint32_t len) {
+ throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot consume.");
+ }
+
+ protected:
+ /**
+ * Simple constructor.
+ */
+ TTransport() {}
+};
+
+/**
+ * Generic factory class to make an input and output transport out of a
+ * source transport. Commonly used inside servers to make input and output
+ * streams out of raw clients.
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TTransportFactory {
+ public:
+ TTransportFactory() {}
+
+ virtual ~TTransportFactory() {}
+
+ /**
+ * Default implementation does no wrapping and simply returns trans as given.
+ */
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return trans;
+ }
+
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
Added: hadoop/hive/trunk/service/include/thrift/transport/TTransportException.h
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/include/thrift/transport/TTransportException.h?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/include/thrift/transport/TTransportException.h (added)
+++ hadoop/hive/trunk/service/include/thrift/transport/TTransportException.h Sun Dec 7 20:25:22 2008
@@ -0,0 +1,106 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_
+#define _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_ 1
+
+#include <boost/lexical_cast.hpp>
+#include <string>
+#include <Thrift.h>
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * Class to encapsulate all the possible types of transport errors that may
+ * occur in various transport systems. This provides a generic wrapper
+ * around the raw UNIX E* errno codes so that a common error-handling code
+ * base can be used for various types of transports, e.g. pipes.
+ * The error category is reported by getType().
+ *
+ * @author Mark Slee <mc...@facebook.com>
+ */
+class TTransportException : public facebook::thrift::TException {
+ public:
+ /**
+ * Error codes for the various types of exceptions.
+ */
+ enum TTransportExceptionType {
+ UNKNOWN = 0,
+ NOT_OPEN = 1,
+ ALREADY_OPEN = 2,
+ TIMED_OUT = 3,
+ END_OF_FILE = 4,
+ INTERRUPTED = 5,
+ BAD_ARGS = 6,
+ CORRUPTED_DATA = 7,
+ INTERNAL_ERROR = 8,
+ };
+
+ TTransportException() :
+ facebook::thrift::TException(),
+ type_(UNKNOWN) {}
+
+ TTransportException(TTransportExceptionType type) :
+ facebook::thrift::TException(),
+ type_(type) {}
+
+ TTransportException(const std::string& message) :
+ facebook::thrift::TException(message),
+ type_(UNKNOWN) {}
+
+ TTransportException(TTransportExceptionType type, const std::string& message) :
+ facebook::thrift::TException(message),
+ type_(type) {}
+
+ TTransportException(TTransportExceptionType type,
+ const std::string& message,
+ int errno_copy) :
+ facebook::thrift::TException(message + ": " + strerror_s(errno_copy)),
+ type_(type) {}
+
+ virtual ~TTransportException() throw() {}
+
+ /**
+ * Returns an error code that provides information about the type of error
+ * that has occurred.
+ *
+ * @return Error code
+ */
+ TTransportExceptionType getType() const throw() {
+ return type_;
+ }
+
+ virtual const char* what() const throw() {
+ if (message_.empty()) {
+ switch (type_) {
+ case UNKNOWN : return "TTransportException: Unknown transport exception";
+ case NOT_OPEN : return "TTransportException: Transport not open";
+ case ALREADY_OPEN : return "TTransportException: Transport already open";
+ case TIMED_OUT : return "TTransportException: Timed out";
+ case END_OF_FILE : return "TTransportException: End of file";
+ case INTERRUPTED : return "TTransportException: Interrupted";
+ case BAD_ARGS : return "TTransportException: Invalid arguments";
+ case CORRUPTED_DATA : return "TTransportException: Corrupted Data";
+ case INTERNAL_ERROR : return "TTransportException: Internal error";
+ default : return "TTransportException: (Invalid exception type)";
+ }
+ } else {
+ return message_.c_str();
+ }
+ }
+
+ protected:
+ /** Just like strerror_r but returns a C++ string object. */
+ std::string strerror_s(int errno_copy);
+
+ /** Error code */
+ TTransportExceptionType type_;
+
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_