You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/03 22:31:57 UTC
[44/45] hive git commit: HIVE-4924: JDBC: Support query timeout for jdbc (Vaibhav Gumashta reviewed by Thejas Nair)
HIVE-4924: JDBC: Support query timeout for jdbc (Vaibhav Gumashta reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6218275
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6218275
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6218275
Branch: refs/heads/llap
Commit: b6218275b00b64aed7efaf470784cc0441464f67
Parents: 0a5bc94
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Tue May 3 12:49:22 2016 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Tue May 3 12:49:22 2016 -0700
----------------------------------------------------------------------
.../org/apache/hive/jdbc/TestJdbcDriver2.java | 43 +++++++-
.../cli/session/TestHiveSessionImpl.java | 2 +-
.../org/apache/hive/jdbc/HiveStatement.java | 20 ++--
.../java/org/apache/hadoop/hive/ql/Driver.java | 27 +++--
.../hadoop/hive/ql/history/HiveHistoryImpl.java | 8 +-
.../hadoop/hive/ql/session/OperationLog.java | 8 +-
service-rpc/if/TCLIService.thrift | 6 +
.../gen/thrift/gen-cpp/TCLIService_types.cpp | 30 ++++-
.../src/gen/thrift/gen-cpp/TCLIService_types.h | 15 ++-
.../rpc/thrift/TExecuteStatementReq.java | 109 ++++++++++++++++++-
.../service/rpc/thrift/TOperationState.java | 5 +-
service-rpc/src/gen/thrift/gen-php/Types.php | 25 +++++
.../src/gen/thrift/gen-py/TCLIService/ttypes.py | 18 ++-
.../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 9 +-
.../org/apache/hive/service/cli/CLIService.java | 46 ++++++--
.../service/cli/EmbeddedCLIServiceClient.java | 19 ++--
.../apache/hive/service/cli/ICLIService.java | 16 +--
.../apache/hive/service/cli/OperationState.java | 7 +-
.../operation/ExecuteStatementOperation.java | 9 +-
.../cli/operation/HiveCommandOperation.java | 5 +
.../cli/operation/MetadataOperation.java | 7 +-
.../hive/service/cli/operation/Operation.java | 17 +--
.../service/cli/operation/OperationManager.java | 27 +++--
.../service/cli/operation/SQLOperation.java | 106 +++++++++++++-----
.../hive/service/cli/session/HiveSession.java | 28 ++++-
.../service/cli/session/HiveSessionImpl.java | 38 ++++---
.../thrift/RetryingThriftCLIServiceClient.java | 22 +++-
.../service/cli/thrift/ThriftCLIService.java | 16 +--
.../cli/thrift/ThriftCLIServiceClient.java | 32 +++---
.../cli/thrift/ThriftCLIServiceTest.java | 6 +-
.../thrift/ThriftCliServiceTestWithCookie.java | 3 +-
31 files changed, 557 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 73bc620..7243648 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -55,6 +55,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.sql.Timestamp;
@@ -2384,7 +2385,7 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException,
try {
System.out.println("Executing query: ");
stmt.executeQuery("select sleepUDF(t1.under_col) as u0, t1.under_col as u1, " +
- "t2.under_col as u2 from " + tableName + "t1 join " + tableName +
+ "t2.under_col as u2 from " + tableName + " t1 join " + tableName +
" t2 on t1.under_col = t2.under_col");
fail("Expecting SQLException");
} catch (SQLException e) {
@@ -2399,7 +2400,7 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException,
@Override
public void run() {
try {
- Thread.sleep(1000);
+ Thread.sleep(10000);
System.out.println("Cancelling query: ");
stmt.cancel();
} catch (Exception e) {
@@ -2414,6 +2415,44 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException,
stmt.close();
}
+ @Test
+ public void testQueryTimeout() throws Exception {
+ String udfName = SleepUDF.class.getName();
+ Statement stmt1 = con.createStatement();
+ stmt1.execute("create temporary function sleepUDF as '" + udfName + "'");
+ stmt1.close();
+ Statement stmt = con.createStatement();
+ // Test a query where timeout kicks in
+ // Set query timeout to 15 seconds
+ stmt.setQueryTimeout(15);
+ System.err.println("Executing query: ");
+ try {
+ // Sleep UDF sleeps for 100ms for each select call
+ // The test table has 500 rows, so that should be sufficient time
+ stmt.executeQuery("select sleepUDF(t1.under_col) as u0, t1.under_col as u1, "
+ + "t2.under_col as u2 from " + tableName + " t1 join " + tableName
+ + " t2 on t1.under_col = t2.under_col");
+ fail("Expecting SQLTimeoutException");
+ } catch (SQLTimeoutException e) {
+ assertNotNull(e);
+ System.err.println(e.toString());
+ } catch (SQLException e) {
+ fail("Expecting SQLTimeoutException, but got SQLException: " + e);
+ e.printStackTrace();
+ }
+
+ // Test a query where timeout does not kick in. Set it to 25s
+ stmt.setQueryTimeout(25);
+ try {
+ stmt.executeQuery("show tables");
+ } catch (SQLException e) {
+ fail("Unexpected SQLException: " + e);
+ e.printStackTrace();
+ }
+
+ stmt.close();
+ }
+
/**
* Test the non-null value of the Yarn ATS GUID.
* We spawn 2 threads - one running the query and
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
index 4d763d2..c9e6a13 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
@@ -70,7 +70,7 @@ public class TestHiveSessionImpl {
Map<String, String> confOverlay = new HashMap<String, String>();
String hql = "drop table if exists table_not_exists";
Mockito.when(operationManager.newExecuteStatementOperation(same(session), eq(hql),
- (Map<String, String>)Mockito.any(), eq(true))).thenReturn(operation);
+ (Map<String, String>)Mockito.any(), eq(true), eq(0))).thenReturn(operation);
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 3cc6b74..38ccc78 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -43,6 +43,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTimeoutException;
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.HashMap;
@@ -111,6 +112,8 @@ public class HiveStatement implements java.sql.Statement {
*/
private boolean isExecuteStatementFailed = false;
+ private int queryTimeout = 0;
+
public HiveStatement(HiveConnection connection, TCLIService.Iface client,
TSessionHandle sessHandle) {
this(connection, client, sessHandle, false, DEFAULT_FETCH_SIZE);
@@ -300,7 +303,7 @@ public class HiveStatement implements java.sql.Statement {
*/
execReq.setRunAsync(true);
execReq.setConfOverlay(sessConf);
-
+ execReq.setQueryTimeout(queryTimeout);
try {
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
Utils.verifySuccessWithInfo(execResp.getStatus());
@@ -323,8 +326,8 @@ public class HiveStatement implements java.sql.Statement {
while (!isOperationComplete) {
try {
/**
- * For an async SQLOperation, GetOperationStatus will use the long polling approach
- * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
+ * For an async SQLOperation, GetOperationStatus will use the long polling approach. It will
+ * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
*/
statusResp = client.GetOperationStatus(statusReq);
Utils.verifySuccessWithInfo(statusResp.getStatus());
@@ -338,10 +341,12 @@ public class HiveStatement implements java.sql.Statement {
case CANCELED_STATE:
// 01000 -> warning
throw new SQLException("Query was cancelled", "01000");
+ case TIMEDOUT_STATE:
+ throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds");
case ERROR_STATE:
// Get the error details from the underlying exception
- throw new SQLException(statusResp.getErrorMessage(),
- statusResp.getSqlState(), statusResp.getErrorCode());
+ throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(),
+ statusResp.getErrorCode());
case UKNOWN_STATE:
throw new SQLException("Unknown query", "HY000");
case INITIALIZED_STATE:
@@ -787,10 +792,7 @@ public class HiveStatement implements java.sql.Statement {
@Override
public void setQueryTimeout(int seconds) throws SQLException {
- // 0 is supported which means "no limit"
- if (seconds != 0) {
- throw new SQLException("Query timeout seconds must be 0");
- }
+ this.queryTimeout = seconds;
}
/*
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 32d2cb2..6a610cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1725,20 +1725,31 @@ public class Driver implements CommandProcessor {
}
LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
}
- plan.setDone();
- if (SessionState.get() != null) {
- try {
- SessionState.get().getHiveHistory().logPlanProgress(plan);
- } catch (Exception e) {
- // ignore
- }
+ releasePlan(plan);
+
+ if (console != null) {
+ console.printInfo("OK");
}
- console.printInfo("OK");
return (0);
}
+ private synchronized void releasePlan(QueryPlan plan) {
+ // Plan may be null if Driver.close is called in another thread for the same Driver object
+ if (plan != null) {
+ plan.setDone();
+ if (SessionState.get() != null) {
+ try {
+ SessionState.get().getHiveHistory().logPlanProgress(plan);
+ } catch (Exception e) {
+ // Log and ignore
+ LOG.warn("Could not log query plan progress", e);
+ }
+ }
+ }
+ }
+
private void setQueryDisplays(List<Task<? extends Serializable>> tasks) {
if (tasks != null) {
for (Task<? extends Serializable> task : tasks) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
index 0234fd9..6582cdd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
@@ -315,9 +315,11 @@ public class HiveHistoryImpl implements HiveHistory{
@Override
public void logPlanProgress(QueryPlan plan) throws IOException {
- Map<String,String> ctrmap = ctrMapFactory.get();
- ctrmap.put("plan", plan.toString());
- log(RecordTypes.Counters, ctrmap);
+ if (plan != null) {
+ Map<String,String> ctrmap = ctrMapFactory.get();
+ ctrmap.put("plan", plan.toString());
+ log(RecordTypes.Counters, ctrmap);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
index 6d0f14a..18216f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
@@ -166,7 +166,7 @@ public class OperationLog {
return readResults(maxRows);
}
- void remove() {
+ synchronized void remove() {
try {
if (in != null) {
in.close();
@@ -174,8 +174,10 @@ public class OperationLog {
if (out != null) {
out.close();
}
- FileUtils.forceDelete(file);
- isRemoved = true;
+ if (!isRemoved) {
+ FileUtils.forceDelete(file);
+ isRemoved = true;
+ }
} catch (Exception e) {
LOG.error("Failed to remove corresponding log file of operation: " + operationName, e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service-rpc/if/TCLIService.thrift
----------------------------------------------------------------------
diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift
index 92bcf77..9879b1b 100644
--- a/service-rpc/if/TCLIService.thrift
+++ b/service-rpc/if/TCLIService.thrift
@@ -458,6 +458,9 @@ enum TOperationState {
// The operation is in an pending state
PENDING_STATE,
+
+ // The operation is in a timed-out state
+ TIMEDOUT_STATE,
}
// A string identifier. This is interpreted literally.
@@ -697,6 +700,9 @@ struct TExecuteStatementReq {
// Execute asynchronously when runAsync is true
4: optional bool runAsync = false
+
+ // The number of seconds after which the query will time out on the server
+ 5: optional i64 queryTimeout = 0
}
struct TExecuteStatementResp {
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
index 66f5e8c..5229230 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
@@ -109,7 +109,8 @@ int _kTOperationStateValues[] = {
TOperationState::CLOSED_STATE,
TOperationState::ERROR_STATE,
TOperationState::UKNOWN_STATE,
- TOperationState::PENDING_STATE
+ TOperationState::PENDING_STATE,
+ TOperationState::TIMEDOUT_STATE
};
const char* _kTOperationStateNames[] = {
"INITIALIZED_STATE",
@@ -119,9 +120,10 @@ const char* _kTOperationStateNames[] = {
"CLOSED_STATE",
"ERROR_STATE",
"UKNOWN_STATE",
- "PENDING_STATE"
+ "PENDING_STATE",
+ "TIMEDOUT_STATE"
};
-const std::map<int, const char*> _TOperationState_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(8, _kTOperationStateValues, _kTOperationStateNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _TOperationState_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(9, _kTOperationStateValues, _kTOperationStateNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
int _kTOperationTypeValues[] = {
TOperationType::EXECUTE_STATEMENT,
@@ -5575,6 +5577,11 @@ void TExecuteStatementReq::__set_runAsync(const bool val) {
__isset.runAsync = true;
}
+void TExecuteStatementReq::__set_queryTimeout(const int64_t val) {
+ this->queryTimeout = val;
+__isset.queryTimeout = true;
+}
+
uint32_t TExecuteStatementReq::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -5645,6 +5652,14 @@ uint32_t TExecuteStatementReq::read(::apache::thrift::protocol::TProtocol* iprot
xfer += iprot->skip(ftype);
}
break;
+ case 5:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->queryTimeout);
+ this->__isset.queryTimeout = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -5693,6 +5708,11 @@ uint32_t TExecuteStatementReq::write(::apache::thrift::protocol::TProtocol* opro
xfer += oprot->writeBool(this->runAsync);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.queryTimeout) {
+ xfer += oprot->writeFieldBegin("queryTimeout", ::apache::thrift::protocol::T_I64, 5);
+ xfer += oprot->writeI64(this->queryTimeout);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -5704,6 +5724,7 @@ void swap(TExecuteStatementReq &a, TExecuteStatementReq &b) {
swap(a.statement, b.statement);
swap(a.confOverlay, b.confOverlay);
swap(a.runAsync, b.runAsync);
+ swap(a.queryTimeout, b.queryTimeout);
swap(a.__isset, b.__isset);
}
@@ -5712,6 +5733,7 @@ TExecuteStatementReq::TExecuteStatementReq(const TExecuteStatementReq& other222)
statement = other222.statement;
confOverlay = other222.confOverlay;
runAsync = other222.runAsync;
+ queryTimeout = other222.queryTimeout;
__isset = other222.__isset;
}
TExecuteStatementReq& TExecuteStatementReq::operator=(const TExecuteStatementReq& other223) {
@@ -5719,6 +5741,7 @@ TExecuteStatementReq& TExecuteStatementReq::operator=(const TExecuteStatementReq
statement = other223.statement;
confOverlay = other223.confOverlay;
runAsync = other223.runAsync;
+ queryTimeout = other223.queryTimeout;
__isset = other223.__isset;
return *this;
}
@@ -5729,6 +5752,7 @@ void TExecuteStatementReq::printTo(std::ostream& out) const {
out << ", " << "statement=" << to_string(statement);
out << ", " << "confOverlay="; (__isset.confOverlay ? (out << to_string(confOverlay)) : (out << "<null>"));
out << ", " << "runAsync="; (__isset.runAsync ? (out << to_string(runAsync)) : (out << "<null>"));
+ out << ", " << "queryTimeout="; (__isset.queryTimeout ? (out << to_string(queryTimeout)) : (out << "<null>"));
out << ")";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
index 9f937ca..838bf17 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
@@ -84,7 +84,8 @@ struct TOperationState {
CLOSED_STATE = 4,
ERROR_STATE = 5,
UKNOWN_STATE = 6,
- PENDING_STATE = 7
+ PENDING_STATE = 7,
+ TIMEDOUT_STATE = 8
};
};
@@ -2501,9 +2502,10 @@ inline std::ostream& operator<<(std::ostream& out, const TGetInfoResp& obj)
}
typedef struct _TExecuteStatementReq__isset {
- _TExecuteStatementReq__isset() : confOverlay(false), runAsync(true) {}
+ _TExecuteStatementReq__isset() : confOverlay(false), runAsync(true), queryTimeout(true) {}
bool confOverlay :1;
bool runAsync :1;
+ bool queryTimeout :1;
} _TExecuteStatementReq__isset;
class TExecuteStatementReq {
@@ -2511,7 +2513,7 @@ class TExecuteStatementReq {
TExecuteStatementReq(const TExecuteStatementReq&);
TExecuteStatementReq& operator=(const TExecuteStatementReq&);
- TExecuteStatementReq() : statement(), runAsync(false) {
+ TExecuteStatementReq() : statement(), runAsync(false), queryTimeout(0LL) {
}
virtual ~TExecuteStatementReq() throw();
@@ -2519,6 +2521,7 @@ class TExecuteStatementReq {
std::string statement;
std::map<std::string, std::string> confOverlay;
bool runAsync;
+ int64_t queryTimeout;
_TExecuteStatementReq__isset __isset;
@@ -2530,6 +2533,8 @@ class TExecuteStatementReq {
void __set_runAsync(const bool val);
+ void __set_queryTimeout(const int64_t val);
+
bool operator == (const TExecuteStatementReq & rhs) const
{
if (!(sessionHandle == rhs.sessionHandle))
@@ -2544,6 +2549,10 @@ class TExecuteStatementReq {
return false;
else if (__isset.runAsync && !(runAsync == rhs.runAsync))
return false;
+ if (__isset.queryTimeout != rhs.__isset.queryTimeout)
+ return false;
+ else if (__isset.queryTimeout && !(queryTimeout == rhs.queryTimeout))
+ return false;
return true;
}
bool operator != (const TExecuteStatementReq &rhs) const {
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java
index 2eb4d09..1f73cec 100644
--- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java
@@ -42,6 +42,7 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
private static final org.apache.thrift.protocol.TField STATEMENT_FIELD_DESC = new org.apache.thrift.protocol.TField("statement", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField CONF_OVERLAY_FIELD_DESC = new org.apache.thrift.protocol.TField("confOverlay", org.apache.thrift.protocol.TType.MAP, (short)3);
private static final org.apache.thrift.protocol.TField RUN_ASYNC_FIELD_DESC = new org.apache.thrift.protocol.TField("runAsync", org.apache.thrift.protocol.TType.BOOL, (short)4);
+ private static final org.apache.thrift.protocol.TField QUERY_TIMEOUT_FIELD_DESC = new org.apache.thrift.protocol.TField("queryTimeout", org.apache.thrift.protocol.TType.I64, (short)5);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -53,13 +54,15 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
private String statement; // required
private Map<String,String> confOverlay; // optional
private boolean runAsync; // optional
+ private long queryTimeout; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
SESSION_HANDLE((short)1, "sessionHandle"),
STATEMENT((short)2, "statement"),
CONF_OVERLAY((short)3, "confOverlay"),
- RUN_ASYNC((short)4, "runAsync");
+ RUN_ASYNC((short)4, "runAsync"),
+ QUERY_TIMEOUT((short)5, "queryTimeout");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -82,6 +85,8 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
return CONF_OVERLAY;
case 4: // RUN_ASYNC
return RUN_ASYNC;
+ case 5: // QUERY_TIMEOUT
+ return QUERY_TIMEOUT;
default:
return null;
}
@@ -123,8 +128,9 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
// isset id assignments
private static final int __RUNASYNC_ISSET_ID = 0;
+ private static final int __QUERYTIMEOUT_ISSET_ID = 1;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.CONF_OVERLAY,_Fields.RUN_ASYNC};
+ private static final _Fields optionals[] = {_Fields.CONF_OVERLAY,_Fields.RUN_ASYNC,_Fields.QUERY_TIMEOUT};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -138,6 +144,8 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
tmpMap.put(_Fields.RUN_ASYNC, new org.apache.thrift.meta_data.FieldMetaData("runAsync", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+ tmpMap.put(_Fields.QUERY_TIMEOUT, new org.apache.thrift.meta_data.FieldMetaData("queryTimeout", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TExecuteStatementReq.class, metaDataMap);
}
@@ -145,6 +153,8 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
public TExecuteStatementReq() {
this.runAsync = false;
+ this.queryTimeout = 0L;
+
}
public TExecuteStatementReq(
@@ -172,6 +182,7 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
this.confOverlay = __this__confOverlay;
}
this.runAsync = other.runAsync;
+ this.queryTimeout = other.queryTimeout;
}
public TExecuteStatementReq deepCopy() {
@@ -185,6 +196,8 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
this.confOverlay = null;
this.runAsync = false;
+ this.queryTimeout = 0L;
+
}
public TSessionHandle getSessionHandle() {
@@ -289,6 +302,28 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __RUNASYNC_ISSET_ID, value);
}
+ public long getQueryTimeout() {
+ return this.queryTimeout;
+ }
+
+ public void setQueryTimeout(long queryTimeout) {
+ this.queryTimeout = queryTimeout;
+ setQueryTimeoutIsSet(true);
+ }
+
+ public void unsetQueryTimeout() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __QUERYTIMEOUT_ISSET_ID);
+ }
+
+ /** Returns true if field queryTimeout is set (has been assigned a value) and false otherwise */
+ public boolean isSetQueryTimeout() {
+ return EncodingUtils.testBit(__isset_bitfield, __QUERYTIMEOUT_ISSET_ID);
+ }
+
+ public void setQueryTimeoutIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __QUERYTIMEOUT_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case SESSION_HANDLE:
@@ -323,6 +358,14 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
}
break;
+ case QUERY_TIMEOUT:
+ if (value == null) {
+ unsetQueryTimeout();
+ } else {
+ setQueryTimeout((Long)value);
+ }
+ break;
+
}
}
@@ -340,6 +383,9 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
case RUN_ASYNC:
return isRunAsync();
+ case QUERY_TIMEOUT:
+ return getQueryTimeout();
+
}
throw new IllegalStateException();
}
@@ -359,6 +405,8 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
return isSetConfOverlay();
case RUN_ASYNC:
return isSetRunAsync();
+ case QUERY_TIMEOUT:
+ return isSetQueryTimeout();
}
throw new IllegalStateException();
}
@@ -412,6 +460,15 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
return false;
}
+ boolean this_present_queryTimeout = true && this.isSetQueryTimeout();
+ boolean that_present_queryTimeout = true && that.isSetQueryTimeout();
+ if (this_present_queryTimeout || that_present_queryTimeout) {
+ if (!(this_present_queryTimeout && that_present_queryTimeout))
+ return false;
+ if (this.queryTimeout != that.queryTimeout)
+ return false;
+ }
+
return true;
}
@@ -439,6 +496,11 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
if (present_runAsync)
list.add(runAsync);
+ boolean present_queryTimeout = true && (isSetQueryTimeout());
+ list.add(present_queryTimeout);
+ if (present_queryTimeout)
+ list.add(queryTimeout);
+
return list.hashCode();
}
@@ -490,6 +552,16 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetQueryTimeout()).compareTo(other.isSetQueryTimeout());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetQueryTimeout()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.queryTimeout, other.queryTimeout);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -541,6 +613,12 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
sb.append(this.runAsync);
first = false;
}
+ if (isSetQueryTimeout()) {
+ if (!first) sb.append(", ");
+ sb.append("queryTimeout:");
+ sb.append(this.queryTimeout);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -642,6 +720,14 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 5: // QUERY_TIMEOUT
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.queryTimeout = iprot.readI64();
+ struct.setQueryTimeoutIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -685,6 +771,11 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
oprot.writeBool(struct.runAsync);
oprot.writeFieldEnd();
}
+ if (struct.isSetQueryTimeout()) {
+ oprot.writeFieldBegin(QUERY_TIMEOUT_FIELD_DESC);
+ oprot.writeI64(struct.queryTimeout);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -711,7 +802,10 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
if (struct.isSetRunAsync()) {
optionals.set(1);
}
- oprot.writeBitSet(optionals, 2);
+ if (struct.isSetQueryTimeout()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
if (struct.isSetConfOverlay()) {
{
oprot.writeI32(struct.confOverlay.size());
@@ -725,6 +819,9 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
if (struct.isSetRunAsync()) {
oprot.writeBool(struct.runAsync);
}
+ if (struct.isSetQueryTimeout()) {
+ oprot.writeI64(struct.queryTimeout);
+ }
}
@Override
@@ -735,7 +832,7 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
struct.setSessionHandleIsSet(true);
struct.statement = iprot.readString();
struct.setStatementIsSet(true);
- BitSet incoming = iprot.readBitSet(2);
+ BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TMap _map168 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -755,6 +852,10 @@ public class TExecuteStatementReq implements org.apache.thrift.TBase<TExecuteSta
struct.runAsync = iprot.readBool();
struct.setRunAsyncIsSet(true);
}
+ if (incoming.get(2)) {
+ struct.queryTimeout = iprot.readI64();
+ struct.setQueryTimeoutIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java
index 3fa49b0..4390b4b 100644
--- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java
@@ -19,7 +19,8 @@ public enum TOperationState implements org.apache.thrift.TEnum {
CLOSED_STATE(4),
ERROR_STATE(5),
UKNOWN_STATE(6),
- PENDING_STATE(7);
+ PENDING_STATE(7),
+ TIMEDOUT_STATE(8);
private final int value;
@@ -56,6 +57,8 @@ public enum TOperationState implements org.apache.thrift.TEnum {
return UKNOWN_STATE;
case 7:
return PENDING_STATE;
+ case 8:
+ return TIMEDOUT_STATE;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service-rpc/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-php/Types.php b/service-rpc/src/gen/thrift/gen-php/Types.php
index 7f1f99f..9ed7403 100644
--- a/service-rpc/src/gen/thrift/gen-php/Types.php
+++ b/service-rpc/src/gen/thrift/gen-php/Types.php
@@ -109,6 +109,7 @@ final class TOperationState {
const ERROR_STATE = 5;
const UKNOWN_STATE = 6;
const PENDING_STATE = 7;
+ const TIMEDOUT_STATE = 8;
static public $__names = array(
0 => 'INITIALIZED_STATE',
1 => 'RUNNING_STATE',
@@ -118,6 +119,7 @@ final class TOperationState {
5 => 'ERROR_STATE',
6 => 'UKNOWN_STATE',
7 => 'PENDING_STATE',
+ 8 => 'TIMEDOUT_STATE',
);
}
@@ -5446,6 +5448,10 @@ class TExecuteStatementReq {
* @var bool
*/
public $runAsync = false;
+ /**
+ * @var int
+ */
+ public $queryTimeout = 0;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -5475,6 +5481,10 @@ class TExecuteStatementReq {
'var' => 'runAsync',
'type' => TType::BOOL,
),
+ 5 => array(
+ 'var' => 'queryTimeout',
+ 'type' => TType::I64,
+ ),
);
}
if (is_array($vals)) {
@@ -5490,6 +5500,9 @@ class TExecuteStatementReq {
if (isset($vals['runAsync'])) {
$this->runAsync = $vals['runAsync'];
}
+ if (isset($vals['queryTimeout'])) {
+ $this->queryTimeout = $vals['queryTimeout'];
+ }
}
}
@@ -5554,6 +5567,13 @@ class TExecuteStatementReq {
$xfer += $input->skip($ftype);
}
break;
+ case 5:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->queryTimeout);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -5603,6 +5623,11 @@ class TExecuteStatementReq {
$xfer += $output->writeBool($this->runAsync);
$xfer += $output->writeFieldEnd();
}
+ if ($this->queryTimeout !== null) {
+ $xfer += $output->writeFieldBegin('queryTimeout', TType::I64, 5);
+ $xfer += $output->writeI64($this->queryTimeout);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
index 3bb20b8..44e5462 100644
--- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
+++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
@@ -154,6 +154,7 @@ class TOperationState:
ERROR_STATE = 5
UKNOWN_STATE = 6
PENDING_STATE = 7
+ TIMEDOUT_STATE = 8
_VALUES_TO_NAMES = {
0: "INITIALIZED_STATE",
@@ -164,6 +165,7 @@ class TOperationState:
5: "ERROR_STATE",
6: "UKNOWN_STATE",
7: "PENDING_STATE",
+ 8: "TIMEDOUT_STATE",
}
_NAMES_TO_VALUES = {
@@ -175,6 +177,7 @@ class TOperationState:
"ERROR_STATE": 5,
"UKNOWN_STATE": 6,
"PENDING_STATE": 7,
+ "TIMEDOUT_STATE": 8,
}
class TOperationType:
@@ -4162,6 +4165,7 @@ class TExecuteStatementReq:
- statement
- confOverlay
- runAsync
+ - queryTimeout
"""
thrift_spec = (
@@ -4170,13 +4174,15 @@ class TExecuteStatementReq:
(2, TType.STRING, 'statement', None, None, ), # 2
(3, TType.MAP, 'confOverlay', (TType.STRING,None,TType.STRING,None), None, ), # 3
(4, TType.BOOL, 'runAsync', None, False, ), # 4
+ (5, TType.I64, 'queryTimeout', None, 0, ), # 5
)
- def __init__(self, sessionHandle=None, statement=None, confOverlay=None, runAsync=thrift_spec[4][4],):
+ def __init__(self, sessionHandle=None, statement=None, confOverlay=None, runAsync=thrift_spec[4][4], queryTimeout=thrift_spec[5][4],):
self.sessionHandle = sessionHandle
self.statement = statement
self.confOverlay = confOverlay
self.runAsync = runAsync
+ self.queryTimeout = queryTimeout
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -4214,6 +4220,11 @@ class TExecuteStatementReq:
self.runAsync = iprot.readBool()
else:
iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.I64:
+ self.queryTimeout = iprot.readI64()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -4244,6 +4255,10 @@ class TExecuteStatementReq:
oprot.writeFieldBegin('runAsync', TType.BOOL, 4)
oprot.writeBool(self.runAsync)
oprot.writeFieldEnd()
+ if self.queryTimeout is not None:
+ oprot.writeFieldBegin('queryTimeout', TType.I64, 5)
+ oprot.writeI64(self.queryTimeout)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -4261,6 +4276,7 @@ class TExecuteStatementReq:
value = (value * 31) ^ hash(self.statement)
value = (value * 31) ^ hash(self.confOverlay)
value = (value * 31) ^ hash(self.runAsync)
+ value = (value * 31) ^ hash(self.queryTimeout)
return value
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
index 7208bae..b39ec1e 100644
--- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
+++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
@@ -65,8 +65,9 @@ module TOperationState
ERROR_STATE = 5
UKNOWN_STATE = 6
PENDING_STATE = 7
- VALUE_MAP = {0 => "INITIALIZED_STATE", 1 => "RUNNING_STATE", 2 => "FINISHED_STATE", 3 => "CANCELED_STATE", 4 => "CLOSED_STATE", 5 => "ERROR_STATE", 6 => "UKNOWN_STATE", 7 => "PENDING_STATE"}
- VALID_VALUES = Set.new([INITIALIZED_STATE, RUNNING_STATE, FINISHED_STATE, CANCELED_STATE, CLOSED_STATE, ERROR_STATE, UKNOWN_STATE, PENDING_STATE]).freeze
+ TIMEDOUT_STATE = 8
+ VALUE_MAP = {0 => "INITIALIZED_STATE", 1 => "RUNNING_STATE", 2 => "FINISHED_STATE", 3 => "CANCELED_STATE", 4 => "CLOSED_STATE", 5 => "ERROR_STATE", 6 => "UKNOWN_STATE", 7 => "PENDING_STATE", 8 => "TIMEDOUT_STATE"}
+ VALID_VALUES = Set.new([INITIALIZED_STATE, RUNNING_STATE, FINISHED_STATE, CANCELED_STATE, CLOSED_STATE, ERROR_STATE, UKNOWN_STATE, PENDING_STATE, TIMEDOUT_STATE]).freeze
end
module TOperationType
@@ -1135,12 +1136,14 @@ class TExecuteStatementReq
STATEMENT = 2
CONFOVERLAY = 3
RUNASYNC = 4
+ QUERYTIMEOUT = 5
FIELDS = {
SESSIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'sessionHandle', :class => ::TSessionHandle},
STATEMENT => {:type => ::Thrift::Types::STRING, :name => 'statement'},
CONFOVERLAY => {:type => ::Thrift::Types::MAP, :name => 'confOverlay', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true},
- RUNASYNC => {:type => ::Thrift::Types::BOOL, :name => 'runAsync', :default => false, :optional => true}
+ RUNASYNC => {:type => ::Thrift::Types::BOOL, :name => 'runAsync', :default => false, :optional => true},
+ QUERYTIMEOUT => {:type => ::Thrift::Types::I64, :name => 'queryTimeout', :default => 0, :optional => true}
}
def struct_fields; FIELDS; end
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index 4a83e38..ed52b4a 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -248,33 +248,55 @@ public class CLIService extends CompositeService implements ICLIService {
return infoValue;
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle,
- * java.lang.String, java.util.Map)
+ /**
+ * Execute statement on the server. This is a blocking call.
*/
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
- Map<String, String> confOverlay)
- throws HiveSQLException {
- OperationHandle opHandle = sessionManager.getSession(sessionHandle)
- .executeStatement(statement, confOverlay);
+ Map<String, String> confOverlay) throws HiveSQLException {
+ OperationHandle opHandle =
+ sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay);
LOG.debug(sessionHandle + ": executeStatement()");
return opHandle;
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle,
- * java.lang.String, java.util.Map)
+ /**
+ * Execute statement on the server with a timeout. This is a blocking call.
+ */
+ @Override
+ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
+ OperationHandle opHandle =
+ sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay,
+ queryTimeout);
+ LOG.debug(sessionHandle + ": executeStatement()");
+ return opHandle;
+ }
+
+ /**
+ * Execute statement asynchronously on the server. This is a non-blocking call.
*/
@Override
public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
Map<String, String> confOverlay) throws HiveSQLException {
- OperationHandle opHandle = sessionManager.getSession(sessionHandle)
- .executeStatementAsync(statement, confOverlay);
+ OperationHandle opHandle =
+ sessionManager.getSession(sessionHandle).executeStatementAsync(statement, confOverlay);
LOG.debug(sessionHandle + ": executeStatementAsync()");
return opHandle;
}
+ /**
+ * Execute statement asynchronously on the server with a timeout. This is a non-blocking call.
+ */
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
+ OperationHandle opHandle =
+ sessionManager.getSession(sessionHandle).executeStatementAsync(statement, confOverlay,
+ queryTimeout);
+ LOG.debug(sessionHandle + ": executeStatementAsync()");
+ return opHandle;
+ }
/* (non-Javadoc)
* @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle)
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
index 79e0024..86e9bb1 100644
--- a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
@@ -67,26 +67,29 @@ public class EmbeddedCLIServiceClient extends CLIServiceClient {
return cliService.getInfo(sessionHandle, getInfoType);
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.CLIServiceClient#executeStatement(org.apache.hive.service.cli.SessionHandle,
- * java.lang.String, java.util.Map)
- */
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
Map<String, String> confOverlay) throws HiveSQLException {
return cliService.executeStatement(sessionHandle, statement, confOverlay);
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.CLIServiceClient#executeStatementAsync(org.apache.hive.service.cli.SessionHandle,
- * java.lang.String, java.util.Map)
- */
+ @Override
+ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
+ return cliService.executeStatement(sessionHandle, statement, confOverlay, queryTimeout);
+ }
+
@Override
public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
Map<String, String> confOverlay) throws HiveSQLException {
return cliService.executeStatementAsync(sessionHandle, statement, confOverlay);
}
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
+ return cliService.executeStatementAsync(sessionHandle, statement, confOverlay, queryTimeout);
+ }
/* (non-Javadoc)
* @see org.apache.hive.service.cli.CLIServiceClient#getTypeInfo(org.apache.hive.service.cli.SessionHandle)
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/ICLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/ICLIService.java b/service/src/java/org/apache/hive/service/cli/ICLIService.java
index e4aef96..fef772d 100644
--- a/service/src/java/org/apache/hive/service/cli/ICLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/ICLIService.java
@@ -39,12 +39,16 @@ public interface ICLIService {
throws HiveSQLException;
OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
- Map<String, String> confOverlay)
- throws HiveSQLException;
+ Map<String, String> confOverlay) throws HiveSQLException;
- OperationHandle executeStatementAsync(SessionHandle sessionHandle,
- String statement, Map<String, String> confOverlay)
- throws HiveSQLException;
+ OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException;
+
+ OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay) throws HiveSQLException;
+
+ OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException;
OperationHandle getTypeInfo(SessionHandle sessionHandle)
throws HiveSQLException;
@@ -105,6 +109,4 @@ public interface ICLIService {
String primaryCatalog, String primarySchema, String primaryTable,
String foreignCatalog, String foreignSchema, String foreignTable)
throws HiveSQLException;
-
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/OperationState.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/OperationState.java b/service/src/java/org/apache/hive/service/cli/OperationState.java
index 6a67a1d..ae1ff5e 100644
--- a/service/src/java/org/apache/hive/service/cli/OperationState.java
+++ b/service/src/java/org/apache/hive/service/cli/OperationState.java
@@ -32,7 +32,8 @@ public enum OperationState {
CLOSED(TOperationState.CLOSED_STATE, true),
ERROR(TOperationState.ERROR_STATE, true),
UNKNOWN(TOperationState.UKNOWN_STATE, false),
- PENDING(TOperationState.PENDING_STATE, false);
+ PENDING(TOperationState.PENDING_STATE, false),
+ TIMEDOUT(TOperationState.TIMEDOUT_STATE, true);
private final TOperationState tOperationState;
private final boolean terminal;
@@ -57,6 +58,7 @@ public enum OperationState {
case RUNNING:
case CANCELED:
case CLOSED:
+ case TIMEDOUT:
return;
}
break;
@@ -67,6 +69,7 @@ public enum OperationState {
case CANCELED:
case ERROR:
case CLOSED:
+ case TIMEDOUT:
return;
}
break;
@@ -76,11 +79,13 @@ public enum OperationState {
case CANCELED:
case ERROR:
case CLOSED:
+ case TIMEDOUT:
return;
}
break;
case FINISHED:
case CANCELED:
+ case TIMEDOUT:
case ERROR:
if (OperationState.CLOSED.equals(newState)) {
return;
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
index b3d9b52..ff46ed8 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
@@ -39,9 +39,9 @@ public abstract class ExecuteStatementOperation extends Operation {
return statement;
}
- public static ExecuteStatementOperation newExecuteStatementOperation(
- HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync)
- throws HiveSQLException {
+ public static ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
+ String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
+ throws HiveSQLException {
String[] tokens = statement.trim().split("\\s+");
CommandProcessor processor = null;
try {
@@ -50,7 +50,8 @@ public abstract class ExecuteStatementOperation extends Operation {
throw new HiveSQLException(e.getMessage(), e.getSQLState(), e);
}
if (processor == null) {
- return new SQLOperation(parentSession, statement, confOverlay, runAsync);
+ // runAsync and queryTimeout make sense only for a SQLOperation
+ return new SQLOperation(parentSession, statement, confOverlay, runAsync, queryTimeout);
}
return new HiveCommandOperation(parentSession, statement, processor, confOverlay);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
index f18dc67..8f08c2e 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
@@ -229,4 +229,9 @@ public class HiveCommandOperation extends ExecuteStatementOperation {
resultReader = null;
}
}
+
+ @Override
+ public void cancel(OperationState stateAfterCancel) throws HiveSQLException {
+ throw new UnsupportedOperationException("HiveCommandOperation.cancel()");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
index 77228fa..fd6e428 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
@@ -109,7 +109,7 @@ public abstract class MetadataOperation extends Operation {
pattern = replaceAll(pattern, "^_", ".");
return pattern;
}
-
+
private String replaceAll(String input, final String pattern, final String replace) {
while (true) {
String replaced = input.replaceAll(pattern, replace);
@@ -145,4 +145,9 @@ public abstract class MetadataOperation extends Operation {
}
}
+ @Override
+ public void cancel(OperationState stateAfterCancel) throws HiveSQLException {
+ throw new UnsupportedOperationException("MetadataOperation.cancel()");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 63b1a48..0932884 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -327,22 +327,23 @@ public abstract class Operation {
}
}
- protected void cleanupOperationLog() {
+ protected synchronized void cleanupOperationLog() {
if (isOperationLogEnabled) {
+ if (opHandle == null) {
+ LOG.warn("Operation seems to be in invalid state, opHandle is null");
+ return;
+ }
if (operationLog == null) {
- LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] "
- + "logging is enabled, but its OperationLog object cannot be found.");
+ LOG.warn("Operation [ " + opHandle.getHandleIdentifier() + " ] " + "logging is enabled, "
+ + "but its OperationLog object cannot be found. "
+ + "Perhaps the operation has already terminated.");
} else {
operationLog.close();
}
}
}
- // TODO: make this abstract and implement in subclasses.
- public void cancel() throws HiveSQLException {
- setState(OperationState.CANCELED);
- throw new UnsupportedOperationException("SQLOperation.cancel()");
- }
+ public abstract void cancel(OperationState stateAfterCancel) throws HiveSQLException;
public abstract void close() throws HiveSQLException;
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 52e4b4d..2f18231 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -90,12 +90,10 @@ public class OperationManager extends AbstractService {
@Override
public synchronized void start() {
super.start();
- // TODO
}
@Override
public synchronized void stop() {
- // TODO
super.stop();
}
@@ -111,10 +109,11 @@ public class OperationManager extends AbstractService {
}
public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
- String statement, Map<String, String> confOverlay, boolean runAsync)
- throws HiveSQLException {
- ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
- .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
+ String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
+ throws HiveSQLException {
+ ExecuteStatementOperation executeStatementOperation =
+ ExecuteStatementOperation.newExecuteStatementOperation(parentSession, statement,
+ confOverlay, runAsync, queryTimeout);
addOperation(executeStatementOperation);
return executeStatementOperation;
}
@@ -250,20 +249,20 @@ public class OperationManager extends AbstractService {
return getOperation(opHandle).getStatus();
}
+ /**
+ * Cancel the running operation unless it is already in a terminal state
+ * @param opHandle
+ * @throws HiveSQLException
+ */
public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
Operation operation = getOperation(opHandle);
OperationState opState = operation.getStatus().getState();
- if (opState == OperationState.CANCELED ||
- opState == OperationState.CLOSED ||
- opState == OperationState.FINISHED ||
- opState == OperationState.ERROR ||
- opState == OperationState.UNKNOWN) {
+ if (opState.isTerminal()) {
// Cancel should be a no-op in either cases
LOG.debug(opHandle + ": Operation is already aborted in state - " + opState);
- }
- else {
+ } else {
LOG.debug(opHandle + ": Attempting to cancel from state - " + opState);
- operation.cancel();
+ operation.cancel(OperationState.CANCELED);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 01dd48c..67e0e52 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -18,12 +18,24 @@
package org.apache.hive.service.cli.operation;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Base64;
@@ -84,9 +96,10 @@ public class SQLOperation extends ExecuteStatementOperation {
private SerDe serde = null;
private boolean fetchStarted = false;
private volatile MetricsScope currentSQLStateScope;
-
- //Display for WebUI.
+ // Display for WebUI.
private SQLOperationDisplay sqlOpDisplay;
+ private long queryTimeout;
+ private ScheduledExecutorService timeoutExecutor;
/**
* A map to track query count running by each user
@@ -94,10 +107,11 @@ public class SQLOperation extends ExecuteStatementOperation {
private static Map<String, AtomicInteger> userQueries = new HashMap<String, AtomicInteger>();
private static final String ACTIVE_SQL_USER = MetricsConstant.SQL_OPERATION_PREFIX + "active_user";
- public SQLOperation(HiveSession parentSession, String statement, Map<String,
- String> confOverlay, boolean runInBackground) {
+ public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay,
+ boolean runInBackground, long queryTimeout) {
// TODO: call setRemoteUser in ExecuteStatementOperation or higher.
super(parentSession, statement, confOverlay, runInBackground);
+ this.queryTimeout = queryTimeout;
setupSessionIO(parentSession.getSessionState());
try {
sqlOpDisplay = new SQLOperationDisplay(this);
@@ -121,7 +135,7 @@ public class SQLOperation extends ExecuteStatementOperation {
}
}
- /***
+ /**
* Compile the query and extract metadata
* @param sqlOperationConf
* @throws HiveSQLException
@@ -130,6 +144,29 @@ public class SQLOperation extends ExecuteStatementOperation {
setState(OperationState.RUNNING);
try {
driver = new Driver(queryState, getParentSession().getUserName());
+
+ // Start the timer thread for cancelling the query when query timeout is reached
+ // queryTimeout is in seconds; a value <= 0 means no timeout
+ if (queryTimeout > 0) {
+ timeoutExecutor = new ScheduledThreadPoolExecutor(1);
+ Runnable timeoutTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Query timed out after: " + queryTimeout
+ + " seconds. Cancelling the execution now.");
+ SQLOperation.this.cancel(OperationState.TIMEDOUT);
+ } catch (HiveSQLException e) {
+ LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);
+ } finally {
+ // The timeout task has run; shut down its executor
+ timeoutExecutor.shutdown();
+ }
+ }
+ };
+ timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS);
+ }
+
sqlOpDisplay.setQueryDisplay(driver.getQueryDisplay());
// set the operation handle information in Driver, so that thrift API users
@@ -184,6 +221,13 @@ public class SQLOperation extends ExecuteStatementOperation {
private void runQuery() throws HiveSQLException {
try {
+ OperationState opState = getStatus().getState();
+ // Operation may have been cancelled by another thread
+ if (opState.isTerminal()) {
+ LOG.info("Not running the query. Operation is already in terminal state: " + opState
+ + ", perhaps cancelled due to query timeout or by another thread.");
+ return;
+ }
// In Hive server mode, we are not able to retry in the FetchTask
// case, when calling fetch queries since execute() has returned.
// For now, we disable the test attempts.
@@ -193,14 +237,16 @@ public class SQLOperation extends ExecuteStatementOperation {
throw toSQLException("Error while processing statement", response);
}
} catch (HiveSQLException e) {
- // If the operation was cancelled by another thread,
- // Driver#run will return a non-zero response code.
- // We will simply return if the operation state is CANCELED,
- // otherwise throw an exception
- if (getStatus().getState() == OperationState.CANCELED) {
+ /**
+ * If the operation was cancelled by another thread, or the execution timed out, Driver#run
+ * may return a non-zero response code. We will simply return if the operation state is
+ * CANCELED, TIMEDOUT or CLOSED, otherwise throw an exception
+ */
+ if ((getStatus().getState() == OperationState.CANCELED)
+ || (getStatus().getState() == OperationState.TIMEDOUT)
+ || (getStatus().getState() == OperationState.CLOSED)) {
return;
- }
- else {
+ } else {
setState(OperationState.ERROR);
throw e;
}
@@ -312,8 +358,22 @@ public class SQLOperation extends ExecuteStatementOperation {
}
}
- private void cleanup(OperationState state) throws HiveSQLException {
+ private synchronized void cleanup(OperationState state) throws HiveSQLException {
setState(state);
+ if (driver != null) {
+ driver.close();
+ driver.destroy();
+ }
+ driver = null;
+
+ SessionState ss = SessionState.get();
+ if (ss == null) {
+ LOG.warn("Operation seems to be in invalid state, SessionState is null");
+ } else {
+ ss.deleteTmpOutputFile();
+ ss.deleteTmpErrOutputFile();
+ }
+
if (shouldRunAsync()) {
Future<?> backgroundHandle = getBackgroundHandle();
if (backgroundHandle != null) {
@@ -321,20 +381,16 @@ public class SQLOperation extends ExecuteStatementOperation {
}
}
- if (driver != null) {
- driver.close();
- driver.destroy();
+ // Shut down the timeout thread, if any, while closing this operation
+ if ((timeoutExecutor != null) && (state != OperationState.TIMEDOUT) && (state.isTerminal())) {
+ timeoutExecutor.shutdownNow();
}
- driver = null;
-
- SessionState ss = SessionState.get();
- ss.deleteTmpOutputFile();
- ss.deleteTmpErrOutputFile();
}
@Override
- public void cancel() throws HiveSQLException {
- cleanup(OperationState.CANCELED);
+ public void cancel(OperationState stateAfterCancel) throws HiveSQLException {
+ cleanup(stateAfterCancel);
+ cleanupOperationLog();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index 9ea643b..78ff388 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -56,18 +56,38 @@ public interface HiveSession extends HiveSessionBase {
* @return
* @throws HiveSQLException
*/
- OperationHandle executeStatement(String statement,
- Map<String, String> confOverlay) throws HiveSQLException;
+ OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException;
/**
* execute operation handler
* @param statement
* @param confOverlay
+ * @param queryTimeout
* @return
* @throws HiveSQLException
*/
- OperationHandle executeStatementAsync(String statement,
- Map<String, String> confOverlay) throws HiveSQLException;
+ OperationHandle executeStatement(String statement, Map<String, String> confOverlay,
+ long queryTimeout) throws HiveSQLException;
+
+ /**
+ * execute operation handler
+ * @param statement
+ * @param confOverlay
+ * @return
+ * @throws HiveSQLException
+ */
+ OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay) throws HiveSQLException;
+
+ /**
+ * execute operation handler
+ * @param statement
+ * @param confOverlay
+ * @param queryTimeout
+ * @return
+ * @throws HiveSQLException
+ */
+ OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay,
+ long queryTimeout) throws HiveSQLException;
/**
* getTypeInfo operation handler
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 0cfec7a..a0015eb 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -204,7 +204,7 @@ public class HiveSessionImpl implements HiveSession {
OperationHandle opHandle = null;
try {
//execute in sync mode
- opHandle = executeStatementInternal(cmd_trimed, null, false);
+ opHandle = executeStatementInternal(cmd_trimed, null, false, 0);
} catch (HiveSQLException e) {
LOG.warn("Failed to execute command in global .hiverc file.", e);
return -1;
@@ -426,33 +426,43 @@ public class HiveSessionImpl implements HiveSession {
}
@Override
- public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
- throws HiveSQLException {
- return executeStatementInternal(statement, confOverlay, false);
+ public OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException {
+ return executeStatementInternal(statement, confOverlay, false, 0);
}
@Override
- public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay)
- throws HiveSQLException {
- return executeStatementInternal(statement, confOverlay, true);
+ public OperationHandle executeStatement(String statement, Map<String, String> confOverlay,
+ long queryTimeout) throws HiveSQLException {
+ return executeStatementInternal(statement, confOverlay, false, queryTimeout);
}
- private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
- boolean runAsync)
- throws HiveSQLException {
+ @Override
+ public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay) throws HiveSQLException {
+ return executeStatementInternal(statement, confOverlay, true, 0);
+ }
+
+ @Override
+ public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay,
+ long queryTimeout) throws HiveSQLException {
+ return executeStatementInternal(statement, confOverlay, true, queryTimeout);
+ }
+
+ private OperationHandle executeStatementInternal(String statement,
+ Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
acquire(true);
OperationManager operationManager = getOperationManager();
- ExecuteStatementOperation operation = operationManager
- .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
+ ExecuteStatementOperation operation =
+ operationManager.newExecuteStatementOperation(getSession(), statement, confOverlay,
+ runAsync, queryTimeout);
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
addOpHandle(opHandle);
return opHandle;
} catch (HiveSQLException e) {
- // Refering to SQLOperation.java,there is no chance that a HiveSQLException throws and the asyn
- // background operation submits to thread pool successfully at the same time. So, Cleanup
+ // Referring to SQLOperation.java, there is no chance that a HiveSQLException is thrown and the
+ // async background operation is submitted to the thread pool successfully at the same time. So, clean up
// opHandle directly when got HiveSQLException
operationManager.closeOperation(opHandle);
throw e;
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
index b2e0e9e..933750b 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
@@ -126,20 +126,30 @@ public class RetryingThriftCLIServiceClient implements InvocationHandler {
}
@Override
- public OperationHandle executeStatement(SessionHandle sessionHandle,
- String statement,
- Map<String, String> confOverlay) throws HiveSQLException {
+ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay) throws HiveSQLException {
return cliService.executeStatement(sessionHandle, statement, confOverlay);
}
@Override
- public OperationHandle executeStatementAsync(SessionHandle sessionHandle,
- String statement,
- Map<String, String> confOverlay) throws HiveSQLException {
+ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
+ return cliService.executeStatement(sessionHandle, statement, confOverlay, queryTimeout);
+ }
+
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay) throws HiveSQLException {
return cliService.executeStatementAsync(sessionHandle, statement, confOverlay);
}
@Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
+ return cliService.executeStatementAsync(sessionHandle, statement, confOverlay, queryTimeout);
+ }
+
+ @Override
public OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException {
return cliService.getTypeInfo(sessionHandle);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 6ede1d7..5464e58 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -493,15 +493,17 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
String statement = req.getStatement();
Map<String, String> confOverlay = req.getConfOverlay();
Boolean runAsync = req.isRunAsync();
- OperationHandle operationHandle = runAsync ?
- cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
- : cliService.executeStatement(sessionHandle, statement, confOverlay);
- resp.setOperationHandle(operationHandle.toTOperationHandle());
- resp.setStatus(OK_STATUS);
+ long queryTimeout = req.getQueryTimeout();
+ OperationHandle operationHandle =
+ runAsync ? cliService.executeStatementAsync(sessionHandle, statement, confOverlay,
+ queryTimeout) : cliService.executeStatement(sessionHandle, statement, confOverlay,
+ queryTimeout);
+ resp.setOperationHandle(operationHandle.toTOperationHandle());
+ resp.setStatus(OK_STATUS);
} catch (Exception e) {
// Note: it's rather important that this (and other methods) catch Exception, not Throwable;
- // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used
- // to also catch all errors; and now it allows OOMs only to propagate.
+ // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used
+ // to also catch all errors; and now it allows OOMs only to propagate.
LOG.warn("Error executing statement: ", e);
resp.setStatus(HiveSQLException.toTStatus(e));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index 098aea6..82ac42d 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -166,34 +166,38 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
}
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
- */
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
- Map<String, String> confOverlay)
- throws HiveSQLException {
- return executeStatementInternal(sessionHandle, statement, confOverlay, false);
+ Map<String, String> confOverlay) throws HiveSQLException {
+ return executeStatementInternal(sessionHandle, statement, confOverlay, false, 0);
+ }
+
+ @Override
+ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
+ return executeStatementInternal(sessionHandle, statement, confOverlay, false, queryTimeout);
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
- */
@Override
public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
- Map<String, String> confOverlay)
- throws HiveSQLException {
- return executeStatementInternal(sessionHandle, statement, confOverlay, true);
+ Map<String, String> confOverlay) throws HiveSQLException {
+ return executeStatementInternal(sessionHandle, statement, confOverlay, true, 0);
+ }
+
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+ Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
+ return executeStatementInternal(sessionHandle, statement, confOverlay, true, queryTimeout);
}
private OperationHandle executeStatementInternal(SessionHandle sessionHandle, String statement,
- Map<String, String> confOverlay, boolean isAsync)
- throws HiveSQLException {
+ Map<String, String> confOverlay, boolean isAsync, long queryTimeout) throws HiveSQLException {
try {
TExecuteStatementReq req =
new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
req.setConfOverlay(confOverlay);
req.setRunAsync(isAsync);
+ req.setQueryTimeout(queryTimeout);
TExecuteStatementResp resp = cliService.ExecuteStatement(req);
checkStatus(resp.getStatus());
TProtocolVersion protocol = sessionHandle.getProtocolVersion();
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
index 1740079..abb1ecf 100644
--- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
@@ -178,8 +178,7 @@ public abstract class ThriftCLIServiceTest {
// Execute another query
queryString = "SELECT ID+1 FROM TEST_EXEC_THRIFT";
- OperationHandle opHandle = client.executeStatement(sessHandle,
- queryString, opConf);
+ OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf);
assertNotNull(opHandle);
OperationStatus opStatus = client.getOperationStatus(opHandle);
@@ -229,8 +228,7 @@ public abstract class ThriftCLIServiceTest {
// Execute another query
queryString = "SELECT ID+1 FROM TEST_EXEC_ASYNC_THRIFT";
System.out.println("Will attempt to execute: " + queryString);
- opHandle = client.executeStatementAsync(sessHandle,
- queryString, opConf);
+ opHandle = client.executeStatementAsync(sessHandle, queryString, opConf);
assertNotNull(opHandle);
// Poll on the operation status till the query is completed
http://git-wip-us.apache.org/repos/asf/hive/blob/b6218275/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
index a1ef1fc..ab20c4c 100644
--- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
+++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
@@ -200,8 +200,7 @@ public class ThriftCliServiceTestWithCookie {
// Execute another query
queryString = "SELECT ID+1 FROM TEST_EXEC_THRIFT";
- OperationHandle opHandle = client.executeStatement(sessHandle,
- queryString, opConf);
+ OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf);
assertNotNull(opHandle);
OperationStatus opStatus = client.getOperationStatus(opHandle);