You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2008/12/08 05:25:51 UTC
svn commit: r724247 [6/6] - in /hadoop/hive/trunk/service: ./ if/ include/
include/thrift/ include/thrift/concurrency/ include/thrift/fb303/
include/thrift/fb303/if/ include/thrift/if/ include/thrift/processor/
include/thrift/protocol/ include/thrift/s...
Added: hadoop/hive/trunk/service/src/gen-py/hive_service/ThriftHive.py
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/gen-py/hive_service/ThriftHive.py?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/src/gen-py/hive_service/ThriftHive.py (added)
+++ hadoop/hive/trunk/service/src/gen-py/hive_service/ThriftHive.py Sun Dec 7 20:25:22 2008
@@ -0,0 +1,860 @@
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+
+from thrift.Thrift import *
+import hive_metastore.ThriftHiveMetastore
+from ttypes import *
+from thrift.Thrift import TProcessor
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+try:
+ from thrift.protocol import fastbinary
+except:
+ fastbinary = None
+
+
+class Iface(hive_metastore.ThriftHiveMetastore.Iface):  # Hive service interface: the metastore Iface plus five query methods, all no-op stubs here
+ def execute(self, query):  # stub; the generated Client sends `query` as a Thrift STRING (see execute_args)
+ pass
+
+ def fetchOne(self, ):  # stub; the result travels as a single Thrift STRING (see fetchOne_result)
+ pass
+
+ def fetchN(self, numRows):  # stub; numRows is an I32 on the wire, the result a LIST of STRING (see fetchN_args / fetchN_result)
+ pass
+
+ def fetchAll(self, ):  # stub; the result travels as a LIST of STRING (see fetchAll_result)
+ pass
+
+ def getSchema(self, ):  # stub; the result travels as a single Thrift STRING (see getSchema_result)
+ pass
+
+
+class Client(hive_metastore.ThriftHiveMetastore.Client, Iface):  # RPC client: every public method calls send_X() and then recv_X()
+ def __init__(self, iprot, oprot=None):  # both protocols are handed unchanged to the metastore Client constructor
+ hive_metastore.ThriftHiveMetastore.Client.__init__(self, iprot, oprot)  # NOTE(review): self._iprot / self._oprot / self._seqid are never assigned in this class; presumably set here by the parent - confirm
+
+ def execute(self, query):  # send `query` and wait for the reply; returns None, raises whatever recv_execute raises
+ self.send_execute(query)
+ self.recv_execute()
+
+ def send_execute(self, query):  # write a CALL message named 'execute' carrying execute_args, then flush the transport
+ self._oprot.writeMessageBegin('execute', TMessageType.CALL, self._seqid)
+ args = execute_args()
+ args.query = query
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_execute(self, ):  # read the reply; returns None, raises TApplicationException or the HiveServerException carried in the result
+ (fname, mtype, rseqid) = self._iprot.readMessageBegin()  # fname and rseqid are read but not checked
+ if mtype == TMessageType.EXCEPTION:  # the reply is an EXCEPTION message: decode it as TApplicationException and raise it
+ x = TApplicationException()
+ x.read(self._iprot)
+ self._iprot.readMessageEnd()
+ raise x
+ result = execute_result()
+ result.read(self._iprot)
+ self._iprot.readMessageEnd()
+ if result.ex != None:  # the declared exception field was set in the result struct
+ raise result.ex
+ return
+
+ def fetchOne(self, ):  # returns whatever recv_fetchOne returns (the 'success' STRING of the reply)
+ self.send_fetchOne()
+ return self.recv_fetchOne()
+
+ def send_fetchOne(self, ):  # write a CALL message named 'fetchOne' with an empty argument struct, then flush
+ self._oprot.writeMessageBegin('fetchOne', TMessageType.CALL, self._seqid)
+ args = fetchOne_args()
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_fetchOne(self, ):  # read the reply; returns result.success or raises
+ (fname, mtype, rseqid) = self._iprot.readMessageBegin()  # fname and rseqid are read but not checked
+ if mtype == TMessageType.EXCEPTION:  # the reply is an EXCEPTION message: decode it as TApplicationException and raise it
+ x = TApplicationException()
+ x.read(self._iprot)
+ self._iprot.readMessageEnd()
+ raise x
+ result = fetchOne_result()
+ result.read(self._iprot)
+ self._iprot.readMessageEnd()
+ if result.success != None:  # compared against None, so an empty string still counts as a result
+ return result.success
+ if result.ex != None:  # the declared exception field was set in the result struct
+ raise result.ex
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchOne failed: unknown result");  # neither success nor ex was present
+
+ def fetchN(self, numRows):  # returns whatever recv_fetchN returns (the 'success' list of the reply)
+ self.send_fetchN(numRows)
+ return self.recv_fetchN()
+
+ def send_fetchN(self, numRows):  # write a CALL message named 'fetchN' carrying fetchN_args, then flush
+ self._oprot.writeMessageBegin('fetchN', TMessageType.CALL, self._seqid)
+ args = fetchN_args()
+ args.numRows = numRows
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_fetchN(self, ):  # read the reply; returns result.success or raises
+ (fname, mtype, rseqid) = self._iprot.readMessageBegin()  # fname and rseqid are read but not checked
+ if mtype == TMessageType.EXCEPTION:  # the reply is an EXCEPTION message: decode it as TApplicationException and raise it
+ x = TApplicationException()
+ x.read(self._iprot)
+ self._iprot.readMessageEnd()
+ raise x
+ result = fetchN_result()
+ result.read(self._iprot)
+ self._iprot.readMessageEnd()
+ if result.success != None:  # compared against None, so an empty list still counts as a result
+ return result.success
+ if result.ex != None:  # the declared exception field was set in the result struct
+ raise result.ex
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchN failed: unknown result");  # neither success nor ex was present
+
+ def fetchAll(self, ):  # returns whatever recv_fetchAll returns (the 'success' list of the reply)
+ self.send_fetchAll()
+ return self.recv_fetchAll()
+
+ def send_fetchAll(self, ):  # write a CALL message named 'fetchAll' with an empty argument struct, then flush
+ self._oprot.writeMessageBegin('fetchAll', TMessageType.CALL, self._seqid)
+ args = fetchAll_args()
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_fetchAll(self, ):  # read the reply; returns result.success or raises
+ (fname, mtype, rseqid) = self._iprot.readMessageBegin()  # fname and rseqid are read but not checked
+ if mtype == TMessageType.EXCEPTION:  # the reply is an EXCEPTION message: decode it as TApplicationException and raise it
+ x = TApplicationException()
+ x.read(self._iprot)
+ self._iprot.readMessageEnd()
+ raise x
+ result = fetchAll_result()
+ result.read(self._iprot)
+ self._iprot.readMessageEnd()
+ if result.success != None:  # compared against None, so an empty list still counts as a result
+ return result.success
+ if result.ex != None:  # the declared exception field was set in the result struct
+ raise result.ex
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "fetchAll failed: unknown result");  # neither success nor ex was present
+
+ def getSchema(self, ):  # returns whatever recv_getSchema returns (the 'success' STRING of the reply)
+ self.send_getSchema()
+ return self.recv_getSchema()
+
+ def send_getSchema(self, ):  # write a CALL message named 'getSchema' with an empty argument struct, then flush
+ self._oprot.writeMessageBegin('getSchema', TMessageType.CALL, self._seqid)
+ args = getSchema_args()
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_getSchema(self, ):  # read the reply; returns result.success or raises
+ (fname, mtype, rseqid) = self._iprot.readMessageBegin()  # fname and rseqid are read but not checked
+ if mtype == TMessageType.EXCEPTION:  # the reply is an EXCEPTION message: decode it as TApplicationException and raise it
+ x = TApplicationException()
+ x.read(self._iprot)
+ self._iprot.readMessageEnd()
+ raise x
+ result = getSchema_result()
+ result.read(self._iprot)
+ self._iprot.readMessageEnd()
+ if result.success != None:  # compared against None, so an empty string still counts as a result
+ return result.success
+ if result.ex != None:  # the declared exception field was set in the result struct
+ raise result.ex
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "getSchema failed: unknown result");  # neither success nor ex was present
+
+
+class Processor(hive_metastore.ThriftHiveMetastore.Processor, Iface, TProcessor):  # Server-side dispatcher: registers the five Hive methods in the metastore Processor's _processMap
+ def __init__(self, handler):  # handler is passed to the parent constructor; the process_* methods call it through self._handler
+ hive_metastore.ThriftHiveMetastore.Processor.__init__(self, handler)  # NOTE(review): self._processMap / self._handler are not created in this class; presumably by the parent - confirm
+ self._processMap["execute"] = Processor.process_execute  # entries are looked up on the class, which is why process() passes self explicitly
+ self._processMap["fetchOne"] = Processor.process_fetchOne
+ self._processMap["fetchN"] = Processor.process_fetchN
+ self._processMap["fetchAll"] = Processor.process_fetchAll
+ self._processMap["getSchema"] = Processor.process_getSchema
+
+ def process(self, iprot, oprot):  # handle one incoming message: returns True after dispatching, None for an unknown method name
+ (name, type, seqid) = iprot.readMessageBegin()  # NOTE: the local name 'type' shadows the builtin inside this method and is otherwise unused
+ if name not in self._processMap:
+ iprot.skip(TType.STRUCT)  # discard the argument struct of the unknown call
+ iprot.readMessageEnd()
+ x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name))
+ oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid)  # answer with an EXCEPTION message that echoes the name and seqid
+ x.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+ return  # bare return: the unknown-method path yields None, not True
+ else:
+ self._processMap[name](self, seqid, iprot, oprot)
+ return True
+
+ def process_execute(self, seqid, iprot, oprot):  # decode execute_args, call handler.execute(query), write execute_result as a REPLY
+ args = execute_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = execute_result()
+ try:
+ self._handler.execute(args.query)  # return value is not used; execute_result has no success field
+ except HiveServerException, ex:  # Python 2 except syntax; only this exception type is captured, anything else propagates to the caller
+ result.ex = ex
+ oprot.writeMessageBegin("execute", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def process_fetchOne(self, seqid, iprot, oprot):  # decode fetchOne_args, call handler.fetchOne(), write fetchOne_result as a REPLY
+ args = fetchOne_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = fetchOne_result()
+ try:
+ result.success = self._handler.fetchOne()
+ except HiveServerException, ex:  # Python 2 except syntax; only this exception type is captured, anything else propagates to the caller
+ result.ex = ex
+ oprot.writeMessageBegin("fetchOne", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def process_fetchN(self, seqid, iprot, oprot):  # decode fetchN_args, call handler.fetchN(numRows), write fetchN_result as a REPLY
+ args = fetchN_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = fetchN_result()
+ try:
+ result.success = self._handler.fetchN(args.numRows)
+ except HiveServerException, ex:  # Python 2 except syntax; only this exception type is captured, anything else propagates to the caller
+ result.ex = ex
+ oprot.writeMessageBegin("fetchN", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def process_fetchAll(self, seqid, iprot, oprot):  # decode fetchAll_args, call handler.fetchAll(), write fetchAll_result as a REPLY
+ args = fetchAll_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = fetchAll_result()
+ try:
+ result.success = self._handler.fetchAll()
+ except HiveServerException, ex:  # Python 2 except syntax; only this exception type is captured, anything else propagates to the caller
+ result.ex = ex
+ oprot.writeMessageBegin("fetchAll", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def process_getSchema(self, seqid, iprot, oprot):  # decode getSchema_args, call handler.getSchema(), write getSchema_result as a REPLY
+ args = getSchema_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = getSchema_result()
+ try:
+ result.success = self._handler.getSchema()
+ except HiveServerException, ex:  # Python 2 except syntax; only this exception type is captured, anything else propagates to the caller
+ result.ex = ex
+ oprot.writeMessageBegin("getSchema", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+
+# HELPER FUNCTIONS AND STRUCTURES
+
+class execute_args:  # Argument struct for execute(): one STRING field 'query' with field id 1
+
+ thrift_spec = (  # one entry per field id (slot 0 unused); handed to fastbinary in read()/write()
+ None, # 0
+ (1, TType.STRING, 'query', None, None, ), # 1
+ )
+
+ def __init__(self, d=None):  # d: optional dict; a 'query' key, if present, initialises the field
+ self.query = None
+ if isinstance(d, dict):
+ if 'query' in d:
+ self.query = d['query']
+
+ def read(self, iprot):  # populate this struct from iprot
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.query = iprot.readString();
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ else:  # unrecognised field id: skip the value
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise this struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('execute_args')
+ if self.query != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('query', TType.STRING, 1)
+ oprot.writeString(self.query)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class execute_result:  # Result struct for execute(): no success field, only the optional exception 'ex' with field id 1
+
+ thrift_spec = (  # one entry per field id (slot 0 unused); handed to fastbinary in read()/write()
+ None, # 0
+ (1, TType.STRUCT, 'ex', (HiveServerException, HiveServerException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, d=None):  # d: optional dict; an 'ex' key, if present, initialises the field
+ self.ex = None
+ if isinstance(d, dict):
+ if 'ex' in d:
+ self.ex = d['ex']
+
+ def read(self, iprot):  # populate this struct from iprot
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRUCT:
+ self.ex = HiveServerException()  # the nested exception struct decodes itself
+ self.ex.read(iprot)
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ else:  # unrecognised field id: skip the value
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise this struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('execute_result')
+ if self.ex != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('ex', TType.STRUCT, 1)
+ self.ex.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class fetchOne_args:  # Argument struct for fetchOne(): carries no fields
+
+ thrift_spec = (  # empty tuple: no fields, but still 'is not None', so the fastbinary guards below can pass
+ )
+
+ def __init__(self, d=None):  # d is accepted but ignored
+ pass
+
+ def read(self, iprot):  # consume one struct from iprot; nothing is stored on the generic path
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ else:  # no fields are expected, so every field received is skipped
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise an empty struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('fetchOne_args')
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class fetchOne_result:  # Result struct for fetchOne(): STRING 'success' (field id 0) or exception 'ex' (field id 1)
+
+ thrift_spec = (  # one entry per field id; handed to fastbinary in read()/write()
+ (0, TType.STRING, 'success', None, None, ), # 0
+ (1, TType.STRUCT, 'ex', (HiveServerException, HiveServerException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, d=None):  # d: optional dict; 'success' and 'ex' keys, if present, initialise the fields
+ self.success = None
+ self.ex = None
+ if isinstance(d, dict):
+ if 'success' in d:
+ self.success = d['success']
+ if 'ex' in d:
+ self.ex = d['ex']
+
+ def read(self, iprot):  # populate this struct from iprot
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.STRING:
+ self.success = iprot.readString();
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.ex = HiveServerException()  # the nested exception struct decodes itself
+ self.ex.read(iprot)
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ else:  # unrecognised field id: skip the value
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise this struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('fetchOne_result')
+ if self.success != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('success', TType.STRING, 0)
+ oprot.writeString(self.success)
+ oprot.writeFieldEnd()
+ if self.ex != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('ex', TType.STRUCT, 1)
+ self.ex.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class fetchN_args:  # Argument struct for fetchN(): one I32 field 'numRows' with field id 1
+
+ thrift_spec = (  # one entry per field id (slot 0 unused); handed to fastbinary in read()/write()
+ None, # 0
+ (1, TType.I32, 'numRows', None, None, ), # 1
+ )
+
+ def __init__(self, d=None):  # d: optional dict; a 'numRows' key, if present, initialises the field
+ self.numRows = None
+ if isinstance(d, dict):
+ if 'numRows' in d:
+ self.numRows = d['numRows']
+
+ def read(self, iprot):  # populate this struct from iprot
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I32:
+ self.numRows = iprot.readI32();
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ else:  # unrecognised field id: skip the value
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise this struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('fetchN_args')
+ if self.numRows != None:  # compared against None, so a numRows of 0 is still written
+ oprot.writeFieldBegin('numRows', TType.I32, 1)
+ oprot.writeI32(self.numRows)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class fetchN_result:  # Result struct for fetchN(): LIST-of-STRING 'success' (field id 0) or exception 'ex' (field id 1)
+
+ thrift_spec = (  # one entry per field id; handed to fastbinary in read()/write()
+ (0, TType.LIST, 'success', (TType.STRING,None), None, ), # 0
+ (1, TType.STRUCT, 'ex', (HiveServerException, HiveServerException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, d=None):  # d: optional dict; 'success' and 'ex' keys, if present, initialise the fields
+ self.success = None
+ self.ex = None
+ if isinstance(d, dict):
+ if 'success' in d:
+ self.success = d['success']
+ if 'ex' in d:
+ self.ex = d['ex']
+
+ def read(self, iprot):  # populate this struct from iprot
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.LIST:
+ self.success = []
+ (_etype3, _size0) = iprot.readListBegin()  # the element type is read but not checked; every element is read as a string
+ for _i4 in xrange(_size0):  # xrange: this generated code targets Python 2
+ _elem5 = iprot.readString();
+ self.success.append(_elem5)
+ iprot.readListEnd()
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.ex = HiveServerException()  # the nested exception struct decodes itself
+ self.ex.read(iprot)
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ else:  # unrecognised field id: skip the value
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise this struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('fetchN_result')
+ if self.success != None:  # compared against None, so an empty list is still written (with length 0)
+ oprot.writeFieldBegin('success', TType.LIST, 0)
+ oprot.writeListBegin(TType.STRING, len(self.success))
+ for iter6 in self.success:
+ oprot.writeString(iter6)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.ex != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('ex', TType.STRUCT, 1)
+ self.ex.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class fetchAll_args:  # Argument struct for fetchAll(): carries no fields
+
+ thrift_spec = (  # empty tuple: no fields, but still 'is not None', so the fastbinary guards below can pass
+ )
+
+ def __init__(self, d=None):  # d is accepted but ignored
+ pass
+
+ def read(self, iprot):  # consume one struct from iprot; nothing is stored on the generic path
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ else:  # no fields are expected, so every field received is skipped
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise an empty struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('fetchAll_args')
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class fetchAll_result:  # Result struct for fetchAll(): LIST-of-STRING 'success' (field id 0) or exception 'ex' (field id 1)
+
+ thrift_spec = (  # one entry per field id; handed to fastbinary in read()/write()
+ (0, TType.LIST, 'success', (TType.STRING,None), None, ), # 0
+ (1, TType.STRUCT, 'ex', (HiveServerException, HiveServerException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, d=None):  # d: optional dict; 'success' and 'ex' keys, if present, initialise the fields
+ self.success = None
+ self.ex = None
+ if isinstance(d, dict):
+ if 'success' in d:
+ self.success = d['success']
+ if 'ex' in d:
+ self.ex = d['ex']
+
+ def read(self, iprot):  # populate this struct from iprot
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.LIST:
+ self.success = []
+ (_etype10, _size7) = iprot.readListBegin()  # the element type is read but not checked; every element is read as a string
+ for _i11 in xrange(_size7):  # xrange: this generated code targets Python 2
+ _elem12 = iprot.readString();
+ self.success.append(_elem12)
+ iprot.readListEnd()
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.ex = HiveServerException()  # the nested exception struct decodes itself
+ self.ex.read(iprot)
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ else:  # unrecognised field id: skip the value
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise this struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('fetchAll_result')
+ if self.success != None:  # compared against None, so an empty list is still written (with length 0)
+ oprot.writeFieldBegin('success', TType.LIST, 0)
+ oprot.writeListBegin(TType.STRING, len(self.success))
+ for iter13 in self.success:
+ oprot.writeString(iter13)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.ex != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('ex', TType.STRUCT, 1)
+ self.ex.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class getSchema_args:  # Argument struct for getSchema(): carries no fields
+
+ thrift_spec = (  # empty tuple: no fields, but still 'is not None', so the fastbinary guards below can pass
+ )
+
+ def __init__(self, d=None):  # d is accepted but ignored
+ pass
+
+ def read(self, iprot):  # consume one struct from iprot; nothing is stored on the generic path
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ else:  # no fields are expected, so every field received is skipped
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise an empty struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('getSchema_args')
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+class getSchema_result:  # Result struct for getSchema(): STRING 'success' (field id 0) or exception 'ex' (field id 1)
+
+ thrift_spec = (  # one entry per field id; handed to fastbinary in read()/write()
+ (0, TType.STRING, 'success', None, None, ), # 0
+ (1, TType.STRUCT, 'ex', (HiveServerException, HiveServerException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, d=None):  # d: optional dict; 'success' and 'ex' keys, if present, initialise the fields
+ self.success = None
+ self.ex = None
+ if isinstance(d, dict):
+ if 'success' in d:
+ self.success = d['success']
+ if 'ex' in d:
+ self.ex = d['ex']
+
+ def read(self, iprot):  # populate this struct from iprot
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: exact protocol class, CReadableTransport, and the optional fastbinary import succeeded
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.STRING:
+ self.success = iprot.readString();
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ elif fid == 1:
+ if ftype == TType.STRUCT:
+ self.ex = HiveServerException()  # the nested exception struct decodes itself
+ self.ex.read(iprot)
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ else:  # unrecognised field id: skip the value
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise this struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path: encode in one call and write the bytes to the transport
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('getSchema_result')
+ if self.success != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('success', TType.STRING, 0)
+ oprot.writeString(self.success)
+ oprot.writeFieldEnd()
+ if self.ex != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('ex', TType.STRUCT, 1)
+ self.ex.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
+
Added: hadoop/hive/trunk/service/src/gen-py/hive_service/__init__.py
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/gen-py/hive_service/__init__.py?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/src/gen-py/hive_service/__init__.py (added)
+++ hadoop/hive/trunk/service/src/gen-py/hive_service/__init__.py Sun Dec 7 20:25:22 2008
@@ -0,0 +1 @@
+__all__ = ['ttypes', 'constants', 'ThriftHive']  # names pulled in by 'from hive_service import *'; they match the three modules added under gen-py/hive_service in this commit
Added: hadoop/hive/trunk/service/src/gen-py/hive_service/constants.py
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/gen-py/hive_service/constants.py?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/src/gen-py/hive_service/constants.py (added)
+++ hadoop/hive/trunk/service/src/gen-py/hive_service/constants.py Sun Dec 7 20:25:22 2008
@@ -0,0 +1,9 @@
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+
+from thrift.Thrift import *
+from ttypes import *
+
Added: hadoop/hive/trunk/service/src/gen-py/hive_service/ttypes.py
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/gen-py/hive_service/ttypes.py?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/src/gen-py/hive_service/ttypes.py (added)
+++ hadoop/hive/trunk/service/src/gen-py/hive_service/ttypes.py Sun Dec 7 20:25:22 2008
@@ -0,0 +1,71 @@
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+
+from thrift.Thrift import *
+import fb303.ttypes
+import hive_metastore.ttypes
+
+
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+try:
+ from thrift.protocol import fastbinary
+except:
+ fastbinary = None
+
+
+class HiveServerException(Exception):  # Exception struct with one STRING field 'message', read and written with field id -1
+
+ thrift_spec = None  # None makes the 'self.thrift_spec is not None' guards below false, so the fastbinary branches are skipped for this class
+ def __init__(self, d=None):  # d: optional dict; a 'message' key, if present, initialises the field. Exception.__init__ is not called here
+ self.message = None
+ if isinstance(d, dict):
+ if 'message' in d:
+ self.message = d['message']
+
+ def read(self, iprot):  # populate this struct from iprot
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path; not taken while thrift_spec is None
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == -1:  # negative field id; it matches the id used in write()
+ if ftype == TType.STRING:
+ self.message = iprot.readString();
+ else:  # known field id but unexpected wire type: skip the value
+ iprot.skip(ftype)
+ else:  # unrecognised field id: skip the value
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialise this struct to oprot
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fastbinary path; not taken while thrift_spec is None
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('HiveServerException')
+ if self.message != None:  # a field left at None is omitted from the output
+ oprot.writeFieldBegin('message', TType.STRING, -1)
+ oprot.writeString(self.message)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def __str__(self):  # string form of the instance attribute dict, not just the message text
+ return str(self.__dict__)
+
+ def __repr__(self):  # repr of the instance attribute dict
+ return repr(self.__dict__)
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # defined as the negation of __eq__
+ return not (self == other)
+
Added: hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveClient.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveClient.java?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveClient.java (added)
+++ hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveClient.java Sun Dec 7 20:25:22 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.service.ThriftHive;
+import org.apache.hadoop.hive.service.ThriftHive.*;
+import org.apache.hadoop.hive.service.HiveServerException;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.FacebookService;
+import com.facebook.fb303.fb_status;
+import com.facebook.thrift.TException;
+import com.facebook.thrift.protocol.TProtocol;
+
+import org.apache.hadoop.hive.metastore.api.*;
+
+/**
+ * Thrift Hive client: a thin subclass of ThriftHive.Client that adds no methods
+ * of its own and can be used to run queries on a standalone Hive server.
+ * The single TProtocol it is given is passed as both arguments of the parent constructor.
+ */
+public class HiveClient extends ThriftHive.Client implements HiveInterface {
+ public HiveClient(TProtocol prot) {
+ super(prot, prot);
+ }
+}
Added: hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveInterface.java?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveInterface.java (added)
+++ hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveInterface.java Sun Dec 7 20:25:22 2008
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+/**
+ * HiveInterface extends 2 interfaces, ThriftHive and ThriftHiveMetastore.
+ *
+ * ThriftHive.Iface is defined in:
+ * service/src/gen-javabean/org/apache/hadoop/hive/service/ThriftHive.java
+ * ThriftHiveMetastore.Iface is defined in:
+ * metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java
+ *
+ * These interfaces are generated by Thrift. The thrift files are in:
+ * ThriftHive: service/if/hive_service.thrift
+ * ThriftHiveMetastore: metastore/if/hive_metastore.thrift
+ */
+public interface HiveInterface extends ThriftHive.Iface, org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface {
+}
Added: hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java (added)
+++ hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java Sun Dec 7 20:25:22 2008
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.service;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.service.ThriftHive;
+import org.apache.hadoop.hive.service.ThriftHive.*;
+import org.apache.hadoop.hive.service.HiveServerException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.FacebookService;
+import com.facebook.fb303.fb_status;
+import com.facebook.thrift.TException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.server.TServer;
+import com.facebook.thrift.server.TThreadPoolServer;
+import com.facebook.thrift.transport.TServerSocket;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransportFactory;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.*;
+
+/**
+ * Thrift Hive Server Implementation
+ */
+public class HiveServer extends ThriftHive {
+ private final static String VERSION = "0";
+
+ /**
+ * Handler which implements the Hive Interface
+ * This class can be used in lieu of the HiveClient class
+ * to get an embedded server
+ */
+ public static class HiveServerHandler extends HiveMetaStore.HMSHandler implements HiveInterface {
+
+ /**
+ * Hive server uses org.apache.hadoop.hive.ql.Driver for run() and
+ * getResults() methods.
+ * TODO: There should be one Driver object per query statement executed
+ * TODO: That will allow clients to run multiple queries simulteneously
+ */
+ private Driver driver;
+
+ /**
+ * Stores state per connection
+ */
+ private SessionState session;
+
+ public static final Log LOG = LogFactory.getLog(HiveServer.class.getName());
+
+ /**
+ * A constructor.
+ */
+ public HiveServerHandler() throws MetaException {
+ super(HiveServer.class.getName());
+ session = new SessionState(new HiveConf(SessionState.class));
+ SessionState.start(session);
+ HiveConf conf = session.get().getConf();
+ session.in = null;
+ session.out = null;
+ session.err = null;
+ driver = new Driver();
+ }
+
+ /**
+ * Executes a query.
+ *
+ * @param query HiveQL query to execute
+ */
+ @Override
+ public void execute(String query) throws HiveServerException, TException {
+ HiveServerHandler.LOG.info("Running the query: " + query);
+ int rc = 0;
+ // TODO: driver.run should either return int or throw exception, not both.
+ try {
+ rc = driver.run(query);
+ } catch (Exception e) {
+ throw new HiveServerException("Error running query: " + e.toString());
+ }
+ if (rc != 0) {
+ throw new HiveServerException("Query returned non-zero code: " + rc);
+ }
+ }
+
+ /**
+ * Return the schema of the query result
+ */
+ @Override
+ public String getSchema() throws HiveServerException, TException {
+ try {
+ return driver.getSchema();
+ }
+ catch (Exception e) {
+ throw new HiveServerException("Unable to get schema: " + e.toString());
+ }
+ }
+
+ /**
+ * Fetches the next row in a query result set.
+ *
+ * @return the next row in a query result set. null if there is no more row to fetch.
+ */
+ @Override
+ public String fetchOne() throws HiveServerException, TException {
+ driver.setMaxRows(1);
+ Vector<String> result = new Vector<String>();
+ if (driver.getResults(result)) {
+ return result.get(0);
+ }
+ // TODO: Cannot return null here because thrift cannot handle nulls
+ // TODO: Returning empty string for now. Need to figure out how to
+ // TODO: return null in some other way
+ return "";
+ }
+
+ /**
+ * Fetches numRows rows.
+ *
+ * @param numRows Number of rows to fetch.
+ * @return A list of rows. The size of the list is numRows if there are at least
+ * numRows rows available to return. The size is smaller than numRows if
+ * there aren't enough rows. The list will be empty if there is no more
+ * row to fetch or numRows == 0.
+ * @throws HiveServerException Invalid value for numRows (numRows < 0)
+ */
+ @Override
+ public List<String> fetchN(int numRows) throws HiveServerException, TException {
+ if (numRows < 0) {
+ throw new HiveServerException("Invalid argument for number of rows: " + numRows);
+ }
+ Vector<String> result = new Vector<String>();
+ driver.setMaxRows(numRows);
+ driver.getResults(result);
+ return result;
+ }
+
+ /**
+ * Fetches all the rows in a result set.
+ *
+ * @return All the rows in a result set of a query executed using execute method.
+ *
+ * TODO: Currently the server buffers all the rows before returning them
+ * to the client. Decide whether the buffering should be done in the client.
+ */
+ @Override
+ public List<String> fetchAll() throws HiveServerException, TException {
+ Vector<String> rows = new Vector<String>();
+ Vector<String> result = new Vector<String>();
+ while (driver.getResults(result)) {
+ rows.addAll(result);
+ result.clear();
+ }
+ return rows;
+ }
+
+ /**
+ * Return the status of the server
+ */
+ @Override
+ public int getStatus() {
+ return 0;
+ }
+
+ /**
+ * Return the version of the server software
+ */
+ @Override
+ public String getVersion() {
+ return VERSION;
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ int port = 10000;
+ if (args.length > 1) {
+ port = Integer.getInteger(args[0]);
+ }
+ TServerTransport serverTransport = new TServerSocket(port);
+ Iface handler = new HiveServerHandler();
+ FacebookService.Processor processor = new ThriftHive.Processor(handler);
+ TThreadPoolServer.Options options = new TThreadPoolServer.Options();
+ TServer server = new TThreadPoolServer(processor, serverTransport,
+ new TTransportFactory(), new TTransportFactory(),
+ new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), options);
+ server.serve();
+ HiveServerHandler.LOG.info("Started the new hive server on port " + port);
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ }
+}
Added: hadoop/hive/trunk/service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java?rev=724247&view=auto
==============================================================================
--- hadoop/hive/trunk/service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java (added)
+++ hadoop/hive/trunk/service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java Sun Dec 7 20:25:22 2008
@@ -0,0 +1,194 @@
+package org.apache.hadoop.hive.service;
+
+import java.util.*;
+import org.apache.hadoop.fs.Path;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.service.HiveInterface;
+import org.apache.hadoop.hive.service.HiveClient;
+import org.apache.hadoop.hive.service.HiveServer;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.TTransport;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+
+ /**
+  * JUnit tests for the Hive Thrift service.
+  *
+  * When the system property test.service.standalone.server is "true" the
+  * tests talk to a server on localhost:10000 through HiveClient; otherwise
+  * they call an in-process HiveServer.HiveServerHandler directly.
+  */
+ public class TestHiveServer extends TestCase {
+
+   private HiveInterface client;
+   private final static String host = "localhost";
+   private final static int port = 10000;
+   private Path dataFilePath;
+
+   private static String tableName = "testhivedrivertable";
+   private HiveConf conf;
+   private boolean standAloneServer = false;
+   private TTransport transport;
+
+   public TestHiveServer(String name) {
+     super(name);
+     conf = new HiveConf(TestHiveServer.class);
+     // Normalise the directory: backslashes become slashes and a "c:" prefix is dropped.
+     String dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+     dataFilePath = new Path(dataFileDir, "kv1.txt");
+     // See data/conf/hive-site.xml
+     // Constant-first comparison: an unset property selects the embedded
+     // handler instead of throwing a NullPointerException.
+     standAloneServer = "true".equals(System.getProperty("test.service.standalone.server"));
+   }
+
+   /**
+    * Creates the client under test. Connection problems are propagated so
+    * that the test fails here rather than later on a null client.
+    */
+   protected void setUp() throws Exception {
+     super.setUp();
+     if (standAloneServer) {
+       transport = new TSocket(host, port);
+       TProtocol protocol = new TBinaryProtocol(transport);
+       client = new HiveClient(protocol);
+       transport.open();
+     } else {
+       client = new HiveServer.HiveServerHandler();
+     }
+   }
+
+   /**
+    * Closes the transport opened by setUp(), if there is one.
+    */
+   protected void tearDown() throws Exception {
+     super.tearDown();
+     if (standAloneServer && transport != null) {
+       transport.close();
+     }
+   }
+
+   /**
+    * Best-effort removal of the test table: any exception from the drop
+    * statement (for example because the table does not exist) is ignored.
+    */
+   private void dropTableQuietly() {
+     try {
+       client.execute("drop table " + tableName);
+     } catch (Exception ex) {
+       // deliberately ignored, see method comment
+     }
+   }
+
+   /**
+    * Runs create / load / select count and checks the single result row and
+    * the reported schema. Failures propagate to JUnit; catching Throwable
+    * here would also swallow assertion failures.
+    */
+   public void testExecute() throws Exception {
+     dropTableQuietly();
+
+     client.execute("create table " + tableName + " (num int)");
+     client.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName);
+     client.execute("select count(1) as cnt from " + tableName);
+     String row = client.fetchOne();
+     assertEquals("500", row);
+     String schema = client.getSchema();
+     assertEquals("struct result { string cnt}", schema);
+     client.execute("drop table " + tableName);
+   }
+
+   /**
+    * Not picked up by the JUnit 3 runner because the name does not start
+    * with "test". The transport is left for tearDown() to close; it is null
+    * when the embedded handler is in use.
+    */
+   public void notestExecute() throws Exception {
+     dropTableQuietly();
+
+     client.execute("create table " + tableName + " (num int)");
+     client.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName);
+     client.execute("select count(1) from " + tableName);
+     String row = client.fetchOne();
+     assertEquals("500", row);
+     client.execute("drop table " + tableName);
+   }
+
+   /**
+    * Test metastore call
+    */
+   public void testMetastore() throws Exception {
+     dropTableQuietly();
+
+     client.execute("create table " + tableName + " (num int)");
+     List<String> tabs = client.get_tables("default", tableName);
+     assertEquals(tableName, tabs.get(0));
+     client.execute("drop table " + tableName);
+   }
+
+   /**
+    * Exercises fetchAll, fetchOne and fetchN against a 500-row table,
+    * including their behaviour once the result set is exhausted.
+    */
+   public void testFetch() throws Exception {
+     // create and populate a table with 500 rows.
+     dropTableQuietly();
+     client.execute("create table " + tableName + " (key int, value string)");
+     client.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName);
+
+     // fetchAll test
+     client.execute("select key, value from " + tableName);
+     assertEquals(500, client.fetchAll().size());
+     assertEquals(0, client.fetchAll().size());
+
+     // fetchOne test
+     client.execute("select key, value from " + tableName);
+     for (int i = 0; i < 500; i++) {
+       String str = client.fetchOne();
+       assertFalse("row " + i + " is empty", str.equals(""));
+     }
+     assertEquals("", client.fetchOne());
+
+     // fetchN test
+     client.execute("select key, value from " + tableName);
+     assertEquals(499, client.fetchN(499).size());
+     assertEquals(1, client.fetchN(499).size());
+     assertEquals(0, client.fetchN(499).size());
+   }
+
+   /**
+    * Deserializes fetched rows with a DynamicSerDe that is configured from
+    * the schema string returned by the server.
+    */
+   public void testDynamicSerde() throws Exception {
+     dropTableQuietly();
+
+     client.execute("create table " + tableName + " (key int, value string)");
+     client.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName);
+     //client.execute("select key, count(1) from " + tableName + " where key > 10 group by key");
+     String sql = "select key, value from " + tableName + " where key > 10";
+     client.execute(sql);
+
+     // Instantiate DynamicSerDe
+     DynamicSerDe ds = new DynamicSerDe();
+     Properties dsp = new Properties();
+     dsp.setProperty(Constants.SERIALIZATION_FORMAT, org.apache.hadoop.hive.serde2.thrift.TCTLSeparatedProtocol.class.getName());
+     dsp.setProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME, "result");
+     dsp.setProperty(Constants.SERIALIZATION_DDL, client.getSchema());
+     dsp.setProperty(Constants.SERIALIZATION_LIB, ds.getClass().toString());
+     dsp.setProperty(Constants.FIELD_DELIM, "9");
+     ds.initialize(new Configuration(), dsp);
+
+     String row = client.fetchOne();
+     Object o = ds.deserialize(new BytesWritable(row.getBytes()));
+
+     assertEquals("class java.util.ArrayList", o.getClass().toString());
+     List<?> lst = (List<?>)o;
+     assertEquals("238", lst.get(0));
+
+     // TODO: serde doesn't like underscore -- struct result { string _c0}
+     sql = "select count(1) as c from " + tableName;
+     client.execute(sql);
+     row = client.fetchOne();
+     dsp.setProperty(Constants.SERIALIZATION_DDL, client.getSchema());
+     ds.initialize(new Configuration(), dsp);
+     // Only checks that deserialization completes; the value is not asserted.
+     o = ds.deserialize(new BytesWritable(row.getBytes()));
+   }
+
+ }