You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2013/09/10 03:13:21 UTC
svn commit: r1521326 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
jdbc/src/java/org/apache/hive/jdbc/ service/if/
service/src/gen/thrift/gen-cpp/
service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/
service/sr...
Author: thejas
Date: Tue Sep 10 01:13:20 2013
New Revision: 1521326
URL: http://svn.apache.org/r1521326
Log:
HIVE-4617: Asynchronous execution in HiveServer2 to run a query in non-blocking mode (Jaideep Dhok & Vaibhav Gumashta via Thejas Nair)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
hive/trunk/service/if/TCLIService.thrift
hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp
hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.h
hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java
hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionReq.java
hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionResp.java
hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOperationState.java
hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java
hive/trunk/service/src/gen/thrift/gen-py/TCLIService/ttypes.py
hive/trunk/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
hive/trunk/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
hive/trunk/service/src/java/org/apache/hive/service/cli/ICLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Sep 10 01:13:20 2013
@@ -736,7 +736,13 @@ public class HiveConf extends Configurat
HIVE_ENTITY_SEPARATOR("hive.entity.separator", "@"),
HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS("hive.server2.thrift.min.worker.threads", 5),
- HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 100),
+ HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 500),
+
+ // Configuration for async thread pool in SessionManager
+ // Number of async threads
+ HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 50),
+ // Number of seconds HiveServer2 shutdown will wait for async threads to terminate
+ HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10),
HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000),
HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", ""),
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Tue Sep 10 01:13:20 2013
@@ -1854,11 +1854,24 @@
<property>
<name>hive.server2.thrift.max.worker.threads</name>
- <value>100</value>
+ <value>500</value>
<description>Maximum number of Thrift worker threads</description>
</property>
<property>
+ <name>hive.server2.async.exec.threads</name>
+ <value>50</value>
+ <description>Number of threads in the async thread pool for HiveServer2</description>
+</property>
+
+<property>
+ <name>hive.server2.async.exec.shutdown.timeout</name>
+ <value>10</value>
+ <description>Time (in seconds) for which HiveServer2 shutdown will wait for async
+ threads to terminate</description>
+</property>
+
+<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
<description>Port number of HiveServer2 Thrift interface.
Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java Tue Sep 10 01:13:20 2013
@@ -101,9 +101,11 @@ public class HiveConnection implements j
openTransport(uri, connParams.getHost(), connParams.getPort(), connParams.getSessionVars());
}
- // currently only V1 is supported
+ // add supported protocols: V1 and V2
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
+ supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
+
// open client session
openSession(uri);
Modified: hive/trunk/service/if/TCLIService.thrift
URL: http://svn.apache.org/viewvc/hive/trunk/service/if/TCLIService.thrift?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/if/TCLIService.thrift (original)
+++ hive/trunk/service/if/TCLIService.thrift Tue Sep 10 01:13:20 2013
@@ -38,7 +38,10 @@ namespace cpp apache.hive.service.cli.th
// List of protocol versions. A new token should be
// added to the end of this list every time a change is made.
enum TProtocolVersion {
- HIVE_CLI_SERVICE_PROTOCOL_V1
+ HIVE_CLI_SERVICE_PROTOCOL_V1,
+
+ // V2 adds support for asynchronous execution
+ HIVE_CLI_SERVICE_PROTOCOL_V2
}
enum TTypeId {
@@ -356,6 +359,9 @@ enum TOperationState {
// The operation is in an unrecognized state
UKNOWN_STATE,
+
+ // The operation is in a pending state
+ PENDING_STATE,
}
@@ -452,7 +458,7 @@ struct TOperationHandle {
// which operations may be executed.
struct TOpenSessionReq {
// The version of the HiveServer2 protocol that the client is using.
- 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1
+ 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
// Username and password for authentication.
// Depending on the authentication scheme being used,
@@ -471,7 +477,7 @@ struct TOpenSessionResp {
1: required TStatus status
// The protocol version that the server is using.
- 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1
+ 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
// Session Handle
3: optional TSessionHandle sessionHandle
@@ -582,7 +588,7 @@ struct TGetInfoResp {
// status of the statement, and to fetch results once the
// statement has finished executing.
struct TExecuteStatementReq {
- // The session to exexcute the statement against
+ // The session to execute the statement against
1: required TSessionHandle sessionHandle
// The statement to be executed (DML, DDL, SET, etc)
@@ -593,6 +599,9 @@ struct TExecuteStatementReq {
// is executed. These properties apply to this statement
// only and will not affect the subsequent state of the Session.
3: optional map<string, string> confOverlay
+
+ // Execute asynchronously when runAsync is true
+ 4: optional bool runAsync = false
}
struct TExecuteStatementResp {
@@ -600,7 +609,6 @@ struct TExecuteStatementResp {
2: optional TOperationHandle operationHandle
}
-
// GetTypeInfo()
//
// Get information about types supported by the HiveServer instance.
Modified: hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp (original)
+++ hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp Tue Sep 10 01:13:20 2013
@@ -11,12 +11,14 @@
namespace apache { namespace hive { namespace service { namespace cli { namespace thrift {
int _kTProtocolVersionValues[] = {
- TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1
+ TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1,
+ TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V2
};
const char* _kTProtocolVersionNames[] = {
- "HIVE_CLI_SERVICE_PROTOCOL_V1"
+ "HIVE_CLI_SERVICE_PROTOCOL_V1",
+ "HIVE_CLI_SERVICE_PROTOCOL_V2"
};
-const std::map<int, const char*> _TProtocolVersion_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(1, _kTProtocolVersionValues, _kTProtocolVersionNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _TProtocolVersion_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(2, _kTProtocolVersionValues, _kTProtocolVersionNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
int _kTTypeIdValues[] = {
TTypeId::BOOLEAN_TYPE,
@@ -83,7 +85,8 @@ int _kTOperationStateValues[] = {
TOperationState::CANCELED_STATE,
TOperationState::CLOSED_STATE,
TOperationState::ERROR_STATE,
- TOperationState::UKNOWN_STATE
+ TOperationState::UKNOWN_STATE,
+ TOperationState::PENDING_STATE
};
const char* _kTOperationStateNames[] = {
"INITIALIZED_STATE",
@@ -92,9 +95,10 @@ const char* _kTOperationStateNames[] = {
"CANCELED_STATE",
"CLOSED_STATE",
"ERROR_STATE",
- "UKNOWN_STATE"
+ "UKNOWN_STATE",
+ "PENDING_STATE"
};
-const std::map<int, const char*> _TOperationState_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kTOperationStateValues, _kTOperationStateNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _TOperationState_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(8, _kTOperationStateValues, _kTOperationStateNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
int _kTOperationTypeValues[] = {
TOperationType::EXECUTE_STATEMENT,
@@ -3259,8 +3263,8 @@ void swap(TGetInfoResp &a, TGetInfoResp
swap(a.infoValue, b.infoValue);
}
-const char* TExecuteStatementReq::ascii_fingerprint = "4CDA19909D21B7D9907F85E3387EAB27";
-const uint8_t TExecuteStatementReq::binary_fingerprint[16] = {0x4C,0xDA,0x19,0x90,0x9D,0x21,0xB7,0xD9,0x90,0x7F,0x85,0xE3,0x38,0x7E,0xAB,0x27};
+const char* TExecuteStatementReq::ascii_fingerprint = "FED75DB77E66D76EC1939A51FB0D96FA";
+const uint8_t TExecuteStatementReq::binary_fingerprint[16] = {0xFE,0xD7,0x5D,0xB7,0x7E,0x66,0xD7,0x6E,0xC1,0x93,0x9A,0x51,0xFB,0x0D,0x96,0xFA};
uint32_t TExecuteStatementReq::read(::apache::thrift::protocol::TProtocol* iprot) {
@@ -3323,6 +3327,14 @@ uint32_t TExecuteStatementReq::read(::ap
xfer += iprot->skip(ftype);
}
break;
+ case 4:
+ if (ftype == ::apache::thrift::protocol::T_BOOL) {
+ xfer += iprot->readBool(this->runAsync);
+ this->__isset.runAsync = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -3365,6 +3377,11 @@ uint32_t TExecuteStatementReq::write(::a
}
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.runAsync) {
+ xfer += oprot->writeFieldBegin("runAsync", ::apache::thrift::protocol::T_BOOL, 4);
+ xfer += oprot->writeBool(this->runAsync);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -3375,6 +3392,7 @@ void swap(TExecuteStatementReq &a, TExec
swap(a.sessionHandle, b.sessionHandle);
swap(a.statement, b.statement);
swap(a.confOverlay, b.confOverlay);
+ swap(a.runAsync, b.runAsync);
swap(a.__isset, b.__isset);
}
Modified: hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.h
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.h?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.h (original)
+++ hive/trunk/service/src/gen/thrift/gen-cpp/TCLIService_types.h Tue Sep 10 01:13:20 2013
@@ -18,7 +18,8 @@ namespace apache { namespace hive { name
struct TProtocolVersion {
enum type {
- HIVE_CLI_SERVICE_PROTOCOL_V1 = 0
+ HIVE_CLI_SERVICE_PROTOCOL_V1 = 0,
+ HIVE_CLI_SERVICE_PROTOCOL_V2 = 1
};
};
@@ -69,7 +70,8 @@ struct TOperationState {
CANCELED_STATE = 3,
CLOSED_STATE = 4,
ERROR_STATE = 5,
- UKNOWN_STATE = 6
+ UKNOWN_STATE = 6,
+ PENDING_STATE = 7
};
};
@@ -1467,8 +1469,8 @@ class TOpenSessionReq {
static const char* ascii_fingerprint; // = "C8FD0F306A16C16BDA7B57F58BFAE5B2";
static const uint8_t binary_fingerprint[16]; // = {0xC8,0xFD,0x0F,0x30,0x6A,0x16,0xC1,0x6B,0xDA,0x7B,0x57,0xF5,0x8B,0xFA,0xE5,0xB2};
- TOpenSessionReq() : client_protocol((TProtocolVersion::type)0), username(), password() {
- client_protocol = (TProtocolVersion::type)0;
+ TOpenSessionReq() : client_protocol((TProtocolVersion::type)1), username(), password() {
+ client_protocol = (TProtocolVersion::type)1;
}
@@ -1543,8 +1545,8 @@ class TOpenSessionResp {
static const char* ascii_fingerprint; // = "CFE7D7F4E9EC671F2518ED74FEE9F163";
static const uint8_t binary_fingerprint[16]; // = {0xCF,0xE7,0xD7,0xF4,0xE9,0xEC,0x67,0x1F,0x25,0x18,0xED,0x74,0xFE,0xE9,0xF1,0x63};
- TOpenSessionResp() : serverProtocolVersion((TProtocolVersion::type)0) {
- serverProtocolVersion = (TProtocolVersion::type)0;
+ TOpenSessionResp() : serverProtocolVersion((TProtocolVersion::type)1) {
+ serverProtocolVersion = (TProtocolVersion::type)1;
}
@@ -1850,17 +1852,18 @@ class TGetInfoResp {
void swap(TGetInfoResp &a, TGetInfoResp &b);
typedef struct _TExecuteStatementReq__isset {
- _TExecuteStatementReq__isset() : confOverlay(false) {}
+ _TExecuteStatementReq__isset() : confOverlay(false), runAsync(true) {}
bool confOverlay;
+ bool runAsync;
} _TExecuteStatementReq__isset;
class TExecuteStatementReq {
public:
- static const char* ascii_fingerprint; // = "4CDA19909D21B7D9907F85E3387EAB27";
- static const uint8_t binary_fingerprint[16]; // = {0x4C,0xDA,0x19,0x90,0x9D,0x21,0xB7,0xD9,0x90,0x7F,0x85,0xE3,0x38,0x7E,0xAB,0x27};
+ static const char* ascii_fingerprint; // = "FED75DB77E66D76EC1939A51FB0D96FA";
+ static const uint8_t binary_fingerprint[16]; // = {0xFE,0xD7,0x5D,0xB7,0x7E,0x66,0xD7,0x6E,0xC1,0x93,0x9A,0x51,0xFB,0x0D,0x96,0xFA};
- TExecuteStatementReq() : statement() {
+ TExecuteStatementReq() : statement(), runAsync(false) {
}
virtual ~TExecuteStatementReq() throw() {}
@@ -1868,6 +1871,7 @@ class TExecuteStatementReq {
TSessionHandle sessionHandle;
std::string statement;
std::map<std::string, std::string> confOverlay;
+ bool runAsync;
_TExecuteStatementReq__isset __isset;
@@ -1884,6 +1888,11 @@ class TExecuteStatementReq {
__isset.confOverlay = true;
}
+ void __set_runAsync(const bool val) {
+ runAsync = val;
+ __isset.runAsync = true;
+ }
+
bool operator == (const TExecuteStatementReq & rhs) const
{
if (!(sessionHandle == rhs.sessionHandle))
@@ -1894,6 +1903,10 @@ class TExecuteStatementReq {
return false;
else if (__isset.confOverlay && !(confOverlay == rhs.confOverlay))
return false;
+ if (__isset.runAsync != rhs.__isset.runAsync)
+ return false;
+ else if (__isset.runAsync && !(runAsync == rhs.runAsync))
+ return false;
return true;
}
bool operator != (const TExecuteStatementReq &rhs) const {
Modified: hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java (original)
+++ hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java Tue Sep 10 01:13:20 2013
@@ -37,6 +37,7 @@ public class TExecuteStatementReq implem
private static final org.apache.thrift.protocol.TField SESSION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("sessionHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1);
private static final org.apache.thrift.protocol.TField STATEMENT_FIELD_DESC = new org.apache.thrift.protocol.TField("statement", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField CONF_OVERLAY_FIELD_DESC = new org.apache.thrift.protocol.TField("confOverlay", org.apache.thrift.protocol.TType.MAP, (short)3);
+ private static final org.apache.thrift.protocol.TField RUN_ASYNC_FIELD_DESC = new org.apache.thrift.protocol.TField("runAsync", org.apache.thrift.protocol.TType.BOOL, (short)4);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -47,12 +48,14 @@ public class TExecuteStatementReq implem
private TSessionHandle sessionHandle; // required
private String statement; // required
private Map<String,String> confOverlay; // optional
+ private boolean runAsync; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
SESSION_HANDLE((short)1, "sessionHandle"),
STATEMENT((short)2, "statement"),
- CONF_OVERLAY((short)3, "confOverlay");
+ CONF_OVERLAY((short)3, "confOverlay"),
+ RUN_ASYNC((short)4, "runAsync");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -73,6 +76,8 @@ public class TExecuteStatementReq implem
return STATEMENT;
case 3: // CONF_OVERLAY
return CONF_OVERLAY;
+ case 4: // RUN_ASYNC
+ return RUN_ASYNC;
default:
return null;
}
@@ -113,7 +118,9 @@ public class TExecuteStatementReq implem
}
// isset id assignments
- private _Fields optionals[] = {_Fields.CONF_OVERLAY};
+ private static final int __RUNASYNC_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ private _Fields optionals[] = {_Fields.CONF_OVERLAY,_Fields.RUN_ASYNC};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -125,11 +132,15 @@ public class TExecuteStatementReq implem
new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ tmpMap.put(_Fields.RUN_ASYNC, new org.apache.thrift.meta_data.FieldMetaData("runAsync", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TExecuteStatementReq.class, metaDataMap);
}
public TExecuteStatementReq() {
+ this.runAsync = false;
+
}
public TExecuteStatementReq(
@@ -145,6 +156,7 @@ public class TExecuteStatementReq implem
* Performs a deep copy on <i>other</i>.
*/
public TExecuteStatementReq(TExecuteStatementReq other) {
+ __isset_bitfield = other.__isset_bitfield;
if (other.isSetSessionHandle()) {
this.sessionHandle = new TSessionHandle(other.sessionHandle);
}
@@ -166,6 +178,7 @@ public class TExecuteStatementReq implem
}
this.confOverlay = __this__confOverlay;
}
+ this.runAsync = other.runAsync;
}
public TExecuteStatementReq deepCopy() {
@@ -177,6 +190,8 @@ public class TExecuteStatementReq implem
this.sessionHandle = null;
this.statement = null;
this.confOverlay = null;
+ this.runAsync = false;
+
}
public TSessionHandle getSessionHandle() {
@@ -259,6 +274,28 @@ public class TExecuteStatementReq implem
}
}
+ public boolean isRunAsync() {
+ return this.runAsync;
+ }
+
+ public void setRunAsync(boolean runAsync) {
+ this.runAsync = runAsync;
+ setRunAsyncIsSet(true);
+ }
+
+ public void unsetRunAsync() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __RUNASYNC_ISSET_ID);
+ }
+
+ /** Returns true if field runAsync is set (has been assigned a value) and false otherwise */
+ public boolean isSetRunAsync() {
+ return EncodingUtils.testBit(__isset_bitfield, __RUNASYNC_ISSET_ID);
+ }
+
+ public void setRunAsyncIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __RUNASYNC_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case SESSION_HANDLE:
@@ -285,6 +322,14 @@ public class TExecuteStatementReq implem
}
break;
+ case RUN_ASYNC:
+ if (value == null) {
+ unsetRunAsync();
+ } else {
+ setRunAsync((Boolean)value);
+ }
+ break;
+
}
}
@@ -299,6 +344,9 @@ public class TExecuteStatementReq implem
case CONF_OVERLAY:
return getConfOverlay();
+ case RUN_ASYNC:
+ return Boolean.valueOf(isRunAsync());
+
}
throw new IllegalStateException();
}
@@ -316,6 +364,8 @@ public class TExecuteStatementReq implem
return isSetStatement();
case CONF_OVERLAY:
return isSetConfOverlay();
+ case RUN_ASYNC:
+ return isSetRunAsync();
}
throw new IllegalStateException();
}
@@ -360,6 +410,15 @@ public class TExecuteStatementReq implem
return false;
}
+ boolean this_present_runAsync = true && this.isSetRunAsync();
+ boolean that_present_runAsync = true && that.isSetRunAsync();
+ if (this_present_runAsync || that_present_runAsync) {
+ if (!(this_present_runAsync && that_present_runAsync))
+ return false;
+ if (this.runAsync != that.runAsync)
+ return false;
+ }
+
return true;
}
@@ -382,6 +441,11 @@ public class TExecuteStatementReq implem
if (present_confOverlay)
builder.append(confOverlay);
+ boolean present_runAsync = true && (isSetRunAsync());
+ builder.append(present_runAsync);
+ if (present_runAsync)
+ builder.append(runAsync);
+
return builder.toHashCode();
}
@@ -423,6 +487,16 @@ public class TExecuteStatementReq implem
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetRunAsync()).compareTo(typedOther.isSetRunAsync());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRunAsync()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.runAsync, typedOther.runAsync);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -468,6 +542,12 @@ public class TExecuteStatementReq implem
}
first = false;
}
+ if (isSetRunAsync()) {
+ if (!first) sb.append(", ");
+ sb.append("runAsync:");
+ sb.append(this.runAsync);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -498,6 +578,8 @@ public class TExecuteStatementReq implem
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -559,6 +641,14 @@ public class TExecuteStatementReq implem
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 4: // RUN_ASYNC
+ if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+ struct.runAsync = iprot.readBool();
+ struct.setRunAsyncIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -597,6 +687,11 @@ public class TExecuteStatementReq implem
oprot.writeFieldEnd();
}
}
+ if (struct.isSetRunAsync()) {
+ oprot.writeFieldBegin(RUN_ASYNC_FIELD_DESC);
+ oprot.writeBool(struct.runAsync);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -620,7 +715,10 @@ public class TExecuteStatementReq implem
if (struct.isSetConfOverlay()) {
optionals.set(0);
}
- oprot.writeBitSet(optionals, 1);
+ if (struct.isSetRunAsync()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
if (struct.isSetConfOverlay()) {
{
oprot.writeI32(struct.confOverlay.size());
@@ -631,6 +729,9 @@ public class TExecuteStatementReq implem
}
}
}
+ if (struct.isSetRunAsync()) {
+ oprot.writeBool(struct.runAsync);
+ }
}
@Override
@@ -641,7 +742,7 @@ public class TExecuteStatementReq implem
struct.setSessionHandleIsSet(true);
struct.statement = iprot.readString();
struct.setStatementIsSet(true);
- BitSet incoming = iprot.readBitSet(1);
+ BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TMap _map150 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -657,6 +758,10 @@ public class TExecuteStatementReq implem
}
struct.setConfOverlayIsSet(true);
}
+ if (incoming.get(1)) {
+ struct.runAsync = iprot.readBool();
+ struct.setRunAsyncIsSet(true);
+ }
}
}
Modified: hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionReq.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionReq.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionReq.java (original)
+++ hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionReq.java Tue Sep 10 01:13:20 2013
@@ -141,7 +141,7 @@ public class TOpenSessionReq implements
}
public TOpenSessionReq() {
- this.client_protocol = org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1;
+ this.client_protocol = org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2;
}
@@ -188,7 +188,7 @@ public class TOpenSessionReq implements
@Override
public void clear() {
- this.client_protocol = org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1;
+ this.client_protocol = org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2;
this.username = null;
this.password = null;
Modified: hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionResp.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionResp.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionResp.java (original)
+++ hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOpenSessionResp.java Tue Sep 10 01:13:20 2013
@@ -141,7 +141,7 @@ public class TOpenSessionResp implements
}
public TOpenSessionResp() {
- this.serverProtocolVersion = org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1;
+ this.serverProtocolVersion = org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2;
}
@@ -191,7 +191,7 @@ public class TOpenSessionResp implements
@Override
public void clear() {
this.status = null;
- this.serverProtocolVersion = org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1;
+ this.serverProtocolVersion = org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2;
this.sessionHandle = null;
this.configuration = null;
Modified: hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOperationState.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOperationState.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOperationState.java (original)
+++ hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TOperationState.java Tue Sep 10 01:13:20 2013
@@ -18,7 +18,8 @@ public enum TOperationState implements o
CANCELED_STATE(3),
CLOSED_STATE(4),
ERROR_STATE(5),
- UKNOWN_STATE(6);
+ UKNOWN_STATE(6),
+ PENDING_STATE(7);
private final int value;
@@ -53,6 +54,8 @@ public enum TOperationState implements o
return ERROR_STATE;
case 6:
return UKNOWN_STATE;
+ case 7:
+ return PENDING_STATE;
default:
return null;
}
Modified: hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java (original)
+++ hive/trunk/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TProtocolVersion.java Tue Sep 10 01:13:20 2013
@@ -12,7 +12,8 @@ import java.util.HashMap;
import org.apache.thrift.TEnum;
public enum TProtocolVersion implements org.apache.thrift.TEnum {
- HIVE_CLI_SERVICE_PROTOCOL_V1(0);
+ HIVE_CLI_SERVICE_PROTOCOL_V1(0),
+ HIVE_CLI_SERVICE_PROTOCOL_V2(1);
private final int value;
@@ -35,6 +36,8 @@ public enum TProtocolVersion implements
switch (value) {
case 0:
return HIVE_CLI_SERVICE_PROTOCOL_V1;
+ case 1:
+ return HIVE_CLI_SERVICE_PROTOCOL_V2;
default:
return null;
}
Modified: hive/trunk/service/src/gen/thrift/gen-py/TCLIService/ttypes.py
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-py/TCLIService/ttypes.py?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-py/TCLIService/ttypes.py (original)
+++ hive/trunk/service/src/gen/thrift/gen-py/TCLIService/ttypes.py Tue Sep 10 01:13:20 2013
@@ -18,13 +18,16 @@ except:
class TProtocolVersion:
HIVE_CLI_SERVICE_PROTOCOL_V1 = 0
+ HIVE_CLI_SERVICE_PROTOCOL_V2 = 1
_VALUES_TO_NAMES = {
0: "HIVE_CLI_SERVICE_PROTOCOL_V1",
+ 1: "HIVE_CLI_SERVICE_PROTOCOL_V2",
}
_NAMES_TO_VALUES = {
"HIVE_CLI_SERVICE_PROTOCOL_V1": 0,
+ "HIVE_CLI_SERVICE_PROTOCOL_V2": 1,
}
class TTypeId:
@@ -120,6 +123,7 @@ class TOperationState:
CLOSED_STATE = 4
ERROR_STATE = 5
UKNOWN_STATE = 6
+ PENDING_STATE = 7
_VALUES_TO_NAMES = {
0: "INITIALIZED_STATE",
@@ -129,6 +133,7 @@ class TOperationState:
4: "CLOSED_STATE",
5: "ERROR_STATE",
6: "UKNOWN_STATE",
+ 7: "PENDING_STATE",
}
_NAMES_TO_VALUES = {
@@ -139,6 +144,7 @@ class TOperationState:
"CLOSED_STATE": 4,
"ERROR_STATE": 5,
"UKNOWN_STATE": 6,
+ "PENDING_STATE": 7,
}
class TOperationType:
@@ -2431,7 +2437,7 @@ class TOpenSessionReq:
thrift_spec = (
None, # 0
- (1, TType.I32, 'client_protocol', None, 0, ), # 1
+ (1, TType.I32, 'client_protocol', None, 1, ), # 1
(2, TType.STRING, 'username', None, None, ), # 2
(3, TType.STRING, 'password', None, None, ), # 3
(4, TType.MAP, 'configuration', (TType.STRING,None,TType.STRING,None), None, ), # 4
@@ -2540,7 +2546,7 @@ class TOpenSessionResp:
thrift_spec = (
None, # 0
(1, TType.STRUCT, 'status', (TStatus, TStatus.thrift_spec), None, ), # 1
- (2, TType.I32, 'serverProtocolVersion', None, 0, ), # 2
+ (2, TType.I32, 'serverProtocolVersion', None, 1, ), # 2
(3, TType.STRUCT, 'sessionHandle', (TSessionHandle, TSessionHandle.thrift_spec), None, ), # 3
(4, TType.MAP, 'configuration', (TType.STRING,None,TType.STRING,None), None, ), # 4
)
@@ -3047,6 +3053,7 @@ class TExecuteStatementReq:
- sessionHandle
- statement
- confOverlay
+ - runAsync
"""
thrift_spec = (
@@ -3054,12 +3061,14 @@ class TExecuteStatementReq:
(1, TType.STRUCT, 'sessionHandle', (TSessionHandle, TSessionHandle.thrift_spec), None, ), # 1
(2, TType.STRING, 'statement', None, None, ), # 2
(3, TType.MAP, 'confOverlay', (TType.STRING,None,TType.STRING,None), None, ), # 3
+ (4, TType.BOOL, 'runAsync', None, False, ), # 4
)
- def __init__(self, sessionHandle=None, statement=None, confOverlay=None,):
+ def __init__(self, sessionHandle=None, statement=None, confOverlay=None, runAsync=thrift_spec[4][4],):
self.sessionHandle = sessionHandle
self.statement = statement
self.confOverlay = confOverlay
+ self.runAsync = runAsync
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -3092,6 +3101,11 @@ class TExecuteStatementReq:
iprot.readMapEnd()
else:
iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.BOOL:
+ self.runAsync = iprot.readBool();
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -3118,6 +3132,10 @@ class TExecuteStatementReq:
oprot.writeString(viter135)
oprot.writeMapEnd()
oprot.writeFieldEnd()
+ if self.runAsync is not None:
+ oprot.writeFieldBegin('runAsync', TType.BOOL, 4)
+ oprot.writeBool(self.runAsync)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
Modified: hive/trunk/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb (original)
+++ hive/trunk/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb Tue Sep 10 01:13:20 2013
@@ -8,8 +8,9 @@ require 'thrift'
module TProtocolVersion
HIVE_CLI_SERVICE_PROTOCOL_V1 = 0
- VALUE_MAP = {0 => "HIVE_CLI_SERVICE_PROTOCOL_V1"}
- VALID_VALUES = Set.new([HIVE_CLI_SERVICE_PROTOCOL_V1]).freeze
+ HIVE_CLI_SERVICE_PROTOCOL_V2 = 1
+ VALUE_MAP = {0 => "HIVE_CLI_SERVICE_PROTOCOL_V1", 1 => "HIVE_CLI_SERVICE_PROTOCOL_V2"}
+ VALID_VALUES = Set.new([HIVE_CLI_SERVICE_PROTOCOL_V1, HIVE_CLI_SERVICE_PROTOCOL_V2]).freeze
end
module TTypeId
@@ -53,8 +54,9 @@ module TOperationState
CLOSED_STATE = 4
ERROR_STATE = 5
UKNOWN_STATE = 6
- VALUE_MAP = {0 => "INITIALIZED_STATE", 1 => "RUNNING_STATE", 2 => "FINISHED_STATE", 3 => "CANCELED_STATE", 4 => "CLOSED_STATE", 5 => "ERROR_STATE", 6 => "UKNOWN_STATE"}
- VALID_VALUES = Set.new([INITIALIZED_STATE, RUNNING_STATE, FINISHED_STATE, CANCELED_STATE, CLOSED_STATE, ERROR_STATE, UKNOWN_STATE]).freeze
+ PENDING_STATE = 7
+ VALUE_MAP = {0 => "INITIALIZED_STATE", 1 => "RUNNING_STATE", 2 => "FINISHED_STATE", 3 => "CANCELED_STATE", 4 => "CLOSED_STATE", 5 => "ERROR_STATE", 6 => "UKNOWN_STATE", 7 => "PENDING_STATE"}
+ VALID_VALUES = Set.new([INITIALIZED_STATE, RUNNING_STATE, FINISHED_STATE, CANCELED_STATE, CLOSED_STATE, ERROR_STATE, UKNOWN_STATE, PENDING_STATE]).freeze
end
module TOperationType
@@ -724,7 +726,7 @@ class TOpenSessionReq
CONFIGURATION = 4
FIELDS = {
- CLIENT_PROTOCOL => {:type => ::Thrift::Types::I32, :name => 'client_protocol', :default => 0, :enum_class => ::TProtocolVersion},
+ CLIENT_PROTOCOL => {:type => ::Thrift::Types::I32, :name => 'client_protocol', :default => 1, :enum_class => ::TProtocolVersion},
USERNAME => {:type => ::Thrift::Types::STRING, :name => 'username', :optional => true},
PASSWORD => {:type => ::Thrift::Types::STRING, :name => 'password', :optional => true},
CONFIGURATION => {:type => ::Thrift::Types::MAP, :name => 'configuration', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true}
@@ -751,7 +753,7 @@ class TOpenSessionResp
FIELDS = {
STATUS => {:type => ::Thrift::Types::STRUCT, :name => 'status', :class => ::TStatus},
- SERVERPROTOCOLVERSION => {:type => ::Thrift::Types::I32, :name => 'serverProtocolVersion', :default => 0, :enum_class => ::TProtocolVersion},
+ SERVERPROTOCOLVERSION => {:type => ::Thrift::Types::I32, :name => 'serverProtocolVersion', :default => 1, :enum_class => ::TProtocolVersion},
SESSIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'sessionHandle', :class => ::TSessionHandle, :optional => true},
CONFIGURATION => {:type => ::Thrift::Types::MAP, :name => 'configuration', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true}
}
@@ -904,11 +906,13 @@ class TExecuteStatementReq
SESSIONHANDLE = 1
STATEMENT = 2
CONFOVERLAY = 3
+ RUNASYNC = 4
FIELDS = {
SESSIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'sessionHandle', :class => ::TSessionHandle},
STATEMENT => {:type => ::Thrift::Types::STRING, :name => 'statement'},
- CONFOVERLAY => {:type => ::Thrift::Types::MAP, :name => 'confOverlay', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true}
+ CONFOVERLAY => {:type => ::Thrift::Types::MAP, :name => 'confOverlay', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true},
+ RUNASYNC => {:type => ::Thrift::Types::BOOL, :name => 'runAsync', :default => false, :optional => true}
}
def struct_fields; FIELDS; end
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Tue Sep 10 01:13:20 2013
@@ -140,10 +140,12 @@ public class CLIService extends Composit
}
/* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle,
+ * java.lang.String, java.util.Map)
*/
@Override
- public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map<String, String> confOverlay)
+ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay)
throws HiveSQLException {
OperationHandle opHandle = sessionManager.getSession(sessionHandle)
.executeStatement(statement, confOverlay);
@@ -152,6 +154,20 @@ public class CLIService extends Composit
}
/* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle,
+ * java.lang.String, java.util.Map)
+ */
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay) throws HiveSQLException {
+ OperationHandle opHandle = sessionManager.getSession(sessionHandle)
+ .executeStatementAsync(statement, confOverlay);
+ LOG.info(sessionHandle + ": executeStatementAsync()");
+ return opHandle;
+ }
+
+
+ /* (non-Javadoc)
* @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle)
*/
@Override
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java Tue Sep 10 01:13:20 2013
@@ -56,13 +56,22 @@ public abstract class CLIServiceClient i
throws HiveSQLException;
/* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle,
+ * java.lang.String, java.util.Map)
*/
@Override
public abstract OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
Map<String, String> confOverlay) throws HiveSQLException;
/* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle,
+ * java.lang.String, java.util.Map)
+ */
+ @Override
+ public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay) throws HiveSQLException;
+
+ /* (non-Javadoc)
* @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle)
*/
@Override
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java Tue Sep 10 01:13:20 2013
@@ -66,7 +66,8 @@ public class EmbeddedCLIServiceClient ex
}
/* (non-Javadoc)
- * @see org.apache.hive.service.cli.CLIServiceClient#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+ * @see org.apache.hive.service.cli.CLIServiceClient#executeStatement(org.apache.hive.service.cli.SessionHandle,
+ * java.lang.String, java.util.Map)
*/
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
@@ -75,6 +76,17 @@ public class EmbeddedCLIServiceClient ex
}
/* (non-Javadoc)
+ * @see org.apache.hive.service.cli.CLIServiceClient#executeStatementAsync(org.apache.hive.service.cli.SessionHandle,
+ * java.lang.String, java.util.Map)
+ */
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay) throws HiveSQLException {
+ return cliService.executeStatementAsync(sessionHandle, statement, confOverlay);
+ }
+
+
+ /* (non-Javadoc)
* @see org.apache.hive.service.cli.CLIServiceClient#getTypeInfo(org.apache.hive.service.cli.SessionHandle)
*/
@Override
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/ICLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/ICLIService.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/ICLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/ICLIService.java Tue Sep 10 01:13:20 2013
@@ -43,6 +43,10 @@ public interface ICLIService {
Map<String, String> confOverlay)
throws HiveSQLException;
+ public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle,
+ String statement, Map<String, String> confOverlay)
+ throws HiveSQLException;
+
public abstract OperationHandle getTypeInfo(SessionHandle sessionHandle)
throws HiveSQLException;
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java Tue Sep 10 01:13:20 2013
@@ -31,7 +31,8 @@ public enum OperationState {
CANCELED(TOperationState.CANCELED_STATE),
CLOSED(TOperationState.CLOSED_STATE),
ERROR(TOperationState.ERROR_STATE),
- UNKNOWN(TOperationState.UKNOWN_STATE);
+ UNKNOWN(TOperationState.UKNOWN_STATE),
+ PENDING(TOperationState.PENDING_STATE);
private final TOperationState tOperationState;
@@ -55,11 +56,22 @@ public enum OperationState {
switch (oldState) {
case INITIALIZED:
switch (newState) {
+ case PENDING:
case RUNNING:
case CLOSED:
return;
}
break;
+ case PENDING:
+ switch (newState) {
+ case RUNNING:
+ case FINISHED:
+ case CANCELED:
+ case ERROR:
+ case CLOSED:
+ return;
+ }
+ break;
case RUNNING:
switch (newState) {
case FINISHED:
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java Tue Sep 10 01:13:20 2013
@@ -40,7 +40,7 @@ public abstract class ExecuteStatementOp
}
public static ExecuteStatementOperation newExecuteStatementOperation(
- HiveSession parentSession, String statement, Map<String, String> confOverlay) {
+ HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync) {
String[] tokens = statement.trim().split("\\s+");
String command = tokens[0].toLowerCase();
@@ -53,7 +53,7 @@ public abstract class ExecuteStatementOp
} else if ("delete".equals(command)) {
return new DeleteResourceOperation(parentSession, statement, confOverlay);
} else {
- return new SQLOperation(parentSession, statement, confOverlay);
+ return new SQLOperation(parentSession, statement, confOverlay, runAsync);
}
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Sep 10 01:13:20 2013
@@ -66,9 +66,9 @@ public class OperationManager extends Ab
}
public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
- String statement, Map<String, String> confOverlay) {
+ String statement, Map<String, String> confOverlay, boolean runAsync) {
ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
- .newExecuteStatementOperation(parentSession, statement, confOverlay);
+ .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
addOperation(executeStatementOperation);
return executeStatementOperation;
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Tue Sep 10 01:13:20 2013
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -64,19 +66,21 @@ public class SQLOperation extends Execut
private TableSchema resultSchema = null;
private Schema mResultSchema = null;
private SerDe serde = null;
+ private final boolean runAsync;
+ private Future<?> backgroundHandle;
-
- public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay) {
+ public SQLOperation(HiveSession parentSession, String statement, Map<String,
+ String> confOverlay, boolean runInBackground) {
// TODO: call setRemoteUser in ExecuteStatementOperation or higher.
super(parentSession, statement, confOverlay);
+ this.runAsync = runInBackground;
}
public void prepare() throws HiveSQLException {
}
- @Override
- public void run() throws HiveSQLException {
+ private void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
String statement_trimmed = statement.trim();
String[] tokens = statement_trimmed.split("\\s");
@@ -135,31 +139,62 @@ public class SQLOperation extends Execut
}
@Override
- public void cancel() throws HiveSQLException {
- setState(OperationState.CANCELED);
+ public void run() throws HiveSQLException {
+ setState(OperationState.PENDING);
+ if (!shouldRunAsync()) {
+ runInternal();
+ } else {
+ Runnable backgroundOperation = new Runnable() {
+ SessionState ss = SessionState.get();
+ @Override
+ public void run() {
+ SessionState.start(ss);
+ try {
+ runInternal();
+ } catch (HiveSQLException e) {
+ LOG.error("Error: ", e);
+ // TODO: Return a more detailed error to the client,
+ // currently the async thread only writes to the log and sets the OperationState
+ }
+ }
+ };
+ try {
+ // This submit blocks if no background threads are available to run this operation
+ backgroundHandle =
+ getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
+ } catch (RejectedExecutionException rejected) {
+ setState(OperationState.ERROR);
+ throw new HiveSQLException(rejected);
+ }
+ }
+ }
+
+ private void cleanup(OperationState state) throws HiveSQLException {
+ setState(state);
+ if (shouldRunAsync()) {
+ if (backgroundHandle != null) {
+ backgroundHandle.cancel(true);
+ }
+ }
if (driver != null) {
driver.close();
driver.destroy();
}
- SessionState session = SessionState.get();
- if (session.getTmpOutputFile() != null) {
- session.getTmpOutputFile().delete();
+ SessionState ss = SessionState.get();
+ if (ss.getTmpOutputFile() != null) {
+ ss.getTmpOutputFile().delete();
}
}
@Override
- public void close() throws HiveSQLException {
- setState(OperationState.CLOSED);
- if (driver != null) {
- driver.close();
- driver.destroy();
- }
+ public void cancel() throws HiveSQLException {
+ cleanup(OperationState.CANCELED);
+ }
- SessionState session = SessionState.get();
- if (session.getTmpOutputFile() != null) {
- session.getTmpOutputFile().delete();
- }
+ @Override
+ public void close() throws HiveSQLException {
+ cleanup(OperationState.CLOSED);
}
@Override
@@ -278,4 +313,8 @@ public class SQLOperation extends Execut
return serde;
}
+ private boolean shouldRunAsync() {
+ return runAsync;
+ }
+
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Tue Sep 10 01:13:20 2013
@@ -42,6 +42,11 @@ public interface HiveSession {
public void setSessionManager(SessionManager sessionManager);
/**
+ * Get the session manager for the session
+ */
+ public SessionManager getSessionManager();
+
+ /**
* Set operation manager for the session
* @param operationManager
*/
@@ -76,6 +81,16 @@ public interface HiveSession {
Map<String, String> confOverlay) throws HiveSQLException;
/**
+ * execute operation handler
+ * @param statement
+ * @param confOverlay
+ * @return
+ * @throws HiveSQLException
+ */
+ public OperationHandle executeStatementAsync(String statement,
+ Map<String, String> confOverlay) throws HiveSQLException;
+
+ /**
* getTypeInfo operation handler
* @return
* @throws HiveSQLException
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Sep 10 01:13:20 2013
@@ -18,7 +18,6 @@
package org.apache.hive.service.cli.session;
-import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -93,7 +91,7 @@ public class HiveSessionImpl implements
sessionState = new SessionState(hiveConf);
}
- private SessionManager getSessionManager() {
+ public SessionManager getSessionManager() {
return sessionManager;
}
@@ -174,10 +172,21 @@ public class HiveSessionImpl implements
public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
throws HiveSQLException {
+ return executeStatementInternal(statement, confOverlay, false);
+ }
+
+ public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay)
+ throws HiveSQLException {
+ return executeStatementInternal(statement, confOverlay, true);
+ }
+
+ private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
+ boolean runAsync)
+ throws HiveSQLException {
acquire();
try {
ExecuteStatementOperation operation = getOperationManager()
- .newExecuteStatementOperation(getSession(), statement, confOverlay);
+ .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
operation.run();
OperationHandle opHandle = operation.getHandle();
opHandleSet.add(opHandle);
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Sep 10 01:13:20 2013
@@ -21,9 +21,16 @@ package org.apache.hive.service.cli.sess
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.hooks.HookUtils;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.SessionHandle;
@@ -34,11 +41,12 @@ import org.apache.hive.service.cli.opera
*
*/
public class SessionManager extends CompositeService {
-
+ private static final Log LOG = LogFactory.getLog(CompositeService.class);
private HiveConf hiveConf;
private final Map<SessionHandle, HiveSession> handleToSession = new HashMap<SessionHandle, HiveSession>();
private OperationManager operationManager = new OperationManager();
private static final Object sessionMapLock = new Object();
+ private ExecutorService backgroundOperationPool;
public SessionManager() {
super("SessionManager");
@@ -47,10 +55,11 @@ public class SessionManager extends Comp
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
-
operationManager = new OperationManager();
+ int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+ LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize);
+ backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize);
addService(operationManager);
-
super.init(hiveConf);
}
@@ -64,6 +73,16 @@ public class SessionManager extends Comp
public synchronized void stop() {
// TODO
super.stop();
+ if (backgroundOperationPool != null) {
+ backgroundOperationPool.shutdown();
+ long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+ try {
+ backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
+ } catch (InterruptedException exc) {
+ LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
+ " seconds has been exceeded. RUNNING background operations will be shut down", exc);
+ }
+ }
}
@@ -164,4 +183,9 @@ public class SessionManager extends Comp
sessionHook.run(new HiveSessionHookContextImpl(session));
}
}
+
+ public Future<?> submitBackgroundOperation(Runnable r) {
+ return backgroundOperationPool.submit(r);
+ }
+
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Sep 10 01:13:20 2013
@@ -200,8 +200,10 @@ public class ThriftCLIService extends Ab
SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
String statement = req.getStatement();
Map<String, String> confOverlay = req.getConfOverlay();
- OperationHandle operationHandle =
- cliService.executeStatement(sessionHandle, statement, confOverlay);
+ Boolean runAsync = req.isRunAsync();
+ OperationHandle operationHandle = runAsync ?
+ cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
+ : cliService.executeStatement(sessionHandle, statement, confOverlay);
resp.setOperationHandle(operationHandle.toTOperationHandle());
resp.setStatus(OK_STATUS);
} catch (Exception e) {
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java Tue Sep 10 01:13:20 2013
@@ -122,9 +122,27 @@ public class ThriftCLIServiceClient exte
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
Map<String, String> confOverlay)
throws HiveSQLException {
+ return executeStatementInternal(sessionHandle, statement, confOverlay, false);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+ */
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay)
+ throws HiveSQLException {
+ return executeStatementInternal(sessionHandle, statement, confOverlay, true);
+ }
+
+ private OperationHandle executeStatementInternal(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, boolean isAsync)
+ throws HiveSQLException {
try {
- TExecuteStatementReq req = new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
+ TExecuteStatementReq req =
+ new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
req.setConfOverlay(confOverlay);
+ req.setRunAsync(isAsync);
TExecuteStatementResp resp = cliService.ExecuteStatement(req);
checkStatus(resp.getStatus());
return new OperationHandle(resp.getOperationHandle());
Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1521326&r1=1521325&r2=1521326&view=diff
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Tue Sep 10 01:13:20 2013
@@ -113,4 +113,105 @@ public abstract class CLIServiceTest {
client.closeSession(sessionHandle);
}
+
+  /**
+   * Runs a statement through the blocking {@code executeStatement} call and
+   * checks that the operation is already FINISHED when the call returns.
+   *
+   * @throws Exception if opening the session or executing a statement fails
+   */
+  @Test
+  public void testExecuteStatement() throws Exception {
+    HashMap<String, String> confOverlay = new HashMap<String, String>();
+    SessionHandle sessionHandle = client.openSession("tom", "password",
+        new HashMap<String, String>());
+    assertNotNull(sessionHandle);
+
+    try {
+      // Change lock manager, otherwise unit-test doesn't go through
+      String setLockMgr = "SET hive.lock.manager=" +
+          "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
+      client.executeStatement(sessionHandle, setLockMgr, confOverlay);
+
+      String createTable = "CREATE TABLE TEST_EXEC(ID STRING)";
+      client.executeStatement(sessionHandle, createTable, confOverlay);
+
+      // blocking execute
+      String select = "SELECT ID FROM TEST_EXEC";
+      OperationHandle ophandle = client.executeStatement(sessionHandle, select, confOverlay);
+
+      // expect query to be completed now
+      assertEquals("Query should be finished",
+          OperationState.FINISHED, client.getOperationStatus(ophandle));
+    } finally {
+      // Release the session even when an assertion above fails, so a failing
+      // run does not leave an open session behind for later tests.
+      client.closeSession(sessionHandle);
+    }
+  }
+
+  /**
+   * Exercises {@code executeStatementAsync} for three cases: a query that is
+   * expected to end in ERROR, a query that is expected to end in FINISHED, and
+   * a query that is cancelled right after submission.
+   *
+   * @throws Exception if opening the session or submitting a statement fails
+   */
+  @Test
+  public void testExecuteStatementAsync() throws Exception {
+    HashMap<String, String> confOverlay = new HashMap<String, String>();
+    SessionHandle sessionHandle = client.openSession("tom", "password",
+        new HashMap<String, String>());
+    assertNotNull(sessionHandle);
+
+    try {
+      OperationState state;
+      OperationHandle ophandle;
+
+      // Change lock manager, otherwise unit-test doesn't go through
+      String setLockMgr = "SET hive.lock.manager=" +
+          "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
+      client.executeStatement(sessionHandle, setLockMgr, confOverlay);
+
+      // Create the table with the blocking call: it is only test setup, and
+      // submitting it asynchronously without waiting would let the SELECT on
+      // TEST_EXEC_ASYNC below race with the table creation.
+      String createTable = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)";
+      client.executeStatement(sessionHandle, createTable, confOverlay);
+
+      // Test async execution response when query is malformed
+      String wrongQuery = "SELECT NAME FROM TEST_EXEC";
+      ophandle = client.executeStatementAsync(sessionHandle, wrongQuery, confOverlay);
+      state = waitForTerminalState(ophandle);
+      assertEquals("Query should return an error state", OperationState.ERROR, state);
+
+      // Test async execution when query is well formed
+      String select = "SELECT ID FROM TEST_EXEC_ASYNC";
+      ophandle = client.executeStatementAsync(sessionHandle, select, confOverlay);
+      state = waitForTerminalState(ophandle);
+      assertEquals("Query should be finished", OperationState.FINISHED, state);
+
+      // Cancellation test
+      ophandle = client.executeStatementAsync(sessionHandle, select, confOverlay);
+      System.out.println("cancelling " + ophandle);
+      client.cancelOperation(ophandle);
+      state = client.getOperationStatus(ophandle);
+      System.out.println(ophandle + " after cancelling, state= " + state);
+      assertEquals("Query should be cancelled", OperationState.CANCELED, state);
+    } finally {
+      // Release the session even when an assertion above fails
+      client.closeSession(sessionHandle);
+    }
+  }
+
+  /**
+   * Polls the status of an operation once per second until it reaches a
+   * terminal state or the polling budget is used up.
+   *
+   * Each call gets its own deadline, so a slow earlier wait cannot eat into
+   * the budget of a later one.
+   *
+   * @param ophandle handle of the operation to poll
+   * @return the last state observed; non-terminal only if polling timed out
+   * @throws Exception if fetching the status fails or the sleep is interrupted
+   */
+  private OperationState waitForTerminalState(OperationHandle ophandle) throws Exception {
+    // Timeout for the poll in case of asynchronous execute
+    final long pollTimeoutMillis = 100000;
+    final long deadline = System.currentTimeMillis() + pollTimeoutMillis;
+    int count = 0;
+    while (true) {
+      OperationState state = client.getOperationStatus(ophandle);
+      System.out.println("Polling: " + ophandle + " count=" + (++count)
+          + " state=" + state);
+      if (isTerminalState(state)) {
+        return state;
+      }
+      // Give up once the budget is spent; the caller's assertion then fails
+      // on the non-terminal state that is returned.
+      if (System.currentTimeMillis() > deadline) {
+        System.out.println("Polling timed out");
+        return state;
+      }
+      Thread.sleep(1000);
+    }
+  }
+
+  /**
+   * Tells whether polling should stop for the given state.
+   *
+   * @param state state to classify; may be null
+   * @return true for CANCELED, CLOSED, FINISHED and ERROR, false otherwise
+   */
+  private boolean isTerminalState(OperationState state) {
+    return state == OperationState.CANCELED || state == OperationState.CLOSED
+        || state == OperationState.FINISHED || state == OperationState.ERROR;
+  }
}