You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2017/02/07 20:12:34 UTC
[1/4] hive git commit: HIVE-15473: Progress Bar on Beeline client
(Anishek Agarwal via Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master f6cdbc879 -> 3e01ef326
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java
new file mode 100644
index 0000000..76ce24a
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java
@@ -0,0 +1,38 @@
+package org.apache.hive.service.cli;
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+
+import java.util.List;
+
+/**
+ * Holder for the values read from a {@link ProgressMonitor} at construction time.
+ * Every field is assigned exactly once; the header and row lists are stored as handed
+ * over by the monitor (no copy is made).
+ */
+public class JobProgressUpdate {
+  private final List<String> headers;
+  private final List<List<String>> rows;
+  public final String footerSummary;
+  public final double progressedPercentage;
+  public final long startTimeMillis;
+  public final String status;
+
+  /**
+   * Reads each value off the monitor once, in the order: headers, rows, footer summary,
+   * progressed percentage, start time, execution status.
+   *
+   * @param monitor source of the progress values
+   */
+  JobProgressUpdate(ProgressMonitor monitor) {
+    this.headers = monitor.headers();
+    this.rows = monitor.rows();
+    this.footerSummary = monitor.footerSummary();
+    this.progressedPercentage = monitor.progressedPercentage();
+    this.startTimeMillis = monitor.startTime();
+    this.status = monitor.executionStatus();
+  }
+
+  /** @return the column headers obtained from the monitor */
+  public List<String> headers() {
+    return this.headers;
+  }
+
+  /** @return the table rows obtained from the monitor */
+  public List<List<String>> rows() {
+    return this.rows;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/OperationStatus.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/OperationStatus.java b/service/src/java/org/apache/hive/service/cli/OperationStatus.java
index b0a26e3..317585f 100644
--- a/service/src/java/org/apache/hive/service/cli/OperationStatus.java
+++ b/service/src/java/org/apache/hive/service/cli/OperationStatus.java
@@ -30,6 +30,7 @@ public class OperationStatus {
private final long operationCompleted;
private final boolean hasResultSet;
private final HiveSQLException operationException;
+ private JobProgressUpdate jobProgressUpdate;
public OperationStatus(OperationState state, String taskStatus, long operationStarted, long operationCompleted, boolean hasResultSet, HiveSQLException operationException) {
this.state = state;
@@ -64,4 +65,11 @@ public class OperationStatus {
return operationException;
}
+ /**
+ * Stores the progress snapshot that {@link #jobProgressUpdate()} hands back.
+ *
+ * @param jobProgressUpdate the progress snapshot to attach to this status
+ */
+ void setJobProgressUpdate(JobProgressUpdate jobProgressUpdate){
+ this.jobProgressUpdate = jobProgressUpdate;
+ }
+
+ /**
+ * Returns the progress snapshot currently held by this status.
+ * NOTE(review): the backing field is declared without an initializer, so this presumably
+ * returns null until setJobProgressUpdate has been called; confirm against the constructor.
+ *
+ * @return the stored progress snapshot
+ */
+ public JobProgressUpdate jobProgressUpdate(){
+ return jobProgressUpdate;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java b/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java
new file mode 100644
index 0000000..29a5f66
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java
@@ -0,0 +1,19 @@
+package org.apache.hive.service.cli;
+
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+
+/**
+ * Defines the mapping between the internal execution status of the various engines and the
+ * generic states that the progress monitor cares about. These generic states are the values of
+ * {@link TJobExecutionStatus}.
+ */
+public interface ProgressMonitorStatusMapper {
+
+  /**
+   * Translates an engine-specific status string into a generic execution status.
+   *
+   * @param status status string as reported by the execution engine
+   * @return the generic status corresponding to {@code status}
+   */
+  TJobExecutionStatus forStatus(String status);
+
+  /** Mapper that answers {@link TJobExecutionStatus#NOT_AVAILABLE} for every input. */
+  ProgressMonitorStatusMapper DEFAULT = new ProgressMonitorStatusMapper() {
+    @Override
+    public TJobExecutionStatus forStatus(String ignoredStatus) {
+      return TJobExecutionStatus.NOT_AVAILABLE;
+    }
+  };
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java b/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java
new file mode 100644
index 0000000..88fbd22
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java
@@ -0,0 +1,32 @@
+package org.apache.hive.service.cli;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+
+/**
+ * Maps Tez DAG state names onto the generic {@link TJobExecutionStatus} values.
+ */
+public class TezProgressMonitorStatusMapper implements ProgressMonitorStatusMapper {
+
+  /**
+   * These states are taken from DAGStatus.State; that enum could not be used here directly
+   * because it is an optional dependency and it was not worth including just for the enum.
+   */
+  enum TezStatus {
+    SUBMITTED, INITING, RUNNING, SUCCEEDED, KILLED, FAILED, ERROR
+  }
+
+  /**
+   * Translates a Tez state name into a generic execution status.
+   *
+   * @param status name of a {@link TezStatus} constant; may be null or empty
+   * @return {@code NOT_AVAILABLE} when {@code status} is null, empty, or not the name of a
+   *         known {@link TezStatus}; {@code IN_PROGRESS} for SUBMITTED, INITING and RUNNING;
+   *         {@code COMPLETE} for every other known state
+   */
+  @Override
+  public TJobExecutionStatus forStatus(String status) {
+    if (StringUtils.isEmpty(status)) {
+      return TJobExecutionStatus.NOT_AVAILABLE;
+    }
+    final TezStatus tezStatus;
+    try {
+      tezStatus = TezStatus.valueOf(status);
+    } catch (IllegalArgumentException ignored) {
+      // The name is not one of the states mirrored above. Report it as unavailable instead of
+      // letting the exception fail the whole status request.
+      return TJobExecutionStatus.NOT_AVAILABLE;
+    }
+    switch (tezStatus) {
+      case SUBMITTED:
+      case INITING:
+      case RUNNING:
+        return TJobExecutionStatus.IN_PROGRESS;
+      default:
+        return TJobExecutionStatus.COMPLETE;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
index 85b82b6..0e76c91 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import javax.security.sasl.SaslException;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.PlainSaslHelper;
import org.apache.hive.service.cli.CLIServiceClient;
@@ -191,8 +190,8 @@ public class RetryingThriftCLIServiceClient implements InvocationHandler {
}
@Override
- public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
- return cliService.getOperationStatus(opHandle);
+ public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate) throws HiveSQLException {
+ return cliService.getOperationStatus(opHandle, getProgressUpdate);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 2938338..e09d9fe 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -24,7 +24,6 @@ import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.security.auth.login.LoginException;
@@ -33,24 +32,25 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.ServiceUtils;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.TSetIpAddressProcessor;
-import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.FetchType;
import org.apache.hive.service.cli.GetInfoType;
import org.apache.hive.service.cli.GetInfoValue;
import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.JobProgressUpdate;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.ProgressMonitorStatusMapper;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.TezProgressMonitorStatusMapper;
import org.apache.hive.service.cli.session.SessionManager;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
@@ -93,6 +93,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
@@ -629,15 +630,30 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
@Override
public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
TGetOperationStatusResp resp = new TGetOperationStatusResp();
+ OperationHandle operationHandle = new OperationHandle(req.getOperationHandle());
try {
- OperationStatus operationStatus = cliService.getOperationStatus(
- new OperationHandle(req.getOperationHandle()));
+ OperationStatus operationStatus =
+ cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate());
resp.setOperationState(operationStatus.getState().toTOperationState());
HiveSQLException opException = operationStatus.getOperationException();
resp.setTaskStatus(operationStatus.getTaskStatus());
resp.setOperationStarted(operationStatus.getOperationStarted());
resp.setOperationCompleted(operationStatus.getOperationCompleted());
resp.setHasResultSet(operationStatus.getHasResultSet());
+ JobProgressUpdate progressUpdate = operationStatus.jobProgressUpdate();
+ ProgressMonitorStatusMapper mapper = ProgressMonitorStatusMapper.DEFAULT;
+ if ("tez".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+ mapper = new TezProgressMonitorStatusMapper();
+ }
+
+ resp.setProgressUpdateResponse(new TProgressUpdateResp(
+ progressUpdate.headers(),
+ progressUpdate.rows(),
+ progressUpdate.progressedPercentage,
+ mapper.forStatus(progressUpdate.status),
+ progressUpdate.footerSummary,
+ progressUpdate.startTimeMillis
+ ));
if (opException != null) {
resp.setSqlState(opException.getSQLState());
resp.setErrorCode(opException.getErrorCode());
@@ -746,7 +762,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
}
return resp;
}
-
+
@Override
public abstract void run();
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index 9805641..617bc40 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -367,9 +367,10 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
* @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle)
*/
@Override
- public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
+ public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate) throws HiveSQLException {
try {
TGetOperationStatusReq req = new TGetOperationStatusReq(opHandle.toTOperationHandle());
+ req.setGetProgressUpdate(getProgressUpdate);
TGetOperationStatusResp resp = cliService.GetOperationStatus(req);
// Checks the status of the RPC call, throws an exception in case of error
checkStatus(resp.getStatus());
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index f325dbc..bc6648e 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -173,11 +173,11 @@ public abstract class CLIServiceTest {
queryString = "SELECT ID+1 FROM TEST_EXEC";
opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
- OperationStatus opStatus = client.getOperationStatus(opHandle);
+ OperationStatus opStatus = client.getOperationStatus(opHandle, false);
checkOperationTimes(opHandle, opStatus);
// Expect query to be completed now
assertEquals("Query should be finished",
- OperationState.FINISHED, client.getOperationStatus(opHandle).getState());
+ OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState());
client.closeOperation(opHandle);
// Cleanup
@@ -273,10 +273,10 @@ public abstract class CLIServiceTest {
System.out.println("Cancelling " + opHandle);
client.cancelOperation(opHandle);
- OperationStatus operationStatus = client.getOperationStatus(opHandle);
+ OperationStatus operationStatus = client.getOperationStatus(opHandle, false);
checkOperationTimes(opHandle, operationStatus);
- state = client.getOperationStatus(opHandle).getState();
+ state = client.getOperationStatus(opHandle, false).getState();
System.out.println(opHandle + " after cancelling, state= " + state);
assertEquals("Query should be cancelled", OperationState.CANCELED, state);
@@ -545,7 +545,7 @@ public abstract class CLIServiceTest {
}
longPollingStart = System.currentTimeMillis();
System.out.println("Long polling starts at: " + longPollingStart);
- opStatus = client.getOperationStatus(opHandle);
+ opStatus = client.getOperationStatus(opHandle, false);
state = opStatus.getState();
longPollingEnd = System.currentTimeMillis();
System.out.println("Long polling ends at: " + longPollingEnd);
@@ -568,7 +568,7 @@ public abstract class CLIServiceTest {
assertTrue(longPollingTimeDelta - 0.9*expectedTimeout > 0);
}
}
- assertEquals(expectedState, client.getOperationStatus(opHandle).getState());
+ assertEquals(expectedState, client.getOperationStatus(opHandle, false).getState());
client.closeOperation(opHandle);
return opStatus;
}
@@ -606,7 +606,7 @@ public abstract class CLIServiceTest {
assertNotNull(opHandle);
// query should pass and create the table
assertEquals("Query should be finished",
- OperationState.FINISHED, client.getOperationStatus(opHandle).getState());
+ OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState());
client.closeOperation(opHandle);
// select from the new table should pass
@@ -615,7 +615,7 @@ public abstract class CLIServiceTest {
assertNotNull(opHandle);
// query should pass and create the table
assertEquals("Query should be finished",
- OperationState.FINISHED, client.getOperationStatus(opHandle).getState());
+ OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState());
client.closeOperation(opHandle);
// the settings in conf overlay should not be part of session config
@@ -653,7 +653,7 @@ public abstract class CLIServiceTest {
OperationStatus status = null;
int count = 0;
while (true) {
- status = client.getOperationStatus(ophandle);
+ status = client.getOperationStatus(ophandle, false);
checkOperationTimes(ophandle, status);
OperationState state = status.getState();
System.out.println("Polling: " + ophandle + " count=" + (++count)
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
index 2855bb3..79953c4 100644
--- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -206,7 +206,7 @@ public class TestRetryingThriftCLIServiceClient {
// operations will be lost once owning session is closed.
for (OperationHandle op: new OperationHandle[]{op1, op2}) {
try {
- client.getOperationStatus(op);
+ client.getOperationStatus(op, false);
fail("Should have failed.");
} catch (HiveSQLException ignored) {
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
index abb1ecf..4c59fca 100644
--- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
@@ -181,7 +181,7 @@ public abstract class ThriftCLIServiceTest {
OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf);
assertNotNull(opHandle);
- OperationStatus opStatus = client.getOperationStatus(opHandle);
+ OperationStatus opStatus = client.getOperationStatus(opHandle, false);
assertNotNull(opStatus);
OperationState state = opStatus.getState();
@@ -241,7 +241,7 @@ public abstract class ThriftCLIServiceTest {
System.out.println("Polling timed out");
break;
}
- opStatus = client.getOperationStatus(opHandle);
+ opStatus = client.getOperationStatus(opHandle, false);
assertNotNull(opStatus);
state = opStatus.getState();
System.out.println("Current state: " + state);
@@ -264,7 +264,7 @@ public abstract class ThriftCLIServiceTest {
System.out.println("Will attempt to execute: " + queryString);
opHandle = client.executeStatementAsync(sessHandle, queryString, opConf);
assertNotNull(opHandle);
- opStatus = client.getOperationStatus(opHandle);
+ opStatus = client.getOperationStatus(opHandle, false);
assertNotNull(opStatus);
isQueryRunning = true;
pollTimeout = System.currentTimeMillis() + 100000;
@@ -283,7 +283,7 @@ public abstract class ThriftCLIServiceTest {
isQueryRunning = false;
}
Thread.sleep(1000);
- opStatus = client.getOperationStatus(opHandle);
+ opStatus = client.getOperationStatus(opHandle, false);
}
// Expect query to return an error state
assertEquals("Operation should be in error state",
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
index a5c8d62..6fec947 100644
--- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
+++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
@@ -202,7 +202,7 @@ public class ThriftCliServiceTestWithCookie {
OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf);
assertNotNull(opHandle);
- OperationStatus opStatus = client.getOperationStatus(opHandle);
+ OperationStatus opStatus = client.getOperationStatus(opHandle, false);
assertNotNull(opStatus);
OperationState state = opStatus.getState();
[4/4] hive git commit: HIVE-15473: Progress Bar on Beeline client
(Anishek Agarwal via Thejas Nair)
Posted by th...@apache.org.
HIVE-15473: Progress Bar on Beeline client (Anishek Agarwal via Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3e01ef32
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3e01ef32
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3e01ef32
Branch: refs/heads/master
Commit: 3e01ef3268ffbcb69c5c18c2c9f8810512c91bf8
Parents: f6cdbc8
Author: Anishek Agarwal <an...@gmail.com>
Authored: Fri Jan 6 14:31:21 2017 +0530
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Tue Feb 7 12:12:27 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/hive/beeline/Commands.java | 84 +-
.../logs/BeelineInPlaceUpdateStream.java | 66 ++
common/pom.xml | 5 +
.../hadoop/hive/common/log/InPlaceUpdate.java | 202 ++++
.../hadoop/hive/common/log/ProgressMonitor.java | 51 +
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +-
.../TestOperationLoggingAPIWithMr.java | 2 +-
.../TestOperationLoggingAPIWithTez.java | 2 +-
.../org/apache/hive/jdbc/HiveStatement.java | 13 +
.../hive/jdbc/logs/InPlaceUpdateStream.java | 14 +
ql/pom.xml | 5 -
.../hadoop/hive/ql/exec/InPlaceUpdates.java | 89 --
.../hive/ql/exec/SerializationUtilities.java | 1 -
.../ql/exec/spark/status/SparkJobMonitor.java | 6 +-
.../hive/ql/exec/tez/TezJobExecHelper.java | 5 +-
.../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 1016 -----------------
.../hive/ql/exec/tez/TezSessionState.java | 8 +-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 6 +-
.../hive/ql/exec/tez/monitoring/Constants.java | 7 +
.../hive/ql/exec/tez/monitoring/DAGSummary.java | 197 ++++
.../exec/tez/monitoring/FSCountersSummary.java | 92 ++
.../ql/exec/tez/monitoring/LLAPioSummary.java | 108 ++
.../ql/exec/tez/monitoring/PrintSummary.java | 7 +
.../QueryExecutionBreakdownSummary.java | 75 ++
.../ql/exec/tez/monitoring/TezJobMonitor.java | 397 +++++++
.../exec/tez/monitoring/TezProgressMonitor.java | 313 ++++++
.../apache/hadoop/hive/ql/metadata/Hive.java | 11 +-
.../hadoop/hive/ql/session/SessionState.java | 12 +
.../tez/monitoring/TestTezProgressMonitor.java | 101 ++
service-rpc/if/TCLIService.thrift | 26 +-
.../gen/thrift/gen-cpp/TCLIService_types.cpp | 322 ++++++
.../src/gen/thrift/gen-cpp/TCLIService_types.h | 102 +-
.../rpc/thrift/TGetOperationStatusReq.java | 109 +-
.../rpc/thrift/TGetOperationStatusResp.java | 116 +-
.../service/rpc/thrift/TJobExecutionStatus.java | 48 +
.../service/rpc/thrift/TProgressUpdateResp.java | 1033 ++++++++++++++++++
service-rpc/src/gen/thrift/gen-php/Types.php | 327 ++++++
.../src/gen/thrift/gen-py/TCLIService/ttypes.py | 214 +++-
.../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 51 +-
.../org/apache/hive/service/cli/CLIService.java | 63 +-
.../service/cli/EmbeddedCLIServiceClient.java | 4 +-
.../apache/hive/service/cli/ICLIService.java | 2 +-
.../hive/service/cli/JobProgressUpdate.java | 38 +
.../hive/service/cli/OperationStatus.java | 8 +
.../cli/ProgressMonitorStatusMapper.java | 19 +
.../cli/TezProgressMonitorStatusMapper.java | 32 +
.../thrift/RetryingThriftCLIServiceClient.java | 5 +-
.../service/cli/thrift/ThriftCLIService.java | 28 +-
.../cli/thrift/ThriftCLIServiceClient.java | 3 +-
.../apache/hive/service/cli/CLIServiceTest.java | 18 +-
.../cli/TestRetryingThriftCLIServiceClient.java | 2 +-
.../cli/thrift/ThriftCLIServiceTest.java | 8 +-
.../thrift/ThriftCliServiceTestWithCookie.java | 2 +-
53 files changed, 4268 insertions(+), 1214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/beeline/src/java/org/apache/hive/beeline/Commands.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java
index 748546d..99db643 100644
--- a/beeline/src/java/org/apache/hive/beeline/Commands.java
+++ b/beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.SystemVariables;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hive.beeline.logs.BeelineInPlaceUpdateStream;
import org.apache.hive.jdbc.HiveStatement;
import org.apache.hive.jdbc.Utils;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
@@ -982,6 +983,11 @@ public class Commands {
logThread = new Thread(createLogRunnable(stmnt));
logThread.setDaemon(true);
logThread.start();
+ if (stmnt instanceof HiveStatement) {
+ ((HiveStatement) stmnt).setInPlaceUpdateStream(
+ new BeelineInPlaceUpdateStream(beeLine.getOutputStream())
+ );
+ }
hasResults = stmnt.execute(sql);
logThread.interrupt();
logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
@@ -1242,43 +1248,65 @@ public class Commands {
command.setLength(0);
}
- private Runnable createLogRunnable(Statement statement) {
+ private Runnable createLogRunnable(final Statement statement) {
if (statement instanceof HiveStatement) {
- final HiveStatement hiveStatement = (HiveStatement) statement;
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- while (hiveStatement.hasMoreLogs()) {
- try {
- // fetch the log periodically and output to beeline console
- for (String log : hiveStatement.getQueryLog()) {
- beeLine.info(log);
- }
- Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
- } catch (SQLException e) {
- beeLine.error(new SQLWarning(e));
- return;
- } catch (InterruptedException e) {
- beeLine.debug("Getting log thread is interrupted, since query is done!");
- showRemainingLogsIfAny(hiveStatement);
- return;
- }
- }
- }
- };
- return runnable;
+ return new LogRunnable(this, (HiveStatement) statement,
+ DEFAULT_QUERY_PROGRESS_INTERVAL);
} else {
- beeLine.debug("The statement instance is not HiveStatement type: " + statement.getClass());
+ beeLine.debug(
+ "The statement instance is not HiveStatement type: " + statement
+ .getClass());
return new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
// do nothing.
}
};
}
}
+ /**
+ * Forwards the throwable to {@code beeLine.error}. The nested LogRunnable reports errors
+ * through this method on its Commands reference.
+ */
+ private void error(Throwable throwable) {
+ beeLine.error(throwable);
+ }
+
+ /**
+ * Forwards the message to {@code beeLine.debug}. The nested LogRunnable emits its debug
+ * output through this method on its Commands reference.
+ */
+ private void debug(String message) {
+ beeLine.debug(message);
+ }
+
+
+
+  /**
+   * Polls a {@link HiveStatement} for query log lines and prints them on the beeline console,
+   * sleeping for the configured interval between polls.
+   */
+  static class LogRunnable implements Runnable {
+    private final Commands commands;
+    private final HiveStatement hiveStatement;
+    private final long queryProgressInterval;
+
+    /**
+     * @param commands owner used for console output and for flushing the remaining logs
+     * @param hiveStatement statement whose query log is polled
+     * @param queryProgressInterval sleep time between two polls, passed to {@link Thread#sleep}
+     */
+    LogRunnable(Commands commands, HiveStatement hiveStatement,
+        long queryProgressInterval) {
+      this.hiveStatement = hiveStatement;
+      this.commands = commands;
+      this.queryProgressInterval = queryProgressInterval;
+    }
+
+    /** Fetches the currently available query log lines and prints each one via beeLine.info. */
+    private void updateQueryLog() throws SQLException {
+      for (String log : hiveStatement.getQueryLog()) {
+        commands.beeLine.info(log);
+      }
+    }
+
+    /**
+     * Prints logs until the statement reports no more logs, a fetch fails, or the thread is
+     * interrupted. The try block wraps the whole loop so that either exception ends the
+     * polling; with the try inside the loop the thread would keep polling after an error and
+     * after the interrupt that signals query completion.
+     */
+    @Override public void run() {
+      try {
+        while (hiveStatement.hasMoreLogs()) {
+          updateQueryLog();
+          Thread.sleep(queryProgressInterval);
+        }
+      } catch (SQLException e) {
+        // Report once and stop; retrying would repeat the same failure every interval.
+        commands.error(new SQLWarning(e));
+      } catch (InterruptedException e) {
+        commands.debug("Getting log thread is interrupted, since query is done!");
+        commands.showRemainingLogsIfAny(hiveStatement);
+      }
+    }
+  }
+
private void showRemainingLogsIfAny(Statement statement) {
if (statement instanceof HiveStatement) {
HiveStatement hiveStatement = (HiveStatement) statement;
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
new file mode 100644
index 0000000..2ed289c
--- /dev/null
+++ b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
@@ -0,0 +1,66 @@
+package org.apache.hive.beeline.logs;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hive.jdbc.logs.InPlaceUpdateStream;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * {@link InPlaceUpdateStream} that renders progress responses through an {@link InPlaceUpdate}
+ * writing to the stream supplied at construction time.
+ */
+public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
+  private final InPlaceUpdate inPlaceUpdate;
+
+  /**
+   * @param out stream the progress output is rendered to
+   */
+  public BeelineInPlaceUpdateStream(PrintStream out) {
+    this.inPlaceUpdate = new InPlaceUpdate(out);
+  }
+
+  /**
+   * Renders the given progress response. Nothing is rendered when the response is null, or
+   * when its status is unset or {@link TJobExecutionStatus#NOT_AVAILABLE}.
+   *
+   * @param response progress response to render; may be null
+   */
+  @Override
+  public void update(TProgressUpdateResp response) {
+    if (response == null) {
+      return;
+    }
+    // Enum identity comparison is null-safe; an unset status is treated like NOT_AVAILABLE
+    // instead of failing with a NullPointerException.
+    TJobExecutionStatus status = response.getStatus();
+    if (status == null || status == TJobExecutionStatus.NOT_AVAILABLE) {
+      return;
+    }
+
+    inPlaceUpdate.render(new ProgressMonitorWrapper(response));
+  }
+
+  /**
+   * Adapts a {@link TProgressUpdateResp} to the {@link ProgressMonitor} interface by
+   * delegating each accessor to the matching getter of the response.
+   */
+  static class ProgressMonitorWrapper implements ProgressMonitor {
+    private final TProgressUpdateResp response;
+
+    ProgressMonitorWrapper(TProgressUpdateResp response) {
+      this.response = response;
+    }
+
+    @Override
+    public List<String> headers() {
+      return response.getHeaderNames();
+    }
+
+    @Override
+    public List<List<String>> rows() {
+      return response.getRows();
+    }
+
+    @Override
+    public String footerSummary() {
+      return response.getFooterSummary();
+    }
+
+    @Override
+    public long startTime() {
+      return response.getStartTime();
+    }
+
+    /** Not supported by this wrapper; always throws. */
+    @Override
+    public String executionStatus() {
+      throw new UnsupportedOperationException(
+          "This should never be used for anything. All the required data is available via other methods"
+      );
+    }
+
+    @Override
+    public double progressedPercentage() {
+      return response.getProgressedPercentage();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index fd948f8..8474a87 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -64,6 +64,11 @@
<artifactId>orc-core</artifactId>
</dependency>
<dependency>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ <version>${jline.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<version>${jetty.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
new file mode 100644
index 0000000..bfdb4fa
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
@@ -0,0 +1,202 @@
+package org.apache.hadoop.hive.common.log;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import jline.TerminalFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.fusesource.jansi.Ansi;
+
+import javax.annotation.Nullable;
+import java.io.PrintStream;
+import java.io.StringWriter;
+import java.text.DecimalFormat;
+import java.util.List;
+
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.*;
+
+/**
+ * Renders information from ProgressMonitor to the stream provided.
+ * <p>
+ * Every {@link #render(ProgressMonitor)} call first moves the cursor up over the lines written by
+ * the previous call and then overwrites them, using ANSI escape sequences. The number of lines
+ * written is tracked per instance, so one instance should be used per output stream.
+ */
+public class InPlaceUpdate {
+
+  public static final int MIN_TERMINAL_WIDTH = 94;
+
+  // keep these rows within MIN_TERMINAL_WIDTH chars. If more columns need to be added then update
+  // the min terminal width requirement; SEPARATOR is derived from it
+  private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s ";
+  private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s ";
+  private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
+
+  private static final int PROGRESS_BAR_CHARS = 30;
+  private static final String SEPARATOR = new String(new char[MIN_TERMINAL_WIDTH]).replace("\0", "-");
+
+  /* Pretty print the values */
+  private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+  // number of lines written since the cursor was last moved back up
+  private int lines = 0;
+  private final PrintStream out;
+
+  /**
+   * @param out stream that receives the rendered table and the ANSI escape sequences
+   */
+  public InPlaceUpdate(PrintStream out) {
+    this.out = out;
+  }
+
+  /** Creates a renderer that writes to {@code System.out}. */
+  public InPlaceUpdate() {
+    this(System.out);
+  }
+
+  /**
+   * Erases the current line on the given stream, prints the given line followed by a newline and
+   * flushes the stream.
+   *
+   * @param out  stream to write to
+   * @param line line to print
+   */
+  public static void reprintLine(PrintStream out, String line) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+  }
+
+  /**
+   * Writes a cursor-up escape sequence with a count of 0 to the given stream and flushes it.
+   *
+   * @param ps stream to write to
+   */
+  public static void rePositionCursor(PrintStream ps) {
+    ps.print(ansi().cursorUp(0).toString());
+    ps.flush();
+  }
+
+  /**
+   * NOTE: Use this method only if isUnixTerminal is true.
+   * Erases the current line and prints the given line.
+   *
+   * @param line - line to print
+   */
+  private void reprintLine(String line) {
+    reprintLine(out, line);
+    lines++;
+  }
+
+  /**
+   * NOTE: Use this method only if isUnixTerminal is true.
+   * Erases the current line and prints the given line with the specified color.
+   *
+   * @param line - line to print
+   * @param color - color for the line
+   */
+  private void reprintLineWithColorAsBold(String line, Ansi.Color color) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
+        .toString());
+    out.flush();
+    lines++;
+  }
+
+  /**
+   * NOTE: Use this method only if isUnixTerminal is true.
+   * Erases the current line and prints the given multiline. Make sure the specified line is not
+   * terminated by linebreak.
+   *
+   * @param line - line to print
+   */
+  private void reprintMultiLine(String line) {
+    // count the physical lines so repositionCursor() can move back over all of them
+    int numLines = line.split("\r\n|\r|\n").length;
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+    lines += numLines;
+  }
+
+  /**
+   * NOTE: Use this method only if isUnixTerminal is true.
+   * Repositions the cursor back to line 0.
+   */
+  private void repositionCursor() {
+    if (lines > 0) {
+      out.print(ansi().cursorUp(lines).toString());
+      out.flush();
+      lines = 0;
+    }
+  }
+
+
+  /**
+   * Builds the textual progress bar, e.g. {@code [==================>>-----]}.
+   *
+   * @param percent completed fraction; values outside [0, 1] are clamped so that the bar always
+   *                has exactly PROGRESS_BAR_CHARS characters
+   * @return the progress bar
+   */
+  private String getInPlaceProgressBar(double percent) {
+    // the two brackets and the ">>" marker take 4 of the PROGRESS_BAR_CHARS characters
+    int remainingChars = PROGRESS_BAR_CHARS - 4;
+    int completed = Math.max(0, Math.min(remainingChars, (int) (remainingChars * percent)));
+    int pending = remainingChars - completed;
+
+    StringBuilder bar = new StringBuilder(PROGRESS_BAR_CHARS);
+    bar.append('[');
+    for (int i = 0; i < completed; i++) {
+      bar.append('=');
+    }
+    bar.append(">>");
+    for (int i = 0; i < pending; i++) {
+      bar.append('-');
+    }
+    bar.append(']');
+    return bar.toString();
+  }
+
+  /**
+   * Overwrites the previously rendered table with the current state of the given monitor:
+   * a header row, one row per entry of {@code monitor.rows()} and a footer holding the summary,
+   * the progress bar, the percentage and the elapsed time.
+   *
+   * @param monitor source of the data to render; nothing is written when it is null
+   */
+  public void render(ProgressMonitor monitor) {
+    if (monitor == null) {
+      return;
+    }
+    // position the cursor to line 0
+    repositionCursor();
+
+    // print header
+    // -------------------------------------------------------------------------------
+    // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
+    // -------------------------------------------------------------------------------
+    reprintLine(SEPARATOR);
+    reprintLineWithColorAsBold(String.format(HEADER_FORMAT, monitor.headers().toArray()),
+        Ansi.Color.CYAN);
+    reprintLine(SEPARATOR);
+
+
+    // Map 1 .......... container SUCCEEDED 7 7 0 0 0 0
+    List<String> printReady = Lists.transform(monitor.rows(), new Function<List<String>, String>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable List<String> row) {
+        return String.format(VERTEX_FORMAT, row.toArray());
+      }
+    });
+    reprintMultiLine(StringUtils.join(printReady, "\n"));
+
+    // -------------------------------------------------------------------------------
+    // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s
+    // -------------------------------------------------------------------------------
+    // read the fraction once so the percentage text and the bar always agree
+    double progress = monitor.progressedPercentage();
+    String progressStr = "" + (int) (progress * 100) + "%";
+    float et = (float) (System.currentTimeMillis() - monitor.startTime()) / (float) 1000;
+    String elapsedTime = "ELAPSED TIME: " + secondsFormatter.format(et) + " s";
+    String footer = String.format(
+        FOOTER_FORMAT,
+        monitor.footerSummary(),
+        getInPlaceProgressBar(progress),
+        progressStr,
+        elapsedTime);
+
+    reprintLineWithColorAsBold(footer, Ansi.Color.RED);
+    reprintLine(SEPARATOR);
+  }
+
+
+  /**
+   * Tells whether progress can be rendered in place for the given configuration.
+   * <p>
+   * The in-place flag of the configured execution engine is honoured: spark uses
+   * SPARK_EXEC_INPLACE_PROGRESS, every other engine uses TEZ_EXEC_INPLACE_PROGRESS.
+   *
+   * @param conf configuration to read the execution engine and the in-place flag from
+   * @return true when in-place progress is enabled, stdout and stderr are unix terminals and the
+   *         terminal is at least MIN_TERMINAL_WIDTH characters wide
+   */
+  public static boolean canRenderInPlace(HiveConf conf) {
+    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+    // spark has its own switch; all other callers keep reading the tez one
+    HiveConf.ConfVars inPlaceFlag = "spark".equals(engine)
+        ? HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS
+        : HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS;
+    boolean inPlaceUpdates = HiveConf.getBoolVar(conf, inPlaceFlag);
+
+    // the separator rows are MIN_TERMINAL_WIDTH chars wide, so the terminal must be at least that wide
+    return inPlaceUpdates && isUnixTerminal() && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
+  }
+
+  /**
+   * @return true when the OS name does not start with "Windows" and both stdout and stderr are
+   *         reported as terminals by isatty; false otherwise, including when the native library
+   *         needed for the check cannot be loaded
+   */
+  private static boolean isUnixTerminal() {
+
+    String os = System.getProperty("os.name");
+    if (os.startsWith("Windows")) {
+      // we do not support Windows, we will revisit this if we really need it for windows.
+      return false;
+    }
+
+    // We must be on some unix variant..
+    // check if standard out is a terminal
+    try {
+      // isatty system call will return 1 if the file descriptor is terminal else 0
+      if (isatty(STDOUT_FILENO) == 0) {
+        return false;
+      }
+      if (isatty(STDERR_FILENO) == 0) {
+        return false;
+      }
+    } catch (NoClassDefFoundError | UnsatisfiedLinkError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
new file mode 100644
index 0000000..ee02ccb
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
@@ -0,0 +1,51 @@
+package org.apache.hadoop.hive.common.log;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Read-only view of the progress of a running job, in the tabular form that
+ * InPlaceUpdate.render prints: a header row, data rows, a footer summary and an overall progress
+ * value.
+ */
+public interface ProgressMonitor {
+
+ /**
+ * Placeholder implementation that carries no data: empty header and row lists, empty strings
+ * and zero for the numeric values.
+ */
+ ProgressMonitor NULL = new ProgressMonitor() {
+ @Override
+ public List<String> headers() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<List<String>> rows() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String footerSummary() {
+ return "";
+ }
+
+ @Override
+ public long startTime() {
+ return 0;
+ }
+
+ @Override
+ public String executionStatus() {
+ return "";
+ }
+
+ @Override
+ public double progressedPercentage() {
+ return 0;
+ }
+ };
+
+ /**
+ * @return the column titles; InPlaceUpdate.render passes them as the arguments of its header
+ * format string
+ */
+ List<String> headers();
+
+ /**
+ * @return the table body, one inner list per row; InPlaceUpdate.render formats each inner list
+ * with its row format string
+ */
+ List<List<String>> rows();
+
+ /**
+ * @return the text shown in the first column of the footer line
+ */
+ String footerSummary();
+
+ /**
+ * @return the start time; InPlaceUpdate.render subtracts it from System.currentTimeMillis() to
+ * compute the elapsed time, so it is expected to be on the same millisecond clock
+ */
+ long startTime();
+
+ /**
+ * @return the execution status as a string
+ * NOTE(review): the set of possible values is not visible here - confirm against implementations
+ */
+ String executionStatus();
+
+ /**
+ * @return the overall progress; InPlaceUpdate.render multiplies it by 100 to print a
+ * percentage, so it is presumably a fraction between 0 and 1 - TODO confirm
+ */
+ double progressedPercentage();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb27cd6..f3b01b2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2841,7 +2841,12 @@ public class HiveConf extends Configuration {
TEZ_EXEC_INPLACE_PROGRESS(
"hive.tez.exec.inplace.progress",
true,
- "Updates tez job execution progress in-place in the terminal."),
+ "Updates tez job execution progress in-place in the terminal when hive-cli is used."),
+ HIVE_SERVER2_INPLACE_PROGRESS(
+ "hive.server2.in.place.progress",
+ true,
+ "Allows hive server 2 to send progress bar update information. This is currently available"
+ + " only if the execution engine is tez."),
SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
"Updates spark job execution progress in-place in the terminal."),
TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
index b8462c6..830ffc2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
@@ -97,7 +97,7 @@ public class TestOperationLoggingAPIWithMr extends OperationLoggingAPITestBase {
if (System.currentTimeMillis() > pollTimeout) {
break;
}
- opStatus = client.getOperationStatus(operationHandle);
+ opStatus = client.getOperationStatus(operationHandle, false);
Assert.assertNotNull(opStatus);
state = opStatus.getState();
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
index 8b5b516..e98406d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
@@ -50,7 +50,7 @@ public class TestOperationLoggingAPIWithTez extends OperationLoggingAPITestBase
"<PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>",
"<PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>",
"<PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>",
- "from=org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor",
+ "from=org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor",
"org.apache.tez.common.counters.DAGCounter",
"NUM_SUCCEEDED_TASKS",
"TOTAL_LAUNCHED_TASKS",
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index a242501..56860c4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -19,6 +19,7 @@
package org.apache.hive.jdbc;
import org.apache.commons.codec.binary.Base64;
+import org.apache.hive.jdbc.logs.InPlaceUpdateStream;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.rpc.thrift.TCLIService;
@@ -114,6 +115,8 @@ public class HiveStatement implements java.sql.Statement {
private int queryTimeout = 0;
+ private InPlaceUpdateStream inPlaceUpdateStream = InPlaceUpdateStream.NO_OP;
+
public HiveStatement(HiveConnection connection, TCLIService.Iface client,
TSessionHandle sessHandle) {
this(connection, client, sessHandle, false, DEFAULT_FETCH_SIZE);
@@ -342,6 +345,7 @@ public class HiveStatement implements java.sql.Statement {
TGetOperationStatusResp waitForOperationToComplete() throws SQLException {
TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
+ statusReq.setGetProgressUpdate(inPlaceUpdateStream != InPlaceUpdateStream.NO_OP);
TGetOperationStatusResp statusResp = null;
// Poll on the operation status, till the operation is complete
@@ -352,6 +356,7 @@ public class HiveStatement implements java.sql.Statement {
* essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
*/
statusResp = client.GetOperationStatus(statusReq);
+ inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse());
Utils.verifySuccessWithInfo(statusResp.getStatus());
if (statusResp.isSetOperationState()) {
switch (statusResp.getOperationState()) {
@@ -951,4 +956,12 @@ public class HiveStatement implements java.sql.Statement {
}
return null;
}
+
+ /**
+ * This is only used by the beeline client to set the stream on which in place progress updates
+ * are to be shown
+ */
+ public void setInPlaceUpdateStream(InPlaceUpdateStream stream) {
+ this.inPlaceUpdateStream = stream;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
new file mode 100644
index 0000000..3a682b2
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
@@ -0,0 +1,14 @@
+package org.apache.hive.jdbc.logs;
+
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+
+/**
+ * Receiver of the progress updates that come back with operation status responses.
+ * HiveStatement hands every response's progress update to the stream set on it while it polls
+ * for the operation to complete.
+ */
+public interface InPlaceUpdateStream {
+ /**
+ * Called with the progress update carried by an operation status response.
+ *
+ * @param response the progress update taken from the status response
+ * NOTE(review): may be null when the server sent no progress information - confirm
+ */
+ void update(TProgressUpdateResp response);
+
+ /**
+ * Stream that ignores every update. HiveStatement uses it as its default and compares against
+ * this instance to decide whether to request progress updates from the server.
+ */
+ InPlaceUpdateStream NO_OP = new InPlaceUpdateStream() {
+ @Override
+ public void update(TProgressUpdateResp response) {
+
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 84e83ee..1e6ba9a 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -463,11 +463,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>jline</groupId>
- <artifactId>jline</artifactId>
- <version>${jline.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
<version>${tez.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
deleted file mode 100644
index f59d8e2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import static org.fusesource.jansi.Ansi.ansi;
-import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.isatty;
-
-import java.io.PrintStream;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.fusesource.jansi.Ansi;
-
-import jline.TerminalFactory;
-
-public class InPlaceUpdates {
-
- public static final int MIN_TERMINAL_WIDTH = 94;
-
- static boolean isUnixTerminal() {
-
- String os = System.getProperty("os.name");
- if (os.startsWith("Windows")) {
- // we do not support Windows, we will revisit this if we really need it for windows.
- return false;
- }
-
- // We must be on some unix variant..
- // check if standard out is a terminal
- try {
- // isatty system call will return 1 if the file descriptor is terminal else 0
- if (isatty(STDOUT_FILENO) == 0) {
- return false;
- }
- if (isatty(STDERR_FILENO) == 0) {
- return false;
- }
- } catch (NoClassDefFoundError ignore) {
- // These errors happen if the JNI lib is not available for your platform.
- return false;
- } catch (UnsatisfiedLinkError ignore) {
- // These errors happen if the JNI lib is not available for your platform.
- return false;
- }
- return true;
- }
-
- public static boolean inPlaceEligible(HiveConf conf) {
- String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
- boolean inPlaceUpdates = false;
- if (engine.equals("tez")) {
- inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
- }
- if (engine.equals("spark")) {
- inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS);
- }
-
- // we need at least 80 chars wide terminal to display in-place updates properly
- return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
- && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
- }
-
- public static void reprintLine(PrintStream out, String line) {
- out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
- out.flush();
- }
-
- public static void rePositionCursor(PrintStream ps) {
- ps.print(ansi().cursorUp(0).toString());
- ps.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 7be628e..247d589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -38,7 +38,6 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor;
import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index d5b9b5d..cf0162d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hive.ql.exec.spark.status;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.fusesource.jansi.Ansi;
@@ -82,7 +82,7 @@ abstract class SparkJobMonitor {
protected SparkJobMonitor(HiveConf hiveConf) {
monitorTimeoutInterval = hiveConf.getTimeVar(
HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
- inPlaceUpdate = InPlaceUpdates.inPlaceEligible(hiveConf);
+ inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent();
console = SessionState.getConsole();
out = SessionState.LogHelper.getInfoStream();
}
@@ -270,7 +270,7 @@ abstract class SparkJobMonitor {
}
private void reprintLine(String line) {
- InPlaceUpdates.reprintLine(out, line);
+ InPlaceUpdate.reprintLine(out, line);
lines++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
index a3fc815..a544b93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
@@ -18,10 +18,11 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Method;
+
/**
* TezJobExecHelper is a utility to safely call Tez functionality from
* common code paths. It will check if tez is available/installed before
@@ -37,7 +38,7 @@ public class TezJobExecHelper {
// we have tez installed
ClassLoader classLoader = TezJobExecHelper.class.getClassLoader();
- Method method = classLoader.loadClass("org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor")
+ Method method = classLoader.loadClass("org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor")
.getMethod("killRunningJobs");
method.invoke(null, null);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
deleted file mode 100644
index bd935d4..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ /dev/null
@@ -1,1016 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.tez;
-
-import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
-import static org.fusesource.jansi.Ansi.ansi;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
-import org.apache.hadoop.hive.ql.exec.MapOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hive.common.util.ShutdownHookManager;
-import org.apache.tez.common.counters.FileSystemCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.client.VertexStatus;
-import org.fusesource.jansi.Ansi;
-
-import com.google.common.base.Preconditions;
-
-/**
- * TezJobMonitor keeps track of a tez job while it's being executed. It will
- * print status to the console and retrieve final status of the job after
- * completion.
- */
-public class TezJobMonitor {
-
- private static final String CLASS_NAME = TezJobMonitor.class.getName();
-
- private static final int COLUMN_1_WIDTH = 16;
- private static final int SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH;
- private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH + 34;
- private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-");
- private static final String FILE_HEADER_SEPARATOR =
- new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-");
- private static final String QUERY_EXEC_SUMMARY_HEADER = "Query Execution Summary";
- private static final String TASK_SUMMARY_HEADER = "Task Execution Summary";
- private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
- private static final String FS_COUNTERS_SUMMARY_HEADER = "FileSystem Counters Summary";
-
- // keep this within 80 chars width. If more columns needs to be added then update min terminal
- // width requirement and SEPARATOR width accordingly
- private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s ";
- private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s ";
- private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
- private static final String HEADER = String.format(HEADER_FORMAT,
- "VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");
-
- // method and dag summary format
- private static final String SUMMARY_HEADER_FORMAT = "%10s %14s %13s %12s %14s %15s";
- private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT,
- "VERTICES", "DURATION(ms)", "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS");
-
- // used when I/O redirection is used
- private static final String FILE_HEADER_FORMAT = "%10s %12s %16s %13s %14s %13s %12s %14s %15s";
- private static final String FILE_HEADER = String.format(FILE_HEADER_FORMAT,
- "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION(ms)",
- "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS");
-
- // LLAP counters
- private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s";
- private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
- "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
- "ALLOCATION", "USED", "TOTAL_IO");
-
- // FileSystem counters
- private static final String FS_COUNTERS_HEADER_FORMAT = "%10s %15s %13s %18s %18s %13s";
-
- // Methods summary
- private static final String OPERATION_SUMMARY = "%-35s %9s";
- private static final String OPERATION = "OPERATION";
- private static final String DURATION = "DURATION";
-
- // in-place progress update related variables
- private int lines;
- private final PrintStream out;
-
- private transient LogHelper console;
- private final PerfLogger perfLogger = SessionState.getPerfLogger();
- private final int checkInterval = 200;
- private final int maxRetryInterval = 2500;
- private final int printInterval = 3000;
- private final int progressBarChars = 30;
- private long lastPrintTime;
- private Set<String> completed;
-
- /* Pretty print the values */
- private final NumberFormat secondsFormat;
- private final NumberFormat commaFormat;
- private static final List<DAGClient> shutdownList;
- private final Map<String, BaseWork> workMap;
-
- private StringBuffer diagnostics;
-
- static {
- shutdownList = new LinkedList<DAGClient>();
- ShutdownHookManager.addShutdownHook(new Runnable() {
- @Override
- public void run() {
- TezJobMonitor.killRunningJobs();
- try {
- TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
- } catch (Exception e) {
- // ignore
- }
- }
- });
- }
-
- public static void initShutdownHook() {
- Preconditions.checkNotNull(shutdownList,
- "Shutdown hook was not properly initialized");
- }
-
- public TezJobMonitor(Map<String, BaseWork> workMap) {
- this.workMap = workMap;
- console = SessionState.getConsole();
- secondsFormat = new DecimalFormat("#0.00");
- commaFormat = NumberFormat.getNumberInstance(Locale.US);
- // all progress updates are written to info stream and log file. In-place updates can only be
- // done to info stream (console)
- out = console.getInfoStream();
- }
-
- /**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Erases the current line and prints the given line.
- * @param line - line to print
- */
- public void reprintLine(String line) {
- InPlaceUpdates.reprintLine(out, line);
- lines++;
- }
-
- /**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Erases the current line and prints the given line with the specified color.
- * @param line - line to print
- * @param color - color for the line
- */
- public void reprintLineWithColorAsBold(String line, Ansi.Color color) {
- out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
- .toString());
- out.flush();
- lines++;
- }
-
- /**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Erases the current line and prints the given multiline. Make sure the specified line is not
- * terminated by linebreak.
- * @param line - line to print
- */
- public void reprintMultiLine(String line) {
- int numLines = line.split("\r\n|\r|\n").length;
- out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
- out.flush();
- lines += numLines;
- }
-
- /**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Repositions the cursor back to line 0.
- */
- public void repositionCursor() {
- if (lines > 0) {
- out.print(ansi().cursorUp(lines).toString());
- out.flush();
- lines = 0;
- }
- }
-
- /**
- * monitorExecution handles status printing, failures during execution and final status retrieval.
- *
- * @param dagClient client that was used to kick off the job
- * @param conf configuration file for this operation
- * @return int 0 - success, 1 - killed, 2 - failed
- */
- public int monitorExecution(final DAGClient dagClient, HiveConf conf,
- DAG dag, Context ctx) throws InterruptedException {
- long monitorStartTime = System.currentTimeMillis();
- DAGStatus status = null;
- completed = new HashSet<String>();
- diagnostics = new StringBuffer();
-
- boolean running = false;
- boolean done = false;
- boolean success = false;
- int failedCounter = 0;
- int rc = 0;
- DAGStatus.State lastState = null;
- String lastReport = null;
- Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
- long startTime = 0;
- boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
- Utilities.isPerfOrAboveLogging(conf);
- boolean llapIoEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED, false);
-
- boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf);
- synchronized(shutdownList) {
- shutdownList.add(dagClient);
- }
- console.printInfo("\n");
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- Map<String, Progress> progressMap = null;
- while (true) {
-
- try {
- if (ctx != null) {
- ctx.checkHeartbeaterLockException();
- }
-
- status = dagClient.getDAGStatus(opts, checkInterval);
- progressMap = status.getVertexProgress();
- DAGStatus.State state = status.getState();
-
- if (state != lastState || state == RUNNING) {
- lastState = state;
-
- switch (state) {
- case SUBMITTED:
- console.printInfo("Status: Submitted");
- break;
- case INITING:
- console.printInfo("Status: Initializing");
- startTime = System.currentTimeMillis();
- break;
- case RUNNING:
- if (!running) {
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
- startTime = System.currentTimeMillis();
- running = true;
- }
-
- if (inPlaceEligible) {
- printStatusInPlace(progressMap, startTime, false, dagClient);
- // log the progress report to log file as well
- lastReport = logStatus(progressMap, lastReport, console);
- } else {
- lastReport = printStatus(progressMap, lastReport, console);
- }
- break;
- case SUCCEEDED:
- if (!running) {
- startTime = monitorStartTime;
- }
- if (inPlaceEligible) {
- printStatusInPlace(progressMap, startTime, false, dagClient);
- // log the progress report to log file as well
- lastReport = logStatus(progressMap, lastReport, console);
- } else {
- lastReport = printStatus(progressMap, lastReport, console);
- }
- success = true;
- running = false;
- done = true;
- break;
- case KILLED:
- if (!running) {
- startTime = monitorStartTime;
- }
- if (inPlaceEligible) {
- printStatusInPlace(progressMap, startTime, true, dagClient);
- // log the progress report to log file as well
- lastReport = logStatus(progressMap, lastReport, console);
- }
- console.printInfo("Status: Killed");
- running = false;
- done = true;
- rc = 1;
- break;
- case FAILED:
- case ERROR:
- if (!running) {
- startTime = monitorStartTime;
- }
- if (inPlaceEligible) {
- printStatusInPlace(progressMap, startTime, true, dagClient);
- // log the progress report to log file as well
- lastReport = logStatus(progressMap, lastReport, console);
- }
- console.printError("Status: Failed");
- running = false;
- done = true;
- rc = 2;
- break;
- }
- }
- } catch (Exception e) {
- console.printInfo("Exception: " + e.getMessage());
- boolean isInterrupted = hasInterruptedException(e);
- if (isInterrupted || (++failedCounter % maxRetryInterval / checkInterval == 0)) {
- try {
- console.printInfo("Killing DAG...");
- dagClient.tryKillDAG();
- } catch (IOException io) {
- // best effort
- } catch (TezException te) {
- // best effort
- }
- e.printStackTrace();
- console.printError("Execution has failed.");
- rc = 1;
- done = true;
- } else {
- console.printInfo("Retrying...");
- }
- } finally {
- if (done) {
- if (rc != 0 && status != null) {
- for (String diag : status.getDiagnostics()) {
- console.printError(diag);
- diagnostics.append(diag);
- }
- }
- synchronized(shutdownList) {
- shutdownList.remove(dagClient);
- }
- break;
- }
- }
- }
-
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
-
- if (isProfileEnabled && success && progressMap != null) {
-
- double duration = (System.currentTimeMillis() - startTime) / 1000.0;
- console.printInfo("Status: DAG finished successfully in "
- + String.format("%.2f seconds", duration));
- console.printInfo("");
-
- console.printInfo(QUERY_EXEC_SUMMARY_HEADER);
- printQueryExecutionBreakDown();
- console.printInfo(SEPARATOR);
- console.printInfo("");
-
- console.printInfo(TASK_SUMMARY_HEADER);
- printDagSummary(progressMap, console, dagClient, conf, dag, inPlaceEligible);
- if (inPlaceEligible) {
- console.printInfo(SEPARATOR);
- } else {
- console.printInfo(FILE_HEADER_SEPARATOR);
- }
-
- if (llapIoEnabled) {
- console.printInfo("");
- console.printInfo(LLAP_IO_SUMMARY_HEADER);
- printLlapIOSummary(progressMap, console, dagClient);
- console.printInfo(SEPARATOR);
- console.printInfo("");
-
- console.printInfo(FS_COUNTERS_SUMMARY_HEADER);
- printFSCountersSummary(progressMap, console, dagClient);
- }
-
- console.printInfo("");
- }
-
- return rc;
- }
-
- private static boolean hasInterruptedException(Throwable e) {
- // Hadoop IPC wraps InterruptedException. GRRR.
- while (e != null) {
- if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
- return true;
- }
- e = e.getCause();
- }
- return false;
- }
-
- /**
- * killRunningJobs tries to terminate execution of all
- * currently running tez queries. No guarantees, best effort only.
- */
- public static void killRunningJobs() {
- synchronized (shutdownList) {
- for (DAGClient c : shutdownList) {
- try {
- System.err.println("Trying to shutdown DAG");
- c.tryKillDAG();
- } catch (Exception e) {
- // ignore
- }
- }
- }
- }
-
- private static long getCounterValueByGroupName(TezCounters vertexCounters,
- String groupNamePattern,
- String counterName) {
- TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
- return (tezCounter == null) ? 0 : tezCounter.getValue();
- }
-
- private void printQueryExecutionBreakDown() {
-
- /* Build the method summary header */
- String execBreakdownHeader = String.format(OPERATION_SUMMARY, OPERATION, DURATION);
- console.printInfo(SEPARATOR);
- reprintLineWithColorAsBold(execBreakdownHeader, Ansi.Color.CYAN);
- console.printInfo(SEPARATOR);
-
- // parse, analyze, optimize and compile
- long compile = perfLogger.getEndTime(PerfLogger.COMPILE) -
- perfLogger.getStartTime(PerfLogger.COMPILE);
- console.printInfo(String.format(OPERATION_SUMMARY, "Compile Query",
- secondsFormat.format(compile / 1000.0) + "s"));
-
- // prepare plan for submission (building DAG, adding resources, creating scratch dirs etc.)
- long totalDAGPrep = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG) -
- perfLogger.getEndTime(PerfLogger.COMPILE);
- console.printInfo(String.format(OPERATION_SUMMARY, "Prepare Plan",
- secondsFormat.format(totalDAGPrep / 1000.0) + "s"));
-
- // submit to accept dag (if session is closed, this will include re-opening of session time,
- // localizing files for AM, submitting DAG)
- long submitToAccept = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) -
- perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG);
- console.printInfo(String.format(OPERATION_SUMMARY, "Submit Plan",
- secondsFormat.format(submitToAccept / 1000.0) + "s"));
-
- // accept to start dag (schedule wait time, resource wait time etc.)
- long acceptToStart = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- console.printInfo(String.format(OPERATION_SUMMARY, "Start DAG",
- secondsFormat.format(acceptToStart / 1000.0) + "s"));
-
- // time to actually run the dag (actual dag runtime)
- final long startToEnd;
- if (acceptToStart == 0) {
- startToEnd = perfLogger.getDuration(PerfLogger.TEZ_RUN_DAG);
- } else {
- startToEnd = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG) -
- perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- }
- console.printInfo(String.format(OPERATION_SUMMARY, "Run DAG",
- secondsFormat.format(startToEnd / 1000.0) + "s"));
-
- }
-
- private void printDagSummary(Map<String, Progress> progressMap, LogHelper console,
- DAGClient dagClient, HiveConf conf, DAG dag, final boolean inPlaceEligible) {
-
- /* Strings for headers and counters */
- String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
- Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- TezCounters hiveCounters = null;
- try {
- hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
-
- /* If the counters are missing there is no point trying to print progress */
- if (hiveCounters == null) {
- return;
- }
-
- /* Print the per Vertex summary */
- if (inPlaceEligible) {
- console.printInfo(SEPARATOR);
- reprintLineWithColorAsBold(SUMMARY_HEADER, Ansi.Color.CYAN);
- console.printInfo(SEPARATOR);
- } else {
- console.printInfo(FILE_HEADER_SEPARATOR);
- reprintLineWithColorAsBold(FILE_HEADER, Ansi.Color.CYAN);
- console.printInfo(FILE_HEADER_SEPARATOR);
- }
- SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
- Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
- statusOptions.add(StatusGetOpts.GET_COUNTERS);
- for (String vertexName : keys) {
- Progress progress = progressMap.get(vertexName);
- if (progress != null) {
- final int totalTasks = progress.getTotalTaskCount();
- final int failedTaskAttempts = progress.getFailedTaskAttemptCount();
- final int killedTaskAttempts = progress.getKilledTaskAttemptCount();
- final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName);
- VertexStatus vertexStatus = null;
- try {
- vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions);
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
-
- if (vertexStatus == null) {
- continue;
- }
-
- Vertex currentVertex = dag.getVertex(vertexName);
- List<Vertex> inputVerticesList = currentVertex.getInputVertices();
- long hiveInputRecordsFromOtherVertices = 0;
- if (inputVerticesList.size() > 0) {
-
- for (Vertex inputVertex : inputVerticesList) {
- String inputVertexName = inputVertex.getName();
- hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters,
- hiveCountersGroup, String.format("%s_",
- ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) +
- inputVertexName.replace(" ", "_"));
-
- hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters,
- hiveCountersGroup, String.format("%s_",
- FileSinkOperator.Counter.RECORDS_OUT.toString()) +
- inputVertexName.replace(" ", "_"));
- }
- }
-
- /*
- * Get the CPU & GC
- *
- * counters org.apache.tez.common.counters.TaskCounter
- * GC_TIME_MILLIS=37712
- * CPU_MILLISECONDS=2774230
- */
- final TezCounters vertexCounters = vertexStatus.getVertexCounters();
- final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters,
- TaskCounter.class.getName(),
- TaskCounter.CPU_MILLISECONDS.name());
-
- final double gcTimeMillis = getCounterValueByGroupName(vertexCounters,
- TaskCounter.class.getName(),
- TaskCounter.GC_TIME_MILLIS.name());
-
- /*
- * Get the HIVE counters
- *
- * HIVE
- * CREATED_FILES=1
- * DESERIALIZE_ERRORS=0
- * RECORDS_IN_Map_1=550076554
- * RECORDS_OUT_INTERMEDIATE_Map_1=854987
- * RECORDS_OUT_Reducer_2=1
- */
-
- final long hiveInputRecords =
- getCounterValueByGroupName(
- hiveCounters,
- hiveCountersGroup,
- String.format("%s_", MapOperator.Counter.RECORDS_IN.toString())
- + vertexName.replace(" ", "_"))
- + hiveInputRecordsFromOtherVertices;
- final long hiveOutputIntermediateRecords =
- getCounterValueByGroupName(
- hiveCounters,
- hiveCountersGroup,
- String.format("%s_", ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString())
- + vertexName.replace(" ", "_"));
- final long hiveOutputRecords =
- getCounterValueByGroupName(
- hiveCounters,
- hiveCountersGroup,
- String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString())
- + vertexName.replace(" ", "_"))
- + hiveOutputIntermediateRecords;
-
- final String vertexExecutionStats;
- if (inPlaceEligible) {
- vertexExecutionStats = String.format(SUMMARY_HEADER_FORMAT,
- vertexName,
- secondsFormat.format((duration)),
- commaFormat.format(cpuTimeMillis),
- commaFormat.format(gcTimeMillis),
- commaFormat.format(hiveInputRecords),
- commaFormat.format(hiveOutputRecords));
- } else {
- vertexExecutionStats = String.format(FILE_HEADER_FORMAT,
- vertexName,
- totalTasks,
- failedTaskAttempts,
- killedTaskAttempts,
- secondsFormat.format((duration)),
- commaFormat.format(cpuTimeMillis),
- commaFormat.format(gcTimeMillis),
- commaFormat.format(hiveInputRecords),
- commaFormat.format(hiveOutputRecords));
- }
- console.printInfo(vertexExecutionStats);
- }
- }
- }
-
- private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console,
- DAGClient dagClient) {
- SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
- Set<StatusGetOpts> statusOptions = new HashSet<>(1);
- statusOptions.add(StatusGetOpts.GET_COUNTERS);
- boolean first = false;
- String counterGroup = LlapIOCounters.class.getName();
- for (String vertexName : keys) {
- // Reducers do not benefit from LLAP IO so no point in printing
- if (vertexName.startsWith("Reducer")) {
- continue;
- }
- TezCounters vertexCounters = null;
- try {
- vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
- .getVertexCounters();
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
- if (vertexCounters != null) {
- final long selectedRowgroups = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name());
- final long metadataCacheHit = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.METADATA_CACHE_HIT.name());
- final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.METADATA_CACHE_MISS.name());
- final long cacheHitBytes = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.CACHE_HIT_BYTES.name());
- final long cacheMissBytes = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.CACHE_MISS_BYTES.name());
- final long allocatedBytes = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.ALLOCATED_BYTES.name());
- final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES.name());
- final long totalIoTime = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS.name());
-
- if (!first) {
- console.printInfo(SEPARATOR);
- reprintLineWithColorAsBold(LLAP_SUMMARY_HEADER, Ansi.Color.CYAN);
- console.printInfo(SEPARATOR);
- first = true;
- }
-
- String queryFragmentStats = String.format(LLAP_SUMMARY_HEADER_FORMAT,
- vertexName,
- selectedRowgroups,
- metadataCacheHit,
- metadataCacheMiss,
- Utilities.humanReadableByteCount(cacheHitBytes),
- Utilities.humanReadableByteCount(cacheMissBytes),
- Utilities.humanReadableByteCount(allocatedBytes),
- Utilities.humanReadableByteCount(allocatedUsedBytes),
- secondsFormat.format(totalIoTime / 1000_000_000.0) + "s");
- console.printInfo(queryFragmentStats);
- }
- }
- }
-
- private void printFSCountersSummary(Map<String, Progress> progressMap, LogHelper console,
- DAGClient dagClient) {
- SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
- Set<StatusGetOpts> statusOptions = new HashSet<>(1);
- statusOptions.add(StatusGetOpts.GET_COUNTERS);
- // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side
- // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap.
- for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
- final String scheme = statistics.getScheme().toUpperCase();
- final String fsCountersHeader = String.format(FS_COUNTERS_HEADER_FORMAT,
- "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS");
-
- console.printInfo("");
- reprintLineWithColorAsBold("Scheme: " + scheme, Ansi.Color.RED);
- console.printInfo(SEPARATOR);
- reprintLineWithColorAsBold(fsCountersHeader, Ansi.Color.CYAN);
- console.printInfo(SEPARATOR);
-
- for (String vertexName : keys) {
- TezCounters vertexCounters = null;
- try {
- vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
- .getVertexCounters();
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
- if (vertexCounters != null) {
- final String counterGroup = FileSystemCounter.class.getName();
- final long bytesRead = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name());
- final long bytesWritten = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name());
- final long readOps = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name());
- final long largeReadOps = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name());
- final long writeOps = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name());
-
- String fsCountersSummary = String.format(FS_COUNTERS_HEADER_FORMAT,
- vertexName,
- Utilities.humanReadableByteCount(bytesRead),
- readOps,
- largeReadOps,
- Utilities.humanReadableByteCount(bytesWritten),
- writeOps);
- console.printInfo(fsCountersSummary);
- }
- }
-
- console.printInfo(SEPARATOR);
- }
- }
-
- private void printStatusInPlace(Map<String, Progress> progressMap, long startTime,
- boolean vextexStatusFromAM, DAGClient dagClient) {
- StringBuilder reportBuffer = new StringBuilder();
- int sumComplete = 0;
- int sumTotal = 0;
-
- // position the cursor to line 0
- repositionCursor();
-
- // print header
- // -------------------------------------------------------------------------------
- // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
- // -------------------------------------------------------------------------------
- reprintLine(SEPARATOR);
- reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
- reprintLine(SEPARATOR);
-
- SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
- int idx = 0;
- int maxKeys = keys.size();
- for (String s : keys) {
- idx++;
- Progress progress = progressMap.get(s);
- final int complete = progress.getSucceededTaskCount();
- final int total = progress.getTotalTaskCount();
- final int running = progress.getRunningTaskCount();
- final int failed = progress.getFailedTaskAttemptCount();
- final int pending = progress.getTotalTaskCount() - progress.getSucceededTaskCount() -
- progress.getRunningTaskCount();
- final int killed = progress.getKilledTaskAttemptCount();
-
- // To get vertex status we can use DAGClient.getVertexStatus(), but it will be expensive to
- // get status from AM for every refresh of the UI. Lets infer the state from task counts.
- // Only if DAG is FAILED or KILLED the vertex status is fetched from AM.
- VertexStatus.State vertexState = VertexStatus.State.INITIALIZING;
-
- // INITED state
- if (total > 0) {
- vertexState = VertexStatus.State.INITED;
- sumComplete += complete;
- sumTotal += total;
- }
-
- // RUNNING state
- if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
- vertexState = VertexStatus.State.RUNNING;
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
- }
-
- // SUCCEEDED state
- if (complete == total) {
- vertexState = VertexStatus.State.SUCCEEDED;
- if (!completed.contains(s)) {
- completed.add(s);
-
- /* We may have missed the start of the vertex
- * due to the 3 seconds interval
- */
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
-
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
- }
-
- // DAG might have been killed, lets try to get vertex state from AM before dying
- // KILLED or FAILED state
- if (vextexStatusFromAM) {
- VertexStatus vertexStatus = null;
- try {
- vertexStatus = dagClient.getVertexStatus(s, null);
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
- if (vertexStatus != null) {
- vertexState = vertexStatus.getState();
- }
- }
-
- // Map 1 .......... container SUCCEEDED 7 7 0 0 0 0
- String nameWithProgress = getNameWithProgress(s, complete, total);
- String mode = getMode(s, workMap);
- String vertexStr = String.format(VERTEX_FORMAT,
- nameWithProgress,
- mode,
- vertexState.toString(),
- total,
- complete,
- running,
- pending,
- failed,
- killed);
- reportBuffer.append(vertexStr);
- if (idx != maxKeys) {
- reportBuffer.append("\n");
- }
- }
-
- reprintMultiLine(reportBuffer.toString());
-
- // -------------------------------------------------------------------------------
- // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s
- // -------------------------------------------------------------------------------
- reprintLine(SEPARATOR);
- final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal;
- String footer = getFooter(keys.size(), completed.size(), progress, startTime);
- reprintLineWithColorAsBold(footer, Ansi.Color.RED);
- reprintLine(SEPARATOR);
- }
-
- private String getMode(String name, Map<String, BaseWork> workMap) {
- String mode = "container";
- BaseWork work = workMap.get(name);
- if (work != null) {
- // uber > llap > container
- if (work.getUberMode()) {
- mode = "uber";
- } else if (work.getLlapMode()) {
- mode = "llap";
- } else {
- mode = "container";
- }
- }
- return mode;
- }
-
- // Map 1 ..........
- private String getNameWithProgress(String s, int complete, int total) {
- String result = "";
- if (s != null) {
- float percent = total == 0 ? 0.0f : (float) complete / (float) total;
- // lets use the remaining space in column 1 as progress bar
- int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
- String trimmedVName = s;
-
- // if the vertex name is longer than column 1 width, trim it down
- // "Tez Merge File Work" will become "Tez Merge File.."
- if (s.length() > COLUMN_1_WIDTH) {
- trimmedVName = s.substring(0, COLUMN_1_WIDTH - 1);
- trimmedVName = trimmedVName + "..";
- }
-
- result = trimmedVName + " ";
- int toFill = (int) (spaceRemaining * percent);
- for (int i = 0; i < toFill; i++) {
- result += ".";
- }
- }
- return result;
- }
-
- // VERTICES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s
- private String getFooter(int keySize, int completedSize, float progress, long startTime) {
- String verticesSummary = String.format("VERTICES: %02d/%02d", completedSize, keySize);
- String progressBar = getInPlaceProgressBar(progress);
- final int progressPercent = (int) (progress * 100);
- String progressStr = "" + progressPercent + "%";
- float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
- String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s";
- String footer = String.format(FOOTER_FORMAT,
- verticesSummary, progressBar, progressStr, elapsedTime);
- return footer;
- }
-
- // [==================>>-----]
- private String getInPlaceProgressBar(float percent) {
- StringBuilder bar = new StringBuilder("[");
- int remainingChars = progressBarChars - 4;
- int completed = (int) (remainingChars * percent);
- int pending = remainingChars - completed;
- for (int i = 0; i < completed; i++) {
- bar.append("=");
- }
- bar.append(">>");
- for (int i = 0; i < pending; i++) {
- bar.append("-");
- }
- bar.append("]");
- return bar.toString();
- }
-
- private String printStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
- String report = getReport(progressMap);
- if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
- console.printInfo(report);
- lastPrintTime = System.currentTimeMillis();
- }
- return report;
- }
-
- private String logStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
- String report = getReport(progressMap);
- if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
- console.logInfo(report);
- lastPrintTime = System.currentTimeMillis();
- }
- return report;
- }
-
- private String getReport(Map<String, Progress> progressMap) {
- StringBuilder reportBuffer = new StringBuilder();
-
- SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
- for (String s: keys) {
- Progress progress = progressMap.get(s);
- final int complete = progress.getSucceededTaskCount();
- final int total = progress.getTotalTaskCount();
- final int running = progress.getRunningTaskCount();
- final int failed = progress.getFailedTaskAttemptCount();
- if (total <= 0) {
- reportBuffer.append(String.format("%s: -/-\t", s));
- } else {
- if (complete == total && !completed.contains(s)) {
- completed.add(s);
-
- /*
- * We may have missed the start of the vertex due to the 3 seconds interval
- */
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
-
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
- if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
-
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
-
- /* vertex is started, but not complete */
- if (failed > 0) {
- reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
- } else {
- reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
- }
- } else {
- /* vertex is waiting for input/slots or complete */
- if (failed > 0) {
- /* tasks finished but some failed */
- reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
- } else {
- reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
- }
- }
- }
- }
-
- return reportBuffer.toString();
- }
-
- public String getDiagnostics() {
- return diagnostics.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index f1071fa..62f65c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -17,9 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-
import java.util.Collection;
-
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -40,9 +38,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-
import javax.security.auth.login.LoginException;
-
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
@@ -83,6 +79,7 @@ import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
/**
* Holds session state related to Tez
@@ -671,7 +668,7 @@ public class TezSessionState {
}
public List<LocalResource> getLocalizedResources() {
- return new ArrayList<LocalResource>(localizedResources);
+ return new ArrayList<>(localizedResources);
}
public String getUser() {
@@ -698,4 +695,5 @@ public class TezSessionState {
}
} while (!ownerThread.compareAndSet(null, newName));
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 7479b85..69cbe0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -79,6 +79,7 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.json.JSONObject;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
/**
*
@@ -178,8 +179,9 @@ public class TezTask extends Task<TezWork> {
additionalLr, inputOutputJars, inputOutputLocalResources);
// finally monitor will print progress until the job is done
- TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
- rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
+ TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap(),dagClient, conf, dag, ctx);
+ rc = monitor.monitorExecution();
+
if (rc != 0) {
this.setException(new HiveException(monitor.getDiagnostics()));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
new file mode 100644
index 0000000..eccbbb6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
@@ -0,0 +1,7 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+
+public interface Constants {
+ String SEPARATOR = new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace("\0", "-");
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
new file mode 100644
index 0000000..5840ad6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -0,0 +1,197 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.*;
+
+
+class DAGSummary implements PrintSummary {
+
+ private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdate.MIN_TERMINAL_WIDTH + 34;
+ private static final String FILE_HEADER_SEPARATOR = new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-");
+
+ private static final String FORMATTING_PATTERN = "%10s %12s %16s %13s %14s %13s %12s %14s %15s";
+ private static final String FILE_HEADER = String.format(
+ FORMATTING_PATTERN,
+ "VERTICES",
+ "TOTAL_TASKS",
+ "FAILED_ATTEMPTS",
+ "KILLED_TASKS",
+ "DURATION(ms)",
+ "CPU_TIME(ms)",
+ "GC_TIME(ms)",
+ "INPUT_RECORDS",
+ "OUTPUT_RECORDS"
+ );
+
+ private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+ private final NumberFormat commaFormatter = NumberFormat.getNumberInstance(Locale.US);
+
+ private final String hiveCountersGroup;
+ private final TezCounters hiveCounters;
+
+ private Map<String, Progress> progressMap;
+ private DAGClient dagClient;
+ private DAG dag;
+ private PerfLogger perfLogger;
+
+ DAGSummary(Map<String, Progress> progressMap, HiveConf hiveConf, DAGClient dagClient,
+ DAG dag, PerfLogger perfLogger) {
+ this.progressMap = progressMap;
+ this.dagClient = dagClient;
+ this.dag = dag;
+ this.perfLogger = perfLogger;
+ this.hiveCountersGroup = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+ this.hiveCounters = hiveCounters(dagClient);
+ }
+
+ private long hiveInputRecordsFromOtherVertices(String vertexName) {
+ List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices();
+ long result = 0;
+ for (Vertex inputVertex : inputVerticesList) {
+ String intermediateRecordsCounterName = formattedName(
+ ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
+ inputVertex.getName()
+ );
+ String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
+ inputVertex.getName());
+ result += (
+ hiveCounterValue(intermediateRecordsCounterName)
+ + hiveCounterValue(recordsOutCounterName)
+ );
+ }
+ return result;
+ }
+
+ private String formattedName(String counterName, String vertexName) {
+ return String.format("%s_", counterName) + vertexName.replace(" ", "_");
+ }
+
+ /**
+  * Looks up the value of a single counter.
+  *
+  * @param counters counters to search; {@code null} is tolerated because vertexSummary() passes
+  *        VertexStatus.getVertexCounters() through without checking it
+  * @param pattern name of the counter group to look in
+  * @param counterName name of the counter inside that group
+  * @return the counter value, or 0 when {@code counters} is null or the counter is not found
+  */
+ private long getCounterValueByGroupName(TezCounters counters, String pattern, String counterName) {
+   if (counters == null) {
+     // This summary is best effort: report 0 rather than fail with a NullPointerException.
+     return 0;
+   }
+   TezCounter tezCounter = counters.getGroup(pattern).findCounter(counterName);
+   return (tezCounter == null) ? 0 : tezCounter.getValue();
+ }
+
+ private long hiveCounterValue(String counterName) {
+ return getCounterValueByGroupName(hiveCounters, hiveCountersGroup, counterName);
+ }
+
+ private TezCounters hiveCounters(DAGClient dagClient) {
+ try {
+ return dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS)).getDAGCounters();
+ } catch (IOException | TezException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+ return null;
+ }
+
+ @Override
+ public void print(SessionState.LogHelper console) {
+ console.printInfo("Task Execution Summary");
+
+ /* If the counters are missing there is no point trying to print progress */
+ if (hiveCounters == null) {
+ return;
+ }
+
+ /* Print the per Vertex summary */
+ printHeader(console);
+ SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+ Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+ statusOptions.add(StatusGetOpts.GET_COUNTERS);
+ for (String vertexName : keys) {
+ Progress progress = progressMap.get(vertexName);
+ if (progress == null) continue;
+
+ VertexStatus vertexStatus = vertexStatus(statusOptions, vertexName);
+ if (vertexStatus == null) {
+ continue;
+ }
+ console.printInfo(vertexSummary(vertexName, progress, vertexStatus));
+ }
+ console.printInfo(FILE_HEADER_SEPARATOR);
+ }
+
+ private String vertexSummary(String vertexName, Progress progress, VertexStatus vertexStatus) {
+ /*
+ * Get the CPU & GC
+ *
+ * counters org.apache.tez.common.counters.TaskCounter
+ * GC_TIME_MILLIS=37712
+ * CPU_MILLISECONDS=2774230
+ */
+ final TezCounters vertexCounters = vertexStatus.getVertexCounters();
+ final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters,
+ TaskCounter.class.getName(),
+ TaskCounter.CPU_MILLISECONDS.name());
+
+ final double gcTimeMillis = getCounterValueByGroupName(vertexCounters,
+ TaskCounter.class.getName(),
+ TaskCounter.GC_TIME_MILLIS.name());
+
+ /*
+ * Get the HIVE counters
+ *
+ * HIVE
+ * CREATED_FILES=1
+ * DESERIALIZE_ERRORS=0
+ * RECORDS_IN_Map_1=550076554
+ * RECORDS_OUT_INTERMEDIATE_Map_1=854987
+ * RECORDS_OUT_Reducer_2=1
+ */
+ final long hiveInputRecords =
+ hiveCounterValue(formattedName(MapOperator.Counter.RECORDS_IN.toString(), vertexName))
+ + hiveInputRecordsFromOtherVertices(vertexName);
+
+ final long hiveOutputRecords =
+ hiveCounterValue(formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(), vertexName)) +
+ hiveCounterValue(formattedName(ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), vertexName));
+
+ final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName);
+
+ return String.format(FORMATTING_PATTERN,
+ vertexName,
+ progress.getTotalTaskCount(),
+ progress.getFailedTaskAttemptCount(),
+ progress.getKilledTaskAttemptCount(),
+ secondsFormatter.format((duration)),
+ commaFormatter.format(cpuTimeMillis),
+ commaFormatter.format(gcTimeMillis),
+ commaFormatter.format(hiveInputRecords),
+ commaFormatter.format(hiveOutputRecords));
+ }
+
+ private VertexStatus vertexStatus(Set<StatusGetOpts> statusOptions, String vertexName) {
+ try {
+ return dagClient.getVertexStatus(vertexName, statusOptions);
+ } catch (IOException | TezException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+ return null;
+ }
+
+ private void printHeader(SessionState.LogHelper console) {
+ console.printInfo(FILE_HEADER_SEPARATOR);
+ console.printInfo(FILE_HEADER);
+ console.printInfo(FILE_HEADER_SEPARATOR);
+ }
+}
[3/4] hive git commit: HIVE-15473: Progress Bar on Beeline client
(Anishek Agarwal via Thejas Nair)
Posted by th...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
new file mode 100644
index 0000000..0a28edd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
@@ -0,0 +1,92 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;
+
+/**
+ * Prints, for every file system scheme returned by {@link FileSystem#getAllStatistics()}, a
+ * table with one row of file system counters per vertex.
+ */
+public class FSCountersSummary implements PrintSummary {
+
+  private static final String FORMATTING_PATTERN = "%10s %15s %13s %18s %18s %13s";
+  private static final String HEADER = String.format(FORMATTING_PATTERN,
+      "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS");
+
+  private final Map<String, Progress> progressMap;
+  private final DAGClient dagClient;
+
+  /**
+   * @param progressMap progress per vertex name; only the vertex names (keys) are used here
+   * @param dagClient client used to fetch the counters of each vertex
+   */
+  FSCountersSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
+    this.progressMap = progressMap;
+    this.dagClient = dagClient;
+  }
+
+  /**
+   * Writes one counters table per file system scheme to {@code console}. Vertices are listed in
+   * sorted name order; a vertex whose counters cannot be fetched is left out of the table.
+   */
+  @Override
+  public void print(SessionState.LogHelper console) {
+    console.printInfo("FileSystem Counters Summary");
+
+    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side
+    // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap.
+    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
+      // The scheme becomes part of the counter name that is looked up below, so it must be
+      // upper-cased independently of the JVM default locale (a Turkish locale would otherwise
+      // turn "file" into "FİLE" and every lookup would miss).
+      final String scheme = statistics.getScheme().toUpperCase(Locale.ROOT);
+
+      console.printInfo("");
+      console.printInfo("Scheme: " + scheme);
+      console.printInfo(SEPARATOR);
+      console.printInfo(HEADER);
+      console.printInfo(SEPARATOR);
+
+      for (String vertexName : keys) {
+        TezCounters vertexCounters = vertexCounters(statusOptions, vertexName);
+        if (vertexCounters != null) {
+          console.printInfo(summary(scheme, vertexName, vertexCounters));
+        }
+      }
+
+      console.printInfo(SEPARATOR);
+    }
+  }
+
+  /**
+   * Formats the table row of one vertex for one scheme. Counters are looked up by the name
+   * {@code <SCHEME>_<COUNTER>} in the {@link FileSystemCounter} group.
+   */
+  private String summary(String scheme, String vertexName, TezCounters vertexCounters) {
+    final String counterGroup = FileSystemCounter.class.getName();
+    final long bytesRead = getCounterValueByGroupName(vertexCounters,
+        counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name());
+    final long bytesWritten = getCounterValueByGroupName(vertexCounters,
+        counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name());
+    final long readOps = getCounterValueByGroupName(vertexCounters,
+        counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name());
+    final long largeReadOps = getCounterValueByGroupName(vertexCounters,
+        counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name());
+    final long writeOps = getCounterValueByGroupName(vertexCounters,
+        counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name());
+
+    return String.format(FORMATTING_PATTERN,
+        vertexName,
+        Utilities.humanReadableByteCount(bytesRead),
+        readOps,
+        largeReadOps,
+        Utilities.humanReadableByteCount(bytesWritten),
+        writeOps);
+  }
+
+  /**
+   * Fetches the counters of one vertex.
+   *
+   * @return the vertex counters, or {@code null} when the status call fails
+   */
+  private TezCounters vertexCounters(Set<StatusGetOpts> statusOptions, String vertexName) {
+    try {
+      return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
+    } catch (IOException | TezException e) {
+      // best attempt, shouldn't really kill DAG for this
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
new file mode 100644
index 0000000..81f1755
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
@@ -0,0 +1,108 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;
+
+public class LLAPioSummary implements PrintSummary {
+
+ private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s";
+ private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
+ private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
+ "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
+ "ALLOCATION", "USED", "TOTAL_IO");
+
+
+
+ private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+ private Map<String, Progress> progressMap;
+ private DAGClient dagClient;
+ private boolean first = false;
+
+ LLAPioSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
+ this.progressMap = progressMap;
+ this.dagClient = dagClient;
+ }
+
+ @Override
+ public void print(SessionState.LogHelper console) {
+ console.printInfo("");
+ console.printInfo(LLAP_IO_SUMMARY_HEADER);
+
+ SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+ Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+ statusOptions.add(StatusGetOpts.GET_COUNTERS);
+ String counterGroup = LlapIOCounters.class.getName();
+ for (String vertexName : keys) {
+ // Reducers do not benefit from LLAP IO so no point in printing
+ if (vertexName.startsWith("Reducer")) {
+ continue;
+ }
+ TezCounters vertexCounters = vertexCounter(statusOptions, vertexName);
+ if (vertexCounters != null) {
+ if (!first) {
+ console.printInfo(SEPARATOR);
+ console.printInfo(LLAP_SUMMARY_HEADER);
+ console.printInfo(SEPARATOR);
+ first = true;
+ }
+ console.printInfo(vertexSummary(vertexName, counterGroup, vertexCounters));
+ }
+ }
+ console.printInfo(SEPARATOR);
+ console.printInfo("");
+ }
+
+ private String vertexSummary(String vertexName, String counterGroup, TezCounters vertexCounters) {
+ final long selectedRowgroups = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name());
+ final long metadataCacheHit = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.METADATA_CACHE_HIT.name());
+ final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.METADATA_CACHE_MISS.name());
+ final long cacheHitBytes = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.CACHE_HIT_BYTES.name());
+ final long cacheMissBytes = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.CACHE_MISS_BYTES.name());
+ final long allocatedBytes = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.ALLOCATED_BYTES.name());
+ final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES.name());
+ final long totalIoTime = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS.name());
+
+
+ return String.format(LLAP_SUMMARY_HEADER_FORMAT,
+ vertexName,
+ selectedRowgroups,
+ metadataCacheHit,
+ metadataCacheMiss,
+ Utilities.humanReadableByteCount(cacheHitBytes),
+ Utilities.humanReadableByteCount(cacheMissBytes),
+ Utilities.humanReadableByteCount(allocatedBytes),
+ Utilities.humanReadableByteCount(allocatedUsedBytes),
+ secondsFormatter.format(totalIoTime / 1000_000_000.0) + "s");
+ }
+
+ private TezCounters vertexCounter(Set<StatusGetOpts> statusOptions, String vertexName) {
+ try {
+ return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
+ } catch (IOException | TezException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
new file mode 100644
index 0000000..6311335
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
@@ -0,0 +1,7 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A summary that can write itself to a console.
+ */
+interface PrintSummary {
+ /** Writes this summary to the given console. */
+ void print(SessionState.LogHelper console);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
new file mode 100644
index 0000000..1625ce1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
@@ -0,0 +1,75 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.text.DecimalFormat;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+
+/**
+ * Prints a two-column table (operation, duration in seconds) that breaks query execution down
+ * into compile, plan preparation, plan submission, DAG start and DAG run, using the timestamps
+ * recorded in a {@link PerfLogger}.
+ */
+class QueryExecutionBreakdownSummary implements PrintSummary {
+  // Methods summary
+  private static final String OPERATION_SUMMARY = "%-35s %9s";
+  private static final String OPERATION = "OPERATION";
+  private static final String DURATION = "DURATION";
+
+  private final DecimalFormat decimalFormat = new DecimalFormat("#0.00");
+  private final PerfLogger perfLogger;
+
+  private final Long compileEndTime;
+  private final Long dagSubmitStartTime;
+  private final Long submitToRunningDuration;
+
+  /**
+   * Captures the compile end time, the DAG submit start time and the submit-to-running duration
+   * up front; the remaining timestamps are read from {@code perfLogger} while printing.
+   */
+  QueryExecutionBreakdownSummary(PerfLogger perfLogger) {
+    this.perfLogger = perfLogger;
+    this.compileEndTime = perfLogger.getEndTime(PerfLogger.COMPILE);
+    this.dagSubmitStartTime = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG);
+    this.submitToRunningDuration = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+  }
+
+  /** Builds one table row: the label followed by {@code millis} rendered as seconds plus "s". */
+  private String row(String label, long millis) {
+    String seconds = decimalFormat.format(millis / 1000.0) + "s";
+    return String.format(OPERATION_SUMMARY, label, seconds);
+  }
+
+  @Override
+  public void print(SessionState.LogHelper console) {
+    console.printInfo("Query Execution Summary");
+
+    console.printInfo(SEPARATOR);
+    console.printInfo(String.format(OPERATION_SUMMARY, OPERATION, DURATION));
+    console.printInfo(SEPARATOR);
+
+    // parse, analyze, optimize and compile
+    long compileMillis = compileEndTime - perfLogger.getStartTime(PerfLogger.COMPILE);
+    console.printInfo(row("Compile Query", compileMillis));
+
+    // prepare plan for submission (building DAG, adding resources, creating scratch dirs etc.)
+    long planPrepMillis = dagSubmitStartTime - compileEndTime;
+    console.printInfo(row("Prepare Plan", planPrepMillis));
+
+    // submit to accept dag (if session is closed, this will include re-opening of session time,
+    // localizing files for AM, submitting DAG)
+    long submitMillis = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) - dagSubmitStartTime;
+    console.printInfo(row("Submit Plan", submitMillis));
+
+    // accept to start dag (schedule wait time, resource wait time etc.)
+    console.printInfo(row("Start DAG", submitToRunningDuration));
+
+    // time to actually run the dag (actual dag runtime)
+    final long runMillis;
+    if (submitToRunningDuration != 0) {
+      runMillis = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG) -
+          perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+    } else {
+      runMillis = perfLogger.getDuration(PerfLogger.TEZ_RUN_DAG);
+    }
+    console.printInfo(row("Run DAG", runMillis));
+    console.printInfo(SEPARATOR);
+    console.printInfo("");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
new file mode 100644
index 0000000..1e54f6e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -0,0 +1,397 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ <p>
+ http://www.apache.org/licenses/LICENSE-2.0
+ <p>
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.StringWriter;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
+
+/**
+ * TezJobMonitor keeps track of a tez job while it's being executed. It will
+ * print status to the console and retrieve final status of the job after
+ * completion.
+ */
+public class TezJobMonitor {
+
+ private static final String CLASS_NAME = TezJobMonitor.class.getName();
+ private static final int CHECK_INTERVAL = 200;
+ private static final int MAX_RETRY_INTERVAL = 2500;
+ private static final int PRINT_INTERVAL = 3000;
+
+ private final PerfLogger perfLogger = SessionState.getPerfLogger();
+ private static final List<DAGClient> shutdownList;
+ private final Map<String, BaseWork> workMap;
+
+ private transient LogHelper console;
+
+ private long lastPrintTime;
+ private StringWriter diagnostics = new StringWriter();
+
+ interface UpdateFunction {
+ void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report);
+ }
+
+ static {
+ shutdownList = new LinkedList<>();
+ ShutdownHookManager.addShutdownHook(new Runnable() {
+ @Override
+ public void run() {
+ TezJobMonitor.killRunningJobs();
+ try {
+ TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ });
+ }
+
+ public static void initShutdownHook() {
+ Preconditions.checkNotNull(shutdownList,
+ "Shutdown hook was not properly initialized");
+ }
+
+ private final DAGClient dagClient;
+ private final HiveConf hiveConf;
+ private final DAG dag;
+ private final Context context;
+ private long executionStartTime = 0;
+ private final UpdateFunction updateFunction;
+ /**
+ * Have to use the same instance to render else the number lines printed earlier is lost and the
+ * screen will print the table again and again.
+ */
+ private final InPlaceUpdate inPlaceUpdate;
+
+ public TezJobMonitor(Map<String, BaseWork> workMap, final DAGClient dagClient, HiveConf conf, DAG dag,
+ Context ctx) {
+ this.workMap = workMap;
+ this.dagClient = dagClient;
+ this.hiveConf = conf;
+ this.dag = dag;
+ this.context = ctx;
+ console = SessionState.getConsole();
+ inPlaceUpdate = new InPlaceUpdate(LogHelper.getInfoStream());
+ updateFunction = updateFunction();
+ }
+
+ private UpdateFunction updateFunction() {
+ UpdateFunction logToFileFunction = new UpdateFunction() {
+ @Override
+ public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
+ SessionState.get().updateProgressMonitor(progressMonitor(status, vertexProgressMap));
+ console.printInfo(report);
+ }
+ };
+ UpdateFunction inPlaceUpdateFunction = new UpdateFunction() {
+ @Override
+ public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
+ inPlaceUpdate.render(progressMonitor(status, vertexProgressMap));
+ console.logInfo(report);
+ }
+ };
+ return InPlaceUpdate.canRenderInPlace(hiveConf)
+ && !SessionState.getConsole().getIsSilent()
+ && !SessionState.get().isHiveServerQuery()
+ ? inPlaceUpdateFunction : logToFileFunction;
+ }
+
+ private boolean isProfilingEnabled() {
+ return HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
+ Utilities.isPerfOrAboveLogging(hiveConf);
+ }
+
+ /**
+  * Polls the DAG status until the DAG reaches a terminal state, reporting progress on the way,
+  * and prints the execution summary afterwards.
+  *
+  * Errors raised while polling are retried; monitoring gives up (and tries to kill the DAG) on
+  * an interrupt or on every (MAX_RETRY_INTERVAL / CHECK_INTERVAL)-th error. The error count is
+  * cumulative over the whole call and is not reset by a successful poll.
+  *
+  * @return 0 when the DAG succeeded; 1 when it was killed or monitoring gave up; 2 when the DAG
+  *         failed or ended in error
+  */
+ public int monitorExecution() {
+   boolean done = false;
+   boolean success = false;
+   int failedCounter = 0;
+   int rc = 0;
+   DAGStatus status = null;
+   Map<String, Progress> vertexProgressMap = null;
+
+
+   long monitorStartTime = System.currentTimeMillis();
+   // registered so that the shutdown hook can kill the DAG if the JVM exits while it is running
+   synchronized (shutdownList) {
+     shutdownList.add(dagClient);
+   }
+   perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
+   perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+   DAGStatus.State lastState = null;
+   String lastReport = null;
+   boolean running = false;
+
+   while (true) {
+
+     try {
+       if (context != null) {
+         context.checkHeartbeaterLockException();
+       }
+
+       status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), CHECK_INTERVAL);
+       vertexProgressMap = status.getVertexProgress();
+       DAGStatus.State state = status.getState();
+
+       // act on every state change, and on every poll while the DAG is running
+       if (state != lastState || state == RUNNING) {
+         lastState = state;
+
+         switch (state) {
+         case SUBMITTED:
+           console.printInfo("Status: Submitted");
+           break;
+         case INITING:
+           console.printInfo("Status: Initializing");
+           this.executionStartTime = System.currentTimeMillis();
+           break;
+         case RUNNING:
+           if (!running) {
+             perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+             console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
+             this.executionStartTime = System.currentTimeMillis();
+             running = true;
+           }
+           lastReport = updateStatus(status, vertexProgressMap, lastReport);
+           break;
+         case SUCCEEDED:
+           // the RUNNING state was never observed: fall back to the time monitoring started
+           if (!running) {
+             this.executionStartTime = monitorStartTime;
+           }
+           lastReport = updateStatus(status, vertexProgressMap, lastReport);
+           success = true;
+           running = false;
+           done = true;
+           break;
+         case KILLED:
+           if (!running) {
+             this.executionStartTime = monitorStartTime;
+           }
+           lastReport = updateStatus(status, vertexProgressMap, lastReport);
+           console.printInfo("Status: Killed");
+           running = false;
+           done = true;
+           rc = 1;
+           break;
+         case FAILED:
+         case ERROR:
+           if (!running) {
+             this.executionStartTime = monitorStartTime;
+           }
+           lastReport = updateStatus(status, vertexProgressMap, lastReport);
+           console.printError("Status: Failed");
+           running = false;
+           done = true;
+           rc = 2;
+           break;
+         }
+       }
+     } catch (Exception e) {
+       console.printInfo("Exception: " + e.getMessage());
+       boolean isInterrupted = hasInterruptedException(e);
+       // '%' and '/' have equal precedence and associate left to right, so the divisor must be
+       // parenthesized; without the parentheses the test was already true for the first error
+       // and the DAG was killed without a single retry.
+       if (isInterrupted || (++failedCounter % (MAX_RETRY_INTERVAL / CHECK_INTERVAL) == 0)) {
+         try {
+           console.printInfo("Killing DAG...");
+           dagClient.tryKillDAG();
+         } catch (IOException | TezException tezException) {
+           // best effort
+         }
+         console
+             .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e));
+         rc = 1;
+         done = true;
+       } else {
+         console.printInfo("Retrying...");
+       }
+     } finally {
+       if (done) {
+         if (rc != 0 && status != null) {
+           for (String diag : status.getDiagnostics()) {
+             console.printError(diag);
+             diagnostics.append(diag);
+           }
+         }
+         synchronized (shutdownList) {
+           shutdownList.remove(dagClient);
+         }
+         break;
+       }
+     }
+   }
+
+   perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
+   printSummary(success, vertexProgressMap);
+   return rc;
+ }
+
+ private void printSummary(boolean success, Map<String, Progress> progressMap) {
+ if (isProfilingEnabled() && success && progressMap != null) {
+
+ double duration = (System.currentTimeMillis() - this.executionStartTime) / 1000.0;
+ console.printInfo("Status: DAG finished successfully in " + String.format("%.2f seconds", duration));
+ console.printInfo("");
+
+ new QueryExecutionBreakdownSummary(perfLogger).print(console);
+ new DAGSummary(progressMap, hiveConf, dagClient, dag, perfLogger).print(console);
+
+ //llap IO summary
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.LLAP_IO_ENABLED, false)) {
+ new LLAPioSummary(progressMap, dagClient).print(console);
+ new FSCountersSummary(progressMap, dagClient).print(console);
+ }
+ console.printInfo("");
+ }
+ }
+
+ private static boolean hasInterruptedException(Throwable e) {
+ // Hadoop IPC wraps InterruptedException. GRRR.
+ while (e != null) {
+ if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+ return true;
+ }
+ e = e.getCause();
+ }
+ return false;
+ }
+
+ /**
+ * killRunningJobs tries to terminate execution of all
+ * currently running tez queries. No guarantees, best effort only.
+ */
+ private static void killRunningJobs() {
+ synchronized (shutdownList) {
+ for (DAGClient c : shutdownList) {
+ try {
+ System.err.println("Trying to shutdown DAG");
+ c.tryKillDAG();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern,
+ String counterName) {
+ TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
+ return (tezCounter == null) ? 0 : tezCounter.getValue();
+ }
+
+ private String updateStatus(DAGStatus status, Map<String, Progress> vertexProgressMap,
+ String lastReport) {
+ String report = getReport(vertexProgressMap);
+ if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL) {
+ updateFunction.update(status, vertexProgressMap, report);
+ lastPrintTime = System.currentTimeMillis();
+ }
+ return report;
+ }
+
+ private String getReport(Map<String, Progress> progressMap) {
+ StringBuilder reportBuffer = new StringBuilder();
+
+ SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+ for (String s : keys) {
+ Progress progress = progressMap.get(s);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ final int running = progress.getRunningTaskCount();
+ final int failed = progress.getFailedTaskAttemptCount();
+ if (total <= 0) {
+ reportBuffer.append(String.format("%s: -/-\t", s));
+ } else {
+ if (complete == total) {
+ /*
+ * We may have missed the start of the vertex due to the 3 seconds interval
+ */
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+ if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+
+ /* vertex is started, but not complete */
+ if (failed > 0) {
+ reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+ }
+ } else {
+ /* vertex is waiting for input/slots or complete */
+ if (failed > 0) {
+ /* tasks finished but some failed */
+ reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+ }
+ }
+ }
+ }
+
+ return reportBuffer.toString();
+ }
+
+ public String getDiagnostics() {
+ return diagnostics.toString();
+ }
+
+ private ProgressMonitor progressMonitor(DAGStatus status, Map<String, Progress> progressMap) {
+ try {
+ return new TezProgressMonitor(dagClient, status, workMap, progressMap, console,
+ executionStartTime);
+ } catch (IOException | TezException e) {
+ console.printInfo("Getting Progress Information: " + e.getMessage() + " stack trace: " +
+ ExceptionUtils.getStackTrace(e));
+ }
+ return TezProgressMonitor.NULL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
new file mode 100644
index 0000000..3475fc2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
@@ -0,0 +1,313 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.tez.dag.api.client.DAGStatus.State.KILLED;
+
+class TezProgressMonitor implements ProgressMonitor {
+ private static final int COLUMN_1_WIDTH = 16;
+ private final Map<String, BaseWork> workMap;
+ private final SessionState.LogHelper console;
+ private final long executionStartTime;
+ private final DAGStatus status;
+ Map<String, VertexStatus> vertexStatusMap = new HashMap<>();
+ Map<String, VertexProgress> progressCountsMap = new HashMap<>();
+
+ /**
+ * Try to get most the data required from dagClient in the constructor itself so that even after
+ * the tez job has finished this object can be used for later use.s
+ */
+ TezProgressMonitor(DAGClient dagClient, DAGStatus status, Map<String, BaseWork> workMap,
+ Map<String, Progress> progressMap, SessionState.LogHelper console, long executionStartTime)
+ throws IOException, TezException {
+ this.status = status;
+ this.workMap = workMap;
+ this.console = console;
+ this.executionStartTime = executionStartTime;
+ for (Map.Entry<String, Progress> entry : progressMap.entrySet()) {
+ String vertexName = entry.getKey();
+ progressCountsMap.put(vertexName, new VertexProgress(entry.getValue(), status.getState()));
+ try {
+ vertexStatusMap.put(vertexName, dagClient.getVertexStatus(vertexName, null));
+ } catch (IOException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+ }
+ }
+
+ /**
+  * Column titles of the progress table, in the same order in which {@link #rows()} emits the
+  * cells of each row.
+  *
+  * @return a fixed-size list holding the nine column titles
+  */
+ @Override
+ public List<String> headers() {
+   return Arrays.asList(
+       "VERTICES",
+       "MODE",
+       "STATUS",
+       "TOTAL",
+       "COMPLETED",
+       "RUNNING",
+       "PENDING",
+       "FAILED",
+       "KILLED"
+   );
+ }
+
+ /**
+  * Builds one table row per vertex, ordered by vertex name. Each row carries, in header order:
+  * name with progress dots, execution mode, inferred status and the task counters, e.g.
+  * <pre>
+  * Map 1 .......... container SUCCEEDED 7 7 0 0 0 0
+  * </pre>
+  * Any exception raised while building the rows is reported on the console and an empty list is
+  * returned instead, so a rendering problem does not propagate to the caller.
+  *
+  * @return the rows of the progress table, or an empty list on failure
+  */
+ @Override
+ public List<List<String>> rows() {
+   try {
+     List<List<String>> table = new ArrayList<>();
+     // TreeSet gives a stable, alphabetical vertex order between refreshes
+     SortedSet<String> vertexNames = new TreeSet<>(progressCountsMap.keySet());
+     for (String vertexName : vertexNames) {
+       VertexProgress progress = progressCountsMap.get(vertexName);
+       table.add(
+           Arrays.asList(
+               getNameWithProgress(vertexName, progress.succeededTaskCount,
+                   progress.totalTaskCount),
+               getMode(vertexName, workMap),
+               progress.vertexStatus(vertexStatusMap.get(vertexName)),
+               progress.total(),
+               progress.completed(),
+               progress.running(),
+               progress.pending(),
+               progress.failed(),
+               progress.killed()
+           )
+       );
+     }
+     return table;
+   } catch (Exception e) {
+     console.printInfo(
+         "Getting Progress Bar table rows failed: " + e.getMessage() + " stack trace: " + Arrays
+             .toString(e.getStackTrace())
+     );
+     return Collections.emptyList();
+   }
+ }
+
+ // -------------------------------------------------------------------------------
+ // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s
+ // -------------------------------------------------------------------------------
+ // contains footerSummary, progressedPercentage, startTime
+
+ /**
+  * Footer text of the form {@code VERTICES: cc/tt}: the number of completed vertices over the
+  * number of known vertices, each zero-padded to at least two digits.
+  */
+ @Override
+ public String footerSummary() {
+   final int finishedVertices = completed();
+   final int knownVertices = progressCountsMap.size();
+   return String.format("VERTICES: %02d/%02d", finishedVertices, knownVertices);
+ }
+
+ /**
+  * Returns the {@code executionStartTime} value passed to the constructor, unchanged.
+  */
+ @Override
+ public long startTime() {
+ return executionStartTime;
+ }
+
+ /**
+  * Overall progress as the ratio of succeeded tasks to total tasks, summed over every vertex that
+  * reports a positive task total. Despite the name the value is a ratio, not scaled to 100.
+  * <p>
+  * The division is carried out in {@code double} so the returned value is not first truncated to
+  * {@code float} precision.
+  *
+  * @return succeeded / total, or {@code 0.0} when no vertex reports any task
+  */
+ @Override
+ public double progressedPercentage() {
+   long sumTotal = 0;
+   long sumComplete = 0;
+   for (VertexProgress progress : progressCountsMap.values()) {
+     // vertices without a positive total contribute nothing to either sum
+     if (progress.totalTaskCount > 0) {
+       sumTotal += progress.totalTaskCount;
+       sumComplete += progress.succeededTaskCount;
+     }
+   }
+   return (sumTotal == 0) ? 0.0 : (double) sumComplete / (double) sumTotal;
+ }
+
+ /**
+  * Returns the enum constant name of the state reported by the {@code DAGStatus} passed to the
+  * constructor.
+  */
+ @Override
+ public String executionStatus() {
+ return this.status.getState().name();
+ }
+
+ /**
+  * Counts the vertices whose positive task total has been fully reached by succeeded tasks.
+  *
+  * @return number of vertices with {@code totalTaskCount > 0} and
+  *         {@code succeededTaskCount == totalTaskCount}
+  */
+ private int completed() {
+   int finishedVertices = 0;
+   for (VertexProgress progress : progressCountsMap.values()) {
+     final boolean hasTasks = progress.totalTaskCount > 0;
+     if (hasTasks && progress.succeededTaskCount == progress.totalTaskCount) {
+       finishedVertices++;
+     }
+   }
+   return finishedVertices;
+ }
+
+ /**
+  * Renders the first table column: the vertex name, a separating space and a dotted progress
+  * bar, e.g. {@code Map 1 ..........}.
+  * <p>
+  * The bar uses whatever width is left in the column after the name and the separator, scaled by
+  * {@code complete / total}. Names longer than {@code COLUMN_1_WIDTH} are cut and marked with
+  * {@code ".."} so that the shortened name is exactly {@code COLUMN_1_WIDTH} characters wide.
+  *
+  * @param s        vertex name; {@code null} yields an empty string
+  * @param complete number of completed tasks
+  * @param total    total number of tasks; {@code 0} is treated as no progress
+  * @return the rendered cell text
+  */
+ private String getNameWithProgress(String s, int complete, int total) {
+   if (s == null) {
+     return "";
+   }
+   float percent = total == 0 ? 0.0f : (float) complete / (float) total;
+   // lets use the remaining space in column 1 as progress bar
+   int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
+
+   String trimmedVName = s;
+   if (s.length() > COLUMN_1_WIDTH) {
+     // reserve two characters for the ".." marker so the result fits the column:
+     // "Tez Merge File Work" will become "Tez Merge File.."
+     trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2) + "..";
+   }
+
+   StringBuilder result = new StringBuilder(trimmedVName).append(' ');
+   // negative when the name already fills the column, in which case no dots are drawn
+   int toFill = (int) (spaceRemaining * percent);
+   for (int i = 0; i < toFill; i++) {
+     result.append('.');
+   }
+   return result.toString();
+ }
+
+ private String getMode(String name, Map<String, BaseWork> workMap) {
+ String mode = "container";
+ BaseWork work = workMap.get(name);
+ if (work != null) {
+ // uber > llap > container
+ if (work.getUberMode()) {
+ mode = "uber";
+ } else if (work.getLlapMode()) {
+ mode = "llap";
+ } else {
+ mode = "container";
+ }
+ }
+ return mode;
+ }
+
+ /**
+  * Immutable snapshot of the task counters of a single vertex, together with the DAG state that
+  * was current when the snapshot was taken. Also formats the counters as table cells.
+  */
+ static class VertexProgress {
+   private final int totalTaskCount;
+   private final int succeededTaskCount;
+   private final int failedTaskAttemptCount;
+   private final long killedTaskAttemptCount;
+   private final int runningTaskCount;
+   private final DAGStatus.State dagState;
+
+   /** Copies the counters out of a Tez {@code Progress} object. */
+   VertexProgress(Progress progress, DAGStatus.State dagState) {
+     this(progress.getTotalTaskCount(), progress.getSucceededTaskCount(),
+         progress.getFailedTaskAttemptCount(), progress.getKilledTaskAttemptCount(),
+         progress.getRunningTaskCount(), dagState);
+   }
+
+   /** Creates a snapshot from explicit counter values. */
+   VertexProgress(int totalTaskCount, int succeededTaskCount, int failedTaskAttemptCount,
+       int killedTaskAttemptCount, int runningTaskCount, DAGStatus.State dagState) {
+     this.totalTaskCount = totalTaskCount;
+     this.succeededTaskCount = succeededTaskCount;
+     this.failedTaskAttemptCount = failedTaskAttemptCount;
+     this.killedTaskAttemptCount = killedTaskAttemptCount;
+     this.runningTaskCount = runningTaskCount;
+     this.dagState = dagState;
+   }
+
+   /**
+    * A vertex counts as running when it is not yet fully succeeded and at least one task has
+    * succeeded, is running, or has a failed attempt.
+    */
+   boolean isRunning() {
+     return succeededTaskCount < totalTaskCount && (succeededTaskCount > 0 || runningTaskCount > 0
+         || failedTaskAttemptCount > 0);
+   }
+
+   /**
+    * Infers the vertex state from the task counters instead of using the supplied status on every
+    * refresh. Only when the DAG state is KILLED, and a status was supplied, is the state taken
+    * from {@code vertexStatus}.
+    *
+    * @param vertexStatus status previously obtained for this vertex; may be {@code null}
+    * @return the name of the resulting vertex state
+    */
+   String vertexStatus(VertexStatus vertexStatus) {
+     // Later checks deliberately override earlier ones: INITIALIZING -> INITED -> RUNNING ->
+     // SUCCEEDED -> state supplied for a killed DAG.
+     VertexStatus.State vertexState = VertexStatus.State.INITIALIZING;
+     if (totalTaskCount > 0) {
+       vertexState = VertexStatus.State.INITED;
+     }
+
+     // RUNNING state
+     if (isRunning()) {
+       vertexState = VertexStatus.State.RUNNING;
+     }
+
+     // SUCCEEDED state
+     if (succeededTaskCount == totalTaskCount) {
+       vertexState = VertexStatus.State.SUCCEEDED;
+     }
+
+     // DAG has been killed: prefer the state supplied by the caller, if there is one
+     if (dagState == KILLED) {
+       if (vertexStatus != null) {
+         vertexState = vertexStatus.getState();
+       }
+     }
+     return vertexState.toString();
+   }
+
+   // "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED"
+
+   String total() {
+     return String.valueOf(totalTaskCount);
+   }
+
+   String completed() {
+     return String.valueOf(succeededTaskCount);
+   }
+
+   String running() {
+     return String.valueOf(runningTaskCount);
+   }
+
+   /** Tasks that are neither succeeded nor currently running. */
+   String pending() {
+     return String.valueOf(totalTaskCount - succeededTaskCount - runningTaskCount);
+   }
+
+   String failed() {
+     return String.valueOf(failedTaskAttemptCount);
+   }
+
+   String killed() {
+     return String.valueOf(killedTaskAttemptCount);
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+
+     VertexProgress that = (VertexProgress) o;
+
+     if (totalTaskCount != that.totalTaskCount) {
+       return false;
+     }
+     if (succeededTaskCount != that.succeededTaskCount) {
+       return false;
+     }
+     if (failedTaskAttemptCount != that.failedTaskAttemptCount) {
+       return false;
+     }
+     if (killedTaskAttemptCount != that.killedTaskAttemptCount) {
+       return false;
+     }
+     if (runningTaskCount != that.runningTaskCount) {
+       return false;
+     }
+     return dagState == that.dagState;
+   }
+
+   @Override
+   public int hashCode() {
+     int result = totalTaskCount;
+     result = 31 * result + succeededTaskCount;
+     result = 31 * result + failedTaskAttemptCount;
+     result = 31 * result + (int) (killedTaskAttemptCount ^ (killedTaskAttemptCount >>> 32));
+     result = 31 * result + runningTaskCount;
+     // equals() accepts a null dagState, so hashCode() must not throw for it either
+     result = 31 * result + (dagState == null ? 0 : dagState.hashCode());
+     return result;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c5b3517..ed854bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -26,8 +26,7 @@ import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
@@ -127,11 +126,11 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.FunctionTask;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
@@ -1898,7 +1897,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
final AtomicInteger partitionsLoaded = new AtomicInteger(0);
final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
- && InPlaceUpdates.inPlaceEligible(conf);
+ && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
final SessionState parentSession = SessionState.get();
@@ -1926,9 +1925,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (inPlaceEligible) {
synchronized (ps) {
- InPlaceUpdates.rePositionCursor(ps);
+ InPlaceUpdate.rePositionCursor(ps);
partitionsLoaded.incrementAndGet();
- InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+ InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+ partsToLoad + " partitions.");
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index d607f61..3e01e92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -185,6 +186,7 @@ public class SessionState {
private HiveAuthorizationProvider authorizer;
private HiveAuthorizer authorizerV2;
+ private volatile ProgressMonitor progressMonitor;
public enum AuthorizationMode{V1, V2};
@@ -1564,6 +1566,7 @@ public class SessionState {
// removes the threadlocal variables, closes underlying HMS connection
Hive.closeCurrent();
}
+ progressMonitor = null;
}
private void unCacheDataNucleusClassLoaders() {
@@ -1744,6 +1747,15 @@ public class SessionState {
public String getReloadableAuxJars() {
return StringUtils.join(preReloadableAuxJars, ',');
}
+
+ /**
+  * Replaces the progress monitor held by this session with the given one.
+  *
+  * @param progressMonitor the monitor to store in the {@code progressMonitor} field
+  */
+ public void updateProgressMonitor(ProgressMonitor progressMonitor) {
+ this.progressMonitor = progressMonitor;
+ }
+
+ /**
+  * Returns the progress monitor currently held by this session.
+  *
+  * @return the stored monitor; {@code null} when none has been stored or after the field has been
+  *         reset to {@code null}
+  */
+ public ProgressMonitor getProgressMonitor() {
+ return progressMonitor;
+ }
+
}
class ResourceMaps {
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
new file mode 100644
index 0000000..648d625
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
@@ -0,0 +1,101 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestTezProgressMonitor {
+
+ private static final String REDUCER = "Reducer";
+ private static final String MAPPER = "Mapper";
+ @Mock
+ private DAGClient dagClient;
+ @Mock
+ private SessionState.LogHelper console;
+ @Mock
+ private DAGStatus dagStatus;
+ @Mock
+ private Progress mapperProgress;
+ @Mock
+ private Progress reducerProgress;
+ @Mock
+ private VertexStatus succeeded;
+ @Mock
+ private VertexStatus running;
+
+ /**
+  * Builds the per-vertex progress fixture: stubbed counters for the mapper and the reducer.
+  * A plain {@link HashMap} is used rather than double-brace initialization, which would create an
+  * anonymous subclass holding a reference to this test instance.
+  */
+ private Map<String, Progress> progressMap() {
+   Map<String, Progress> progressByVertex = new HashMap<String, Progress>();
+   progressByVertex.put(MAPPER, setup(mapperProgress, 2, 1, 3, 4, 5));
+   progressByVertex.put(REDUCER, setup(reducerProgress, 3, 2, 1, 0, 1));
+   return progressByVertex;
+ }
+
+ /**
+  * Stubs the five counter getters of the given {@code Progress} mock and returns the same mock.
+  *
+  * @param progressMock  mock to configure
+  * @param total         value returned by {@code getTotalTaskCount()}
+  * @param succeeded     value returned by {@code getSucceededTaskCount()}
+  * @param failedAttempt value returned by {@code getFailedTaskAttemptCount()}
+  * @param killedAttempt value returned by {@code getKilledTaskAttemptCount()}
+  * @param running       value returned by {@code getRunningTaskCount()}
+  * @return {@code progressMock}, to allow inline use
+  */
+ private Progress setup(Progress progressMock, int total, int succeeded, int failedAttempt,
+ int killedAttempt, int running) {
+ when(progressMock.getTotalTaskCount()).thenReturn(total);
+ when(progressMock.getSucceededTaskCount()).thenReturn(succeeded);
+ when(progressMock.getFailedTaskAttemptCount()).thenReturn(failedAttempt);
+ when(progressMock.getKilledTaskAttemptCount()).thenReturn(killedAttempt);
+ when(progressMock.getRunningTaskCount()).thenReturn(running);
+ return progressMock;
+ }
+
+ /**
+  * Verifies that the constructor captures its state eagerly: one vertex-status lookup per vertex
+  * and one {@code VertexProgress} per vertex built from the stubbed counters and the DAG state.
+  */
+ @Test
+ public void setupInternalStateOnObjectCreation() throws IOException, TezException {
+ when(dagStatus.getState()).thenReturn(DAGStatus.State.RUNNING);
+ when(dagClient.getVertexStatus(eq(MAPPER), anySet())).thenReturn(succeeded);
+ when(dagClient.getVertexStatus(eq(REDUCER), anySet())).thenReturn(running);
+
+ TezProgressMonitor monitor =
+ new TezProgressMonitor(dagClient, dagStatus, new HashMap<String, BaseWork>(), progressMap(), console,
+ Long.MAX_VALUE);
+
+ // each vertex is looked up exactly once, with a null options set, and nothing else is called
+ verify(dagClient).getVertexStatus(eq(MAPPER), isNull(Set.class));
+ verify(dagClient).getVertexStatus(eq(REDUCER), isNull(Set.class));
+ verifyNoMoreInteractions(dagClient);
+
+ // the statuses returned by the client are stored as-is, keyed by vertex name
+ assertThat(monitor.vertexStatusMap.keySet(), hasItems(MAPPER, REDUCER));
+ assertThat(monitor.vertexStatusMap.get(MAPPER), is(sameInstance(succeeded)));
+ assertThat(monitor.vertexStatusMap.get(REDUCER), is(sameInstance(running)));
+
+ // the counters match the values stubbed in progressMap(), paired with the DAG state
+ assertThat(monitor.progressCountsMap.keySet(), hasItems(MAPPER, REDUCER));
+ TezProgressMonitor.VertexProgress expectedMapperState =
+ new TezProgressMonitor.VertexProgress(2, 1, 3, 4, 5, DAGStatus.State.RUNNING);
+ assertThat(monitor.progressCountsMap.get(MAPPER), is(equalTo(expectedMapperState)));
+
+ TezProgressMonitor.VertexProgress expectedReducerState =
+ new TezProgressMonitor.VertexProgress(3, 2, 1, 0, 1, DAGStatus.State.RUNNING);
+ assertThat(monitor.progressCountsMap.get(REDUCER), is(equalTo(expectedReducerState)));
+
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/if/TCLIService.thrift
----------------------------------------------------------------------
diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift
index a4fa7b0..824b049 100644
--- a/service-rpc/if/TCLIService.thrift
+++ b/service-rpc/if/TCLIService.thrift
@@ -63,6 +63,9 @@ enum TProtocolVersion {
// V9 adds support for serializing ResultSets in SerDe
HIVE_CLI_SERVICE_PROTOCOL_V9
+
+ // V10 adds support for in place updates via GetOperationStatus
+ HIVE_CLI_SERVICE_PROTOCOL_V10
}
enum TTypeId {
@@ -559,7 +562,7 @@ struct TOperationHandle {
// which operations may be executed.
struct TOpenSessionReq {
// The version of the HiveServer2 protocol that the client is using.
- 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9
+ 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
// Username and password for authentication.
// Depending on the authentication scheme being used,
@@ -578,7 +581,7 @@ struct TOpenSessionResp {
1: required TStatus status
// The protocol version that the server is using.
- 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9
+ 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
// Session Handle
3: optional TSessionHandle sessionHandle
@@ -1019,6 +1022,8 @@ struct TGetCrossReferenceResp {
struct TGetOperationStatusReq {
// Session to run this request against
1: required TOperationHandle operationHandle
+ // optional arguments to get progress information
+ 2: optional bool getProgressUpdate
}
struct TGetOperationStatusResp {
@@ -1047,6 +1052,8 @@ struct TGetOperationStatusResp {
// If the operation has the result
9: optional bool hasResultSet
+ 10: optional TProgressUpdateResp progressUpdateResponse
+
}
@@ -1202,6 +1209,21 @@ struct TRenewDelegationTokenResp {
1: required TStatus status
}
+// Job execution status carried in TProgressUpdateResp.status.
+// NOTE(review): the mapping from server-side execution states to these values is not visible
+// here - confirm in the service layer.
+enum TJobExecutionStatus {
+ IN_PROGRESS,
+ COMPLETE,
+ NOT_AVAILABLE
+}
+
+// Progress information returned through TGetOperationStatusResp.progressUpdateResponse.
+// All fields are required, so a sender must populate every one of them.
+struct TProgressUpdateResp {
+ // Titles of the progress table columns.
+ 1: required list<string> headerNames
+ // Table body: one inner list of cell strings per row.
+ 2: required list<list<string>> rows
+ // Overall progress value. NOTE(review): presumably a 0..1 ratio rather than 0..100 - confirm.
+ 3: required double progressedPercentage
+ 4: required TJobExecutionStatus status
+ // Summary text shown below the table.
+ 5: required string footerSummary
+ // Start time of the execution. NOTE(review): presumably epoch milliseconds - confirm.
+ 6: required i64 startTime
+}
+
service TCLIService {
TOpenSessionResp OpenSession(1:TOpenSessionReq req);
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
index 2f460e8..0a17e0e 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
@@ -269,6 +269,18 @@ const char* _kTFetchOrientationNames[] = {
};
const std::map<int, const char*> _TFetchOrientation_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kTFetchOrientationValues, _kTFetchOrientationNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+int _kTJobExecutionStatusValues[] = {
+ TJobExecutionStatus::IN_PROGRESS,
+ TJobExecutionStatus::COMPLETE,
+ TJobExecutionStatus::NOT_AVAILABLE
+};
+const char* _kTJobExecutionStatusNames[] = {
+ "IN_PROGRESS",
+ "COMPLETE",
+ "NOT_AVAILABLE"
+};
+const std::map<int, const char*> _TJobExecutionStatus_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(3, _kTJobExecutionStatusValues, _kTJobExecutionStatusNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+
TTypeQualifierValue::~TTypeQualifierValue() throw() {
}
@@ -8174,6 +8186,11 @@ void TGetOperationStatusReq::__set_operationHandle(const TOperationHandle& val)
this->operationHandle = val;
}
+void TGetOperationStatusReq::__set_getProgressUpdate(const bool val) {
+ this->getProgressUpdate = val;
+__isset.getProgressUpdate = true;
+}
+
uint32_t TGetOperationStatusReq::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -8204,6 +8221,14 @@ uint32_t TGetOperationStatusReq::read(::apache::thrift::protocol::TProtocol* ipr
xfer += iprot->skip(ftype);
}
break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_BOOL) {
+ xfer += iprot->readBool(this->getProgressUpdate);
+ this->__isset.getProgressUpdate = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -8227,6 +8252,11 @@ uint32_t TGetOperationStatusReq::write(::apache::thrift::protocol::TProtocol* op
xfer += this->operationHandle.write(oprot);
xfer += oprot->writeFieldEnd();
+ if (this->__isset.getProgressUpdate) {
+ xfer += oprot->writeFieldBegin("getProgressUpdate", ::apache::thrift::protocol::T_BOOL, 2);
+ xfer += oprot->writeBool(this->getProgressUpdate);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -8235,19 +8265,26 @@ uint32_t TGetOperationStatusReq::write(::apache::thrift::protocol::TProtocol* op
void swap(TGetOperationStatusReq &a, TGetOperationStatusReq &b) {
using ::std::swap;
swap(a.operationHandle, b.operationHandle);
+ swap(a.getProgressUpdate, b.getProgressUpdate);
+ swap(a.__isset, b.__isset);
}
TGetOperationStatusReq::TGetOperationStatusReq(const TGetOperationStatusReq& other268) {
operationHandle = other268.operationHandle;
+ getProgressUpdate = other268.getProgressUpdate;
+ __isset = other268.__isset;
}
TGetOperationStatusReq& TGetOperationStatusReq::operator=(const TGetOperationStatusReq& other269) {
operationHandle = other269.operationHandle;
+ getProgressUpdate = other269.getProgressUpdate;
+ __isset = other269.__isset;
return *this;
}
void TGetOperationStatusReq::printTo(std::ostream& out) const {
using ::apache::thrift::to_string;
out << "TGetOperationStatusReq(";
out << "operationHandle=" << to_string(operationHandle);
+ out << ", " << "getProgressUpdate="; (__isset.getProgressUpdate ? (out << to_string(getProgressUpdate)) : (out << "<null>"));
out << ")";
}
@@ -8300,6 +8337,11 @@ void TGetOperationStatusResp::__set_hasResultSet(const bool val) {
__isset.hasResultSet = true;
}
+void TGetOperationStatusResp::__set_progressUpdateResponse(const TProgressUpdateResp& val) {
+ this->progressUpdateResponse = val;
+__isset.progressUpdateResponse = true;
+}
+
uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -8396,6 +8438,14 @@ uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* ip
xfer += iprot->skip(ftype);
}
break;
+ case 10:
+ if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+ xfer += this->progressUpdateResponse.read(iprot);
+ this->__isset.progressUpdateResponse = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -8459,6 +8509,11 @@ uint32_t TGetOperationStatusResp::write(::apache::thrift::protocol::TProtocol* o
xfer += oprot->writeBool(this->hasResultSet);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.progressUpdateResponse) {
+ xfer += oprot->writeFieldBegin("progressUpdateResponse", ::apache::thrift::protocol::T_STRUCT, 10);
+ xfer += this->progressUpdateResponse.write(oprot);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -8475,6 +8530,7 @@ void swap(TGetOperationStatusResp &a, TGetOperationStatusResp &b) {
swap(a.operationStarted, b.operationStarted);
swap(a.operationCompleted, b.operationCompleted);
swap(a.hasResultSet, b.hasResultSet);
+ swap(a.progressUpdateResponse, b.progressUpdateResponse);
swap(a.__isset, b.__isset);
}
@@ -8488,6 +8544,7 @@ TGetOperationStatusResp::TGetOperationStatusResp(const TGetOperationStatusResp&
operationStarted = other271.operationStarted;
operationCompleted = other271.operationCompleted;
hasResultSet = other271.hasResultSet;
+ progressUpdateResponse = other271.progressUpdateResponse;
__isset = other271.__isset;
}
TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationStatusResp& other272) {
@@ -8500,6 +8557,7 @@ TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationS
operationStarted = other272.operationStarted;
operationCompleted = other272.operationCompleted;
hasResultSet = other272.hasResultSet;
+ progressUpdateResponse = other272.progressUpdateResponse;
__isset = other272.__isset;
return *this;
}
@@ -8515,6 +8573,7 @@ void TGetOperationStatusResp::printTo(std::ostream& out) const {
out << ", " << "operationStarted="; (__isset.operationStarted ? (out << to_string(operationStarted)) : (out << "<null>"));
out << ", " << "operationCompleted="; (__isset.operationCompleted ? (out << to_string(operationCompleted)) : (out << "<null>"));
out << ", " << "hasResultSet="; (__isset.hasResultSet ? (out << to_string(hasResultSet)) : (out << "<null>"));
+ out << ", " << "progressUpdateResponse="; (__isset.progressUpdateResponse ? (out << to_string(progressUpdateResponse)) : (out << "<null>"));
out << ")";
}
@@ -9984,4 +10043,267 @@ void TRenewDelegationTokenResp::printTo(std::ostream& out) const {
out << ")";
}
+
+TProgressUpdateResp::~TProgressUpdateResp() throw() {
+}
+
+
+void TProgressUpdateResp::__set_headerNames(const std::vector<std::string> & val) {
+ this->headerNames = val;
+}
+
+void TProgressUpdateResp::__set_rows(const std::vector<std::vector<std::string> > & val) {
+ this->rows = val;
+}
+
+void TProgressUpdateResp::__set_progressedPercentage(const double val) {
+ this->progressedPercentage = val;
+}
+
+void TProgressUpdateResp::__set_status(const TJobExecutionStatus::type val) {
+ this->status = val;
+}
+
+void TProgressUpdateResp::__set_footerSummary(const std::string& val) {
+ this->footerSummary = val;
+}
+
+void TProgressUpdateResp::__set_startTime(const int64_t val) {
+ this->startTime = val;
+}
+
+uint32_t TProgressUpdateResp::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+ bool isset_headerNames = false;
+ bool isset_rows = false;
+ bool isset_progressedPercentage = false;
+ bool isset_status = false;
+ bool isset_footerSummary = false;
+ bool isset_startTime = false;
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_LIST) {
+ {
+ this->headerNames.clear();
+ uint32_t _size302;
+ ::apache::thrift::protocol::TType _etype305;
+ xfer += iprot->readListBegin(_etype305, _size302);
+ this->headerNames.resize(_size302);
+ uint32_t _i306;
+ for (_i306 = 0; _i306 < _size302; ++_i306)
+ {
+ xfer += iprot->readString(this->headerNames[_i306]);
+ }
+ xfer += iprot->readListEnd();
+ }
+ isset_headerNames = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_LIST) {
+ {
+ this->rows.clear();
+ uint32_t _size307;
+ ::apache::thrift::protocol::TType _etype310;
+ xfer += iprot->readListBegin(_etype310, _size307);
+ this->rows.resize(_size307);
+ uint32_t _i311;
+ for (_i311 = 0; _i311 < _size307; ++_i311)
+ {
+ {
+ this->rows[_i311].clear();
+ uint32_t _size312;
+ ::apache::thrift::protocol::TType _etype315;
+ xfer += iprot->readListBegin(_etype315, _size312);
+ this->rows[_i311].resize(_size312);
+ uint32_t _i316;
+ for (_i316 = 0; _i316 < _size312; ++_i316)
+ {
+ xfer += iprot->readString(this->rows[_i311][_i316]);
+ }
+ xfer += iprot->readListEnd();
+ }
+ }
+ xfer += iprot->readListEnd();
+ }
+ isset_rows = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 3:
+ if (ftype == ::apache::thrift::protocol::T_DOUBLE) {
+ xfer += iprot->readDouble(this->progressedPercentage);
+ isset_progressedPercentage = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 4:
+ if (ftype == ::apache::thrift::protocol::T_I32) {
+ int32_t ecast317;
+ xfer += iprot->readI32(ecast317);
+ this->status = (TJobExecutionStatus::type)ecast317;
+ isset_status = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 5:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->footerSummary);
+ isset_footerSummary = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 6:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->startTime);
+ isset_startTime = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ if (!isset_headerNames)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_rows)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_progressedPercentage)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_status)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_footerSummary)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_startTime)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ return xfer;
+}
+
+uint32_t TProgressUpdateResp::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+ xfer += oprot->writeStructBegin("TProgressUpdateResp");
+
+ xfer += oprot->writeFieldBegin("headerNames", ::apache::thrift::protocol::T_LIST, 1);
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->headerNames.size()));
+ std::vector<std::string> ::const_iterator _iter318;
+ for (_iter318 = this->headerNames.begin(); _iter318 != this->headerNames.end(); ++_iter318)
+ {
+ xfer += oprot->writeString((*_iter318));
+ }
+ xfer += oprot->writeListEnd();
+ }
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("rows", ::apache::thrift::protocol::T_LIST, 2);
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_LIST, static_cast<uint32_t>(this->rows.size()));
+ std::vector<std::vector<std::string> > ::const_iterator _iter319;
+ for (_iter319 = this->rows.begin(); _iter319 != this->rows.end(); ++_iter319)
+ {
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>((*_iter319).size()));
+ std::vector<std::string> ::const_iterator _iter320;
+ for (_iter320 = (*_iter319).begin(); _iter320 != (*_iter319).end(); ++_iter320)
+ {
+ xfer += oprot->writeString((*_iter320));
+ }
+ xfer += oprot->writeListEnd();
+ }
+ }
+ xfer += oprot->writeListEnd();
+ }
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("progressedPercentage", ::apache::thrift::protocol::T_DOUBLE, 3);
+ xfer += oprot->writeDouble(this->progressedPercentage);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("status", ::apache::thrift::protocol::T_I32, 4);
+ xfer += oprot->writeI32((int32_t)this->status);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("footerSummary", ::apache::thrift::protocol::T_STRING, 5);
+ xfer += oprot->writeString(this->footerSummary);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("startTime", ::apache::thrift::protocol::T_I64, 6);
+ xfer += oprot->writeI64(this->startTime);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(TProgressUpdateResp &a, TProgressUpdateResp &b) {
+ using ::std::swap;
+ swap(a.headerNames, b.headerNames);
+ swap(a.rows, b.rows);
+ swap(a.progressedPercentage, b.progressedPercentage);
+ swap(a.status, b.status);
+ swap(a.footerSummary, b.footerSummary);
+ swap(a.startTime, b.startTime);
+}
+
+TProgressUpdateResp::TProgressUpdateResp(const TProgressUpdateResp& other321) {
+ headerNames = other321.headerNames;
+ rows = other321.rows;
+ progressedPercentage = other321.progressedPercentage;
+ status = other321.status;
+ footerSummary = other321.footerSummary;
+ startTime = other321.startTime;
+}
+TProgressUpdateResp& TProgressUpdateResp::operator=(const TProgressUpdateResp& other322) {
+ headerNames = other322.headerNames;
+ rows = other322.rows;
+ progressedPercentage = other322.progressedPercentage;
+ status = other322.status;
+ footerSummary = other322.footerSummary;
+ startTime = other322.startTime;
+ return *this;
+}
+void TProgressUpdateResp::printTo(std::ostream& out) const {
+ using ::apache::thrift::to_string;
+ out << "TProgressUpdateResp(";
+ out << "headerNames=" << to_string(headerNames);
+ out << ", " << "rows=" << to_string(rows);
+ out << ", " << "progressedPercentage=" << to_string(progressedPercentage);
+ out << ", " << "status=" << to_string(status);
+ out << ", " << "footerSummary=" << to_string(footerSummary);
+ out << ", " << "startTime=" << to_string(startTime);
+ out << ")";
+}
+
}}}}} // namespace
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
index b249544..6c2bb34 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
@@ -175,6 +175,16 @@ struct TFetchOrientation {
extern const std::map<int, const char*> _TFetchOrientation_VALUES_TO_NAMES;
+struct TJobExecutionStatus { // Scoping wrapper: the enum is referenced as TJobExecutionStatus::type.
+ enum type { // numeric values match the Java enum TJobExecutionStatus added in the same commit
+ IN_PROGRESS = 0,
+ COMPLETE = 1,
+ NOT_AVAILABLE = 2
+ };
+};
+
+extern const std::map<int, const char*> _TJobExecutionStatus_VALUES_TO_NAMES;
+
typedef int32_t TTypeEntryPtr;
typedef std::string TIdentifier;
@@ -339,6 +349,8 @@ class TRenewDelegationTokenReq;
class TRenewDelegationTokenResp;
+class TProgressUpdateResp;
+
typedef struct _TTypeQualifierValue__isset {
_TTypeQualifierValue__isset() : i32Value(false), stringValue(false) {}
bool i32Value :1;
@@ -3669,24 +3681,37 @@ inline std::ostream& operator<<(std::ostream& out, const TGetCrossReferenceResp&
return out;
}
+typedef struct _TGetOperationStatusReq__isset { // Presence flags for TGetOperationStatusReq; stored in its __isset member.
+ _TGetOperationStatusReq__isset() : getProgressUpdate(false) {} // the flag starts cleared
+ bool getProgressUpdate :1; // one-bit flag; TGetOperationStatusReq::operator== compares it before comparing the value
+} _TGetOperationStatusReq__isset;
class TGetOperationStatusReq {
public:
TGetOperationStatusReq(const TGetOperationStatusReq&);
TGetOperationStatusReq& operator=(const TGetOperationStatusReq&);
- TGetOperationStatusReq() {
+ TGetOperationStatusReq() : getProgressUpdate(0) {
}
virtual ~TGetOperationStatusReq() throw();
TOperationHandle operationHandle;
+ bool getProgressUpdate;
+
+ _TGetOperationStatusReq__isset __isset;
void __set_operationHandle(const TOperationHandle& val);
+ void __set_getProgressUpdate(const bool val);
+
bool operator == (const TGetOperationStatusReq & rhs) const
{
if (!(operationHandle == rhs.operationHandle))
return false;
+ if (__isset.getProgressUpdate != rhs.__isset.getProgressUpdate)
+ return false;
+ else if (__isset.getProgressUpdate && !(getProgressUpdate == rhs.getProgressUpdate))
+ return false;
return true;
}
bool operator != (const TGetOperationStatusReq &rhs) const {
@@ -3710,7 +3735,7 @@ inline std::ostream& operator<<(std::ostream& out, const TGetOperationStatusReq&
}
typedef struct _TGetOperationStatusResp__isset {
- _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false) {}
+ _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false), progressUpdateResponse(false) {}
bool operationState :1;
bool sqlState :1;
bool errorCode :1;
@@ -3719,6 +3744,7 @@ typedef struct _TGetOperationStatusResp__isset {
bool operationStarted :1;
bool operationCompleted :1;
bool hasResultSet :1;
+ bool progressUpdateResponse :1;
} _TGetOperationStatusResp__isset;
class TGetOperationStatusResp {
@@ -3739,6 +3765,7 @@ class TGetOperationStatusResp {
int64_t operationStarted;
int64_t operationCompleted;
bool hasResultSet;
+ TProgressUpdateResp progressUpdateResponse;
_TGetOperationStatusResp__isset __isset;
@@ -3760,6 +3787,8 @@ class TGetOperationStatusResp {
void __set_hasResultSet(const bool val);
+ void __set_progressUpdateResponse(const TProgressUpdateResp& val);
+
bool operator == (const TGetOperationStatusResp & rhs) const
{
if (!(status == rhs.status))
@@ -3796,6 +3825,10 @@ class TGetOperationStatusResp {
return false;
else if (__isset.hasResultSet && !(hasResultSet == rhs.hasResultSet))
return false;
+ if (__isset.progressUpdateResponse != rhs.__isset.progressUpdateResponse)
+ return false;
+ else if (__isset.progressUpdateResponse && !(progressUpdateResponse == rhs.progressUpdateResponse))
+ return false;
return true;
}
bool operator != (const TGetOperationStatusResp &rhs) const {
@@ -4470,6 +4503,71 @@ inline std::ostream& operator<<(std::ostream& out, const TRenewDelegationTokenRe
return out;
}
+
+class TProgressUpdateResp { // Thrift-generated struct holding progress data: header names, rows, percentage, status, footer text and start time.
+ public:
+
+ TProgressUpdateResp(const TProgressUpdateResp&);
+ TProgressUpdateResp& operator=(const TProgressUpdateResp&);
+ TProgressUpdateResp() : progressedPercentage(0), status((TJobExecutionStatus::type)0), footerSummary(), startTime(0) { // status starts at enum value 0, i.e. IN_PROGRESS
+ }
+
+ virtual ~TProgressUpdateResp() throw();
+ std::vector<std::string> headerNames;
+ std::vector<std::vector<std::string> > rows; // a list of rows, each row itself a list of strings
+ double progressedPercentage;
+ TJobExecutionStatus::type status;
+ std::string footerSummary;
+ int64_t startTime; // NOTE(review): presumably epoch milliseconds (cf. JobProgressUpdate.startTimeMillis) - confirm
+
+ void __set_headerNames(const std::vector<std::string> & val);
+
+ void __set_rows(const std::vector<std::vector<std::string> > & val);
+
+ void __set_progressedPercentage(const double val);
+
+ void __set_status(const TJobExecutionStatus::type val);
+
+ void __set_footerSummary(const std::string& val);
+
+ void __set_startTime(const int64_t val);
+
+ bool operator == (const TProgressUpdateResp & rhs) const // Field-by-field equality; this class has no __isset flags, so every field always participates.
+ {
+ if (!(headerNames == rhs.headerNames))
+ return false;
+ if (!(rows == rhs.rows))
+ return false;
+ if (!(progressedPercentage == rhs.progressedPercentage)) // exact floating-point comparison, no tolerance
+ return false;
+ if (!(status == rhs.status))
+ return false;
+ if (!(footerSummary == rhs.footerSummary))
+ return false;
+ if (!(startTime == rhs.startTime))
+ return false;
+ return true;
+ }
+ bool operator != (const TProgressUpdateResp &rhs) const { // defined as the negation of operator==
+ return !(*this == rhs);
+ }
+
+ bool operator < (const TProgressUpdateResp & ) const; // declared only; no definition appears in this header
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+ virtual void printTo(std::ostream& out) const; // used by the operator<< overload declared after this class
+};
+
+void swap(TProgressUpdateResp &a, TProgressUpdateResp &b);
+
+inline std::ostream& operator<<(std::ostream& out, const TProgressUpdateResp& obj) // Stream insertion: delegates formatting to obj.printTo(out).
+{
+ obj.printTo(out);
+ return out; // returns the stream so insertions can be chained
+}
+
}}}}} // namespace
#endif
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
index 84c64cd..af31ce2 100644
--- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
@@ -39,6 +39,7 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TGetOperationStatusReq");
private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+ private static final org.apache.thrift.protocol.TField GET_PROGRESS_UPDATE_FIELD_DESC = new org.apache.thrift.protocol.TField("getProgressUpdate", org.apache.thrift.protocol.TType.BOOL, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -47,10 +48,12 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
}
private TOperationHandle operationHandle; // required
+ private boolean getProgressUpdate; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- OPERATION_HANDLE((short)1, "operationHandle");
+ OPERATION_HANDLE((short)1, "operationHandle"),
+ GET_PROGRESS_UPDATE((short)2, "getProgressUpdate");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -67,6 +70,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
switch(fieldId) {
case 1: // OPERATION_HANDLE
return OPERATION_HANDLE;
+ case 2: // GET_PROGRESS_UPDATE
+ return GET_PROGRESS_UPDATE;
default:
return null;
}
@@ -107,11 +112,16 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
}
// isset id assignments
+ private static final int __GETPROGRESSUPDATE_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.GET_PROGRESS_UPDATE};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.OPERATION_HANDLE, new org.apache.thrift.meta_data.FieldMetaData("operationHandle", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TOperationHandle.class)));
+ tmpMap.put(_Fields.GET_PROGRESS_UPDATE, new org.apache.thrift.meta_data.FieldMetaData("getProgressUpdate", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusReq.class, metaDataMap);
}
@@ -130,9 +140,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
* Performs a deep copy on <i>other</i>.
*/
public TGetOperationStatusReq(TGetOperationStatusReq other) {
+ __isset_bitfield = other.__isset_bitfield;
if (other.isSetOperationHandle()) {
this.operationHandle = new TOperationHandle(other.operationHandle);
}
+ this.getProgressUpdate = other.getProgressUpdate;
}
public TGetOperationStatusReq deepCopy() {
@@ -142,6 +154,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
@Override
public void clear() {
this.operationHandle = null;
+ setGetProgressUpdateIsSet(false);
+ this.getProgressUpdate = false;
}
public TOperationHandle getOperationHandle() {
@@ -167,6 +181,28 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
}
}
+ public boolean isGetProgressUpdate() { // Returns the stored getProgressUpdate value; does not consult the isset bit.
+ return this.getProgressUpdate;
+ }
+
+ public void setGetProgressUpdate(boolean getProgressUpdate) { // Stores the value and marks the field as set.
+ this.getProgressUpdate = getProgressUpdate;
+ setGetProgressUpdateIsSet(true); // set the isset bit as well as the value
+ }
+
+ public void unsetGetProgressUpdate() { // Clears only the isset bit; the stored boolean value is left unchanged.
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID);
+ }
+
+ /** Returns true if field getProgressUpdate is set (has been assigned a value) and false otherwise */
+ public boolean isSetGetProgressUpdate() {
+ return EncodingUtils.testBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID); // presence is tracked by a bit in __isset_bitfield, not by the value
+ }
+
+ public void setGetProgressUpdateIsSet(boolean value) { // Sets or clears the isset bit according to value; the stored boolean is not touched.
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case OPERATION_HANDLE:
@@ -177,6 +213,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
}
break;
+ case GET_PROGRESS_UPDATE:
+ if (value == null) {
+ unsetGetProgressUpdate();
+ } else {
+ setGetProgressUpdate((Boolean)value);
+ }
+ break;
+
}
}
@@ -185,6 +229,9 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
case OPERATION_HANDLE:
return getOperationHandle();
+ case GET_PROGRESS_UPDATE:
+ return isGetProgressUpdate();
+
}
throw new IllegalStateException();
}
@@ -198,6 +245,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
switch (field) {
case OPERATION_HANDLE:
return isSetOperationHandle();
+ case GET_PROGRESS_UPDATE:
+ return isSetGetProgressUpdate();
}
throw new IllegalStateException();
}
@@ -224,6 +273,15 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
return false;
}
+ boolean this_present_getProgressUpdate = true && this.isSetGetProgressUpdate();
+ boolean that_present_getProgressUpdate = true && that.isSetGetProgressUpdate();
+ if (this_present_getProgressUpdate || that_present_getProgressUpdate) {
+ if (!(this_present_getProgressUpdate && that_present_getProgressUpdate))
+ return false;
+ if (this.getProgressUpdate != that.getProgressUpdate)
+ return false;
+ }
+
return true;
}
@@ -236,6 +294,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
if (present_operationHandle)
list.add(operationHandle);
+ boolean present_getProgressUpdate = true && (isSetGetProgressUpdate());
+ list.add(present_getProgressUpdate);
+ if (present_getProgressUpdate)
+ list.add(getProgressUpdate);
+
return list.hashCode();
}
@@ -257,6 +320,16 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetGetProgressUpdate()).compareTo(other.isSetGetProgressUpdate());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetGetProgressUpdate()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.getProgressUpdate, other.getProgressUpdate);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -284,6 +357,12 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
sb.append(this.operationHandle);
}
first = false;
+ if (isSetGetProgressUpdate()) {
+ if (!first) sb.append(", ");
+ sb.append("getProgressUpdate:");
+ sb.append(this.getProgressUpdate);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -310,6 +389,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -343,6 +424,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 2: // GET_PROGRESS_UPDATE
+ if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+ struct.getProgressUpdate = iprot.readBool();
+ struct.setGetProgressUpdateIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -361,6 +450,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
struct.operationHandle.write(oprot);
oprot.writeFieldEnd();
}
+ if (struct.isSetGetProgressUpdate()) {
+ oprot.writeFieldBegin(GET_PROGRESS_UPDATE_FIELD_DESC);
+ oprot.writeBool(struct.getProgressUpdate);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -379,6 +473,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusReq struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
struct.operationHandle.write(oprot);
+ BitSet optionals = new BitSet();
+ if (struct.isSetGetProgressUpdate()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetGetProgressUpdate()) {
+ oprot.writeBool(struct.getProgressUpdate);
+ }
}
@Override
@@ -387,6 +489,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
struct.operationHandle = new TOperationHandle();
struct.operationHandle.read(iprot);
struct.setOperationHandleIsSet(true);
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.getProgressUpdate = iprot.readBool();
+ struct.setGetProgressUpdateIsSet(true);
+ }
}
}
[2/4] hive git commit: HIVE-15473: Progress Bar on Beeline client
(Anishek Agarwal via Thejas Nair)
Posted by th...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java
index b981368..dbfbb44 100644
--- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java
@@ -47,6 +47,7 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
private static final org.apache.thrift.protocol.TField OPERATION_STARTED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationStarted", org.apache.thrift.protocol.TType.I64, (short)7);
private static final org.apache.thrift.protocol.TField OPERATION_COMPLETED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationCompleted", org.apache.thrift.protocol.TType.I64, (short)8);
private static final org.apache.thrift.protocol.TField HAS_RESULT_SET_FIELD_DESC = new org.apache.thrift.protocol.TField("hasResultSet", org.apache.thrift.protocol.TType.BOOL, (short)9);
+ private static final org.apache.thrift.protocol.TField PROGRESS_UPDATE_RESPONSE_FIELD_DESC = new org.apache.thrift.protocol.TField("progressUpdateResponse", org.apache.thrift.protocol.TType.STRUCT, (short)10);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -63,6 +64,7 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
private long operationStarted; // optional
private long operationCompleted; // optional
private boolean hasResultSet; // optional
+ private TProgressUpdateResp progressUpdateResponse; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -78,7 +80,8 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
TASK_STATUS((short)6, "taskStatus"),
OPERATION_STARTED((short)7, "operationStarted"),
OPERATION_COMPLETED((short)8, "operationCompleted"),
- HAS_RESULT_SET((short)9, "hasResultSet");
+ HAS_RESULT_SET((short)9, "hasResultSet"),
+ PROGRESS_UPDATE_RESPONSE((short)10, "progressUpdateResponse");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -111,6 +114,8 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
return OPERATION_COMPLETED;
case 9: // HAS_RESULT_SET
return HAS_RESULT_SET;
+ case 10: // PROGRESS_UPDATE_RESPONSE
+ return PROGRESS_UPDATE_RESPONSE;
default:
return null;
}
@@ -156,7 +161,7 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
private static final int __OPERATIONCOMPLETED_ISSET_ID = 2;
private static final int __HASRESULTSET_ISSET_ID = 3;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE,_Fields.TASK_STATUS,_Fields.OPERATION_STARTED,_Fields.OPERATION_COMPLETED,_Fields.HAS_RESULT_SET};
+ private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE,_Fields.TASK_STATUS,_Fields.OPERATION_STARTED,_Fields.OPERATION_COMPLETED,_Fields.HAS_RESULT_SET,_Fields.PROGRESS_UPDATE_RESPONSE};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -178,6 +183,8 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
tmpMap.put(_Fields.HAS_RESULT_SET, new org.apache.thrift.meta_data.FieldMetaData("hasResultSet", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+ tmpMap.put(_Fields.PROGRESS_UPDATE_RESPONSE, new org.apache.thrift.meta_data.FieldMetaData("progressUpdateResponse", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT , "TProgressUpdateResp")));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusResp.class, metaDataMap);
}
@@ -216,6 +223,9 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
this.operationStarted = other.operationStarted;
this.operationCompleted = other.operationCompleted;
this.hasResultSet = other.hasResultSet;
+ if (other.isSetProgressUpdateResponse()) {
+ this.progressUpdateResponse = other.progressUpdateResponse;
+ }
}
public TGetOperationStatusResp deepCopy() {
@@ -237,6 +247,7 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
this.operationCompleted = 0;
setHasResultSetIsSet(false);
this.hasResultSet = false;
+ this.progressUpdateResponse = null;
}
public TStatus getStatus() {
@@ -450,6 +461,29 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __HASRESULTSET_ISSET_ID, value);
}
+ public TProgressUpdateResp getProgressUpdateResponse() { // Returns the stored reference itself (null when unset); no copy is made.
+ return this.progressUpdateResponse;
+ }
+
+ public void setProgressUpdateResponse(TProgressUpdateResp progressUpdateResponse) { // Stores the reference as given; storing null leaves the field unset.
+ this.progressUpdateResponse = progressUpdateResponse;
+ }
+
+ public void unsetProgressUpdateResponse() { // "Unset" is represented by a null reference.
+ this.progressUpdateResponse = null;
+ }
+
+ /** Returns true if field progressUpdateResponse is set (has been assigned a value) and false otherwise */
+ public boolean isSetProgressUpdateResponse() {
+ return this.progressUpdateResponse != null; // presence is derived from non-null; no isset bit is used for this field
+ }
+
+ public void setProgressUpdateResponseIsSet(boolean value) { // Only value == false has an effect; true is a no-op.
+ if (!value) {
+ this.progressUpdateResponse = null; // clearing the reference is what marks the field unset
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case STATUS:
@@ -524,6 +558,14 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
}
break;
+ case PROGRESS_UPDATE_RESPONSE:
+ if (value == null) {
+ unsetProgressUpdateResponse();
+ } else {
+ setProgressUpdateResponse((TProgressUpdateResp)value);
+ }
+ break;
+
}
}
@@ -556,6 +598,9 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
case HAS_RESULT_SET:
return isHasResultSet();
+ case PROGRESS_UPDATE_RESPONSE:
+ return getProgressUpdateResponse();
+
}
throw new IllegalStateException();
}
@@ -585,6 +630,8 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
return isSetOperationCompleted();
case HAS_RESULT_SET:
return isSetHasResultSet();
+ case PROGRESS_UPDATE_RESPONSE:
+ return isSetProgressUpdateResponse();
}
throw new IllegalStateException();
}
@@ -683,6 +730,15 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
return false;
}
+ boolean this_present_progressUpdateResponse = true && this.isSetProgressUpdateResponse();
+ boolean that_present_progressUpdateResponse = true && that.isSetProgressUpdateResponse();
+ if (this_present_progressUpdateResponse || that_present_progressUpdateResponse) {
+ if (!(this_present_progressUpdateResponse && that_present_progressUpdateResponse))
+ return false;
+ if (!this.progressUpdateResponse.equals(that.progressUpdateResponse))
+ return false;
+ }
+
return true;
}
@@ -735,6 +791,11 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
if (present_hasResultSet)
list.add(hasResultSet);
+ boolean present_progressUpdateResponse = true && (isSetProgressUpdateResponse());
+ list.add(present_progressUpdateResponse);
+ if (present_progressUpdateResponse)
+ list.add(progressUpdateResponse);
+
return list.hashCode();
}
@@ -836,6 +897,16 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetProgressUpdateResponse()).compareTo(other.isSetProgressUpdateResponse());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetProgressUpdateResponse()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.progressUpdateResponse, other.progressUpdateResponse);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -927,6 +998,16 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
sb.append(this.hasResultSet);
first = false;
}
+ if (isSetProgressUpdateResponse()) {
+ if (!first) sb.append(", ");
+ sb.append("progressUpdateResponse:");
+ if (this.progressUpdateResponse == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.progressUpdateResponse);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -1052,6 +1133,15 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 10: // PROGRESS_UPDATE_RESPONSE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.progressUpdateResponse = new TProgressUpdateResp();
+ struct.progressUpdateResponse.read(iprot);
+ struct.setProgressUpdateResponseIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1118,6 +1208,13 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
oprot.writeBool(struct.hasResultSet);
oprot.writeFieldEnd();
}
+ if (struct.progressUpdateResponse != null) {
+ if (struct.isSetProgressUpdateResponse()) {
+ oprot.writeFieldBegin(PROGRESS_UPDATE_RESPONSE_FIELD_DESC);
+ struct.progressUpdateResponse.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1161,7 +1258,10 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
if (struct.isSetHasResultSet()) {
optionals.set(7);
}
- oprot.writeBitSet(optionals, 8);
+ if (struct.isSetProgressUpdateResponse()) {
+ optionals.set(8);
+ }
+ oprot.writeBitSet(optionals, 9);
if (struct.isSetOperationState()) {
oprot.writeI32(struct.operationState.getValue());
}
@@ -1186,6 +1286,9 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
if (struct.isSetHasResultSet()) {
oprot.writeBool(struct.hasResultSet);
}
+ if (struct.isSetProgressUpdateResponse()) {
+ struct.progressUpdateResponse.write(oprot);
+ }
}
@Override
@@ -1194,7 +1297,7 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
struct.status = new TStatus();
struct.status.read(iprot);
struct.setStatusIsSet(true);
- BitSet incoming = iprot.readBitSet(8);
+ BitSet incoming = iprot.readBitSet(9);
if (incoming.get(0)) {
struct.operationState = org.apache.hive.service.rpc.thrift.TOperationState.findByValue(iprot.readI32());
struct.setOperationStateIsSet(true);
@@ -1227,6 +1330,11 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
struct.hasResultSet = iprot.readBool();
struct.setHasResultSetIsSet(true);
}
+ if (incoming.get(8)) {
+ struct.progressUpdateResponse = new TProgressUpdateResp();
+ struct.progressUpdateResponse.read(iprot);
+ struct.setProgressUpdateResponseIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TJobExecutionStatus.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TJobExecutionStatus.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TJobExecutionStatus.java
new file mode 100644
index 0000000..b39f208
--- /dev/null
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TJobExecutionStatus.java
@@ -0,0 +1,48 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.hive.service.rpc.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum TJobExecutionStatus implements org.apache.thrift.TEnum { // Job execution status constants; each carries an integer value.
+ IN_PROGRESS(0),
+ COMPLETE(1),
+ NOT_AVAILABLE(2);
+
+ private final int value;
+
+ private TJobExecutionStatus(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static TJobExecutionStatus findByValue(int value) {
+ switch (value) {
+ case 0:
+ return IN_PROGRESS;
+ case 1:
+ return COMPLETE;
+ case 2:
+ return NOT_AVAILABLE;
+ default:
+ return null; // unrecognized values yield null rather than an exception
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java
new file mode 100644
index 0000000..ecc413a
--- /dev/null
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java
@@ -0,0 +1,1033 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.hive.service.rpc.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)")
+public class TProgressUpdateResp implements org.apache.thrift.TBase<TProgressUpdateResp, TProgressUpdateResp._Fields>, java.io.Serializable, Cloneable, Comparable<TProgressUpdateResp> {
+ // Wire-level descriptors: the struct name, then one (name, type, id) descriptor per field, ids 1 through 6.
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TProgressUpdateResp");
+
+ private static final org.apache.thrift.protocol.TField HEADER_NAMES_FIELD_DESC = new org.apache.thrift.protocol.TField("headerNames", org.apache.thrift.protocol.TType.LIST, (short)1);
+ private static final org.apache.thrift.protocol.TField ROWS_FIELD_DESC = new org.apache.thrift.protocol.TField("rows", org.apache.thrift.protocol.TType.LIST, (short)2);
+ private static final org.apache.thrift.protocol.TField PROGRESSED_PERCENTAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("progressedPercentage", org.apache.thrift.protocol.TType.DOUBLE, (short)3);
+ private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.I32, (short)4);
+ private static final org.apache.thrift.protocol.TField FOOTER_SUMMARY_FIELD_DESC = new org.apache.thrift.protocol.TField("footerSummary", org.apache.thrift.protocol.TType.STRING, (short)5);
+ private static final org.apache.thrift.protocol.TField START_TIME_FIELD_DESC = new org.apache.thrift.protocol.TField("startTime", org.apache.thrift.protocol.TType.I64, (short)6);
+
+ // Registry from protocol scheme class to the factory producing this struct's reader/writer; used by read() and write().
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new TProgressUpdateRespStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new TProgressUpdateRespTupleSchemeFactory());
+ }
+
+ // Field storage. Reference fields use null to mean "unset"; the two primitives track presence in __isset_bitfield.
+ private List<String> headerNames; // required
+ private List<List<String>> rows; // required
+ private double progressedPercentage; // required
+ private TJobExecutionStatus status; // required
+ private String footerSummary; // required
+ private long startTime; // required
+
+ /** Enumerates the fields of this struct and offers lookups by Thrift field id and by field name. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ HEADER_NAMES((short)1, "headerNames"),
+ ROWS((short)2, "rows"),
+ PROGRESSED_PERCENTAGE((short)3, "progressedPercentage"),
+ /**
+ * Field whose value is a TJobExecutionStatus constant.
+ * @see TJobExecutionStatus
+ */
+ STATUS((short)4, "status"),
+ FOOTER_SUMMARY((short)5, "footerSummary"),
+ START_TIME((short)6, "startTime");
+
+ // Name-to-constant index, filled once while the enum class initializes.
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields candidate : values()) {
+ byName.put(candidate._fieldName, candidate);
+ }
+ }
+
+ /**
+ * Returns the _Fields constant whose Thrift id equals fieldId, or null if there is none.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ for (_Fields candidate : values()) {
+ if (candidate._thriftId == fieldId) {
+ return candidate;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns the _Fields constant whose Thrift id equals fieldId.
+ * @throws IllegalArgumentException if no constant has that id.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields match = findByThriftId(fieldId);
+ if (match != null) {
+ return match;
+ }
+ throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ }
+
+ /**
+ * Returns the _Fields constant registered under the given field name, or null if there is none.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ this._thriftId = thriftId;
+ this._fieldName = fieldName;
+ }
+
+ /** Returns the numeric Thrift id of this field. */
+ public short getThriftFieldId() {
+ return this._thriftId;
+ }
+
+ /** Returns the name of this field. */
+ public String getFieldName() {
+ return this._fieldName;
+ }
+ }
+
+ // Bit positions inside __isset_bitfield that record whether each primitive field has been assigned.
+ private static final int __PROGRESSEDPERCENTAGE_ISSET_ID = 0;
+ private static final int __STARTTIME_ISSET_ID = 1;
+ private byte __isset_bitfield = 0;
+ // Unmodifiable per-field metadata (name, requirement level, value type); also registered with FieldMetaData below.
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.HEADER_NAMES, new org.apache.thrift.meta_data.FieldMetaData("headerNames", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ tmpMap.put(_Fields.ROWS, new org.apache.thrift.meta_data.FieldMetaData("rows", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))));
+ tmpMap.put(_Fields.PROGRESSED_PERCENTAGE, new org.apache.thrift.meta_data.FieldMetaData("progressedPercentage", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+ tmpMap.put(_Fields.STATUS, new org.apache.thrift.meta_data.FieldMetaData("status", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TJobExecutionStatus.class)));
+ tmpMap.put(_Fields.FOOTER_SUMMARY, new org.apache.thrift.meta_data.FieldMetaData("footerSummary", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.START_TIME, new org.apache.thrift.meta_data.FieldMetaData("startTime", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TProgressUpdateResp.class, metaDataMap);
+ }
+
+ /** Creates a struct in which no field has been assigned yet. */
+ public TProgressUpdateResp() {}
+
+ /**
+ * Creates a struct with all six fields assigned.
+ * The two primitive fields are additionally flagged as set, since they cannot use null to signal absence.
+ */
+ public TProgressUpdateResp(
+ List<String> headerNames,
+ List<List<String>> rows,
+ double progressedPercentage,
+ TJobExecutionStatus status,
+ String footerSummary,
+ long startTime)
+ {
+ this();
+ // Reference fields: a non-null value is what marks them as set.
+ this.headerNames = headerNames;
+ this.rows = rows;
+ this.status = status;
+ this.footerSummary = footerSummary;
+ // Primitive fields: store the value, then raise the matching isset bit.
+ this.progressedPercentage = progressedPercentage;
+ setProgressedPercentageIsSet(true);
+ this.startTime = startTime;
+ setStartTimeIsSet(true);
+ }
+
+ /**
+ * Copy constructor: builds a deep copy of <i>other</i>.
+ * The header list and every row list are duplicated, so the copy shares no list instance with the source.
+ */
+ public TProgressUpdateResp(TProgressUpdateResp other) {
+ // Primitives and their presence bits are copied unconditionally.
+ __isset_bitfield = other.__isset_bitfield;
+ progressedPercentage = other.progressedPercentage;
+ startTime = other.startTime;
+ if (other.isSetHeaderNames()) {
+ headerNames = new ArrayList<String>(other.headerNames);
+ }
+ if (other.isSetRows()) {
+ List<List<String>> rowsCopy = new ArrayList<List<String>>(other.rows.size());
+ for (List<String> sourceRow : other.rows) {
+ rowsCopy.add(new ArrayList<String>(sourceRow));
+ }
+ rows = rowsCopy;
+ }
+ if (other.isSetStatus()) {
+ status = other.status;
+ }
+ if (other.isSetFooterSummary()) {
+ footerSummary = other.footerSummary;
+ }
+ }
+
+ public TProgressUpdateResp deepCopy() {
+ return new TProgressUpdateResp(this);
+ }
+
+ /** Resets every field to its unset state: references become null, primitives become zero with their isset bits cleared. */
+ @Override
+ public void clear() {
+ headerNames = null;
+ rows = null;
+ status = null;
+ footerSummary = null;
+ setProgressedPercentageIsSet(false);
+ progressedPercentage = 0.0;
+ setStartTimeIsSet(false);
+ startTime = 0;
+ }
+
+ /** Returns the number of header names, or 0 when the list is unset. */
+ public int getHeaderNamesSize() {
+ if (headerNames == null) {
+ return 0;
+ }
+ return headerNames.size();
+ }
+
+ /** Returns an iterator over the header names, or null when the list is unset. */
+ public java.util.Iterator<String> getHeaderNamesIterator() {
+ if (headerNames == null) {
+ return null;
+ }
+ return headerNames.iterator();
+ }
+
+ /** Appends one header name, creating the backing list first if it is still unset. */
+ public void addToHeaderNames(String elem) {
+ List<String> target = headerNames;
+ if (target == null) {
+ target = new ArrayList<String>();
+ headerNames = target;
+ }
+ target.add(elem);
+ }
+
+ /** Returns the stored header-name list itself (not a copy); null when unset. */
+ public List<String> getHeaderNames() {
+ return headerNames;
+ }
+
+ /** Stores the given list reference as headerNames; passing null leaves the field unset. */
+ public void setHeaderNames(List<String> headerNames) {
+ this.headerNames = headerNames;
+ }
+
+ /** Marks headerNames as unset by dropping the list reference. */
+ public void unsetHeaderNames() {
+ headerNames = null;
+ }
+
+ /** Reports whether headerNames currently holds a value, i.e. is not null. */
+ public boolean isSetHeaderNames() {
+ return headerNames != null;
+ }
+
+ /** Clears headerNames when value is false; passing true changes nothing. */
+ public void setHeaderNamesIsSet(boolean value) {
+ if (value) {
+ return;
+ }
+ headerNames = null;
+ }
+
+ /** Returns the number of rows, or 0 when the list is unset. */
+ public int getRowsSize() {
+ if (rows == null) {
+ return 0;
+ }
+ return rows.size();
+ }
+
+ /** Returns an iterator over the rows, or null when the list is unset. */
+ public java.util.Iterator<List<String>> getRowsIterator() {
+ if (rows == null) {
+ return null;
+ }
+ return rows.iterator();
+ }
+
+ /** Appends one row, creating the backing list first if it is still unset. */
+ public void addToRows(List<String> elem) {
+ List<List<String>> target = rows;
+ if (target == null) {
+ target = new ArrayList<List<String>>();
+ rows = target;
+ }
+ target.add(elem);
+ }
+
+ /** Returns the stored row list itself (not a copy); null when unset. */
+ public List<List<String>> getRows() {
+ return rows;
+ }
+
+ /** Stores the given list reference as rows; passing null leaves the field unset. */
+ public void setRows(List<List<String>> rows) {
+ this.rows = rows;
+ }
+
+ /** Marks rows as unset by dropping the list reference. */
+ public void unsetRows() {
+ rows = null;
+ }
+
+ /** Reports whether rows currently holds a value, i.e. is not null. */
+ public boolean isSetRows() {
+ return rows != null;
+ }
+
+ /** Clears rows when value is false; passing true changes nothing. */
+ public void setRowsIsSet(boolean value) {
+ if (value) {
+ return;
+ }
+ rows = null;
+ }
+
+ /** Returns the stored progressedPercentage value (0.0 if it was never assigned). */
+ public double getProgressedPercentage() {
+ return progressedPercentage;
+ }
+
+ /** Stores progressedPercentage and raises its isset bit. */
+ public void setProgressedPercentage(double progressedPercentage) {
+ this.progressedPercentage = progressedPercentage;
+ setProgressedPercentageIsSet(true);
+ }
+
+ public void unsetProgressedPercentage() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PROGRESSEDPERCENTAGE_ISSET_ID);
+ }
+
+ /** Returns true if field progressedPercentage is set (has been assigned a value) and false otherwise */
+ public boolean isSetProgressedPercentage() {
+ return EncodingUtils.testBit(__isset_bitfield, __PROGRESSEDPERCENTAGE_ISSET_ID);
+ }
+
+ public void setProgressedPercentageIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PROGRESSEDPERCENTAGE_ISSET_ID, value);
+ }
+
+ /**
+ * Returns the stored status constant; null when unset.
+ * @see TJobExecutionStatus
+ */
+ public TJobExecutionStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * Stores the given status constant; passing null leaves the field unset.
+ * @see TJobExecutionStatus
+ */
+ public void setStatus(TJobExecutionStatus status) {
+ this.status = status;
+ }
+
+ /** Marks status as unset by dropping the reference. */
+ public void unsetStatus() {
+ status = null;
+ }
+
+ /** Reports whether status currently holds a value, i.e. is not null. */
+ public boolean isSetStatus() {
+ return status != null;
+ }
+
+ /** Clears status when value is false; passing true changes nothing. */
+ public void setStatusIsSet(boolean value) {
+ if (value) {
+ return;
+ }
+ status = null;
+ }
+
+ /** Returns the stored footer summary; null when unset. */
+ public String getFooterSummary() {
+ return footerSummary;
+ }
+
+ /** Stores the given footer summary; passing null leaves the field unset. */
+ public void setFooterSummary(String footerSummary) {
+ this.footerSummary = footerSummary;
+ }
+
+ /** Marks footerSummary as unset by dropping the reference. */
+ public void unsetFooterSummary() {
+ footerSummary = null;
+ }
+
+ /** Reports whether footerSummary currently holds a value, i.e. is not null. */
+ public boolean isSetFooterSummary() {
+ return footerSummary != null;
+ }
+
+ /** Clears footerSummary when value is false; passing true changes nothing. */
+ public void setFooterSummaryIsSet(boolean value) {
+ if (value) {
+ return;
+ }
+ footerSummary = null;
+ }
+
+ /** Returns the stored startTime value (0 if it was never assigned). */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /** Stores startTime and raises its isset bit. */
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ setStartTimeIsSet(true);
+ }
+
+ public void unsetStartTime() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __STARTTIME_ISSET_ID);
+ }
+
+ /** Returns true if field startTime is set (has been assigned a value) and false otherwise */
+ public boolean isSetStartTime() {
+ return EncodingUtils.testBit(__isset_bitfield, __STARTTIME_ISSET_ID);
+ }
+
+ public void setStartTimeIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __STARTTIME_ISSET_ID, value);
+ }
+
+ /**
+ * Generic setter: assigns value to the given field, or unsets the field when value is null.
+ * The value is cast to the field's declared type, so a mismatched type fails with a ClassCastException.
+ */
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case HEADER_NAMES:
+ if (value != null) {
+ setHeaderNames((List<String>)value);
+ } else {
+ unsetHeaderNames();
+ }
+ break;
+
+ case ROWS:
+ if (value != null) {
+ setRows((List<List<String>>)value);
+ } else {
+ unsetRows();
+ }
+ break;
+
+ case PROGRESSED_PERCENTAGE:
+ if (value != null) {
+ setProgressedPercentage((Double)value);
+ } else {
+ unsetProgressedPercentage();
+ }
+ break;
+
+ case STATUS:
+ if (value != null) {
+ setStatus((TJobExecutionStatus)value);
+ } else {
+ unsetStatus();
+ }
+ break;
+
+ case FOOTER_SUMMARY:
+ if (value != null) {
+ setFooterSummary((String)value);
+ } else {
+ unsetFooterSummary();
+ }
+ break;
+
+ case START_TIME:
+ if (value != null) {
+ setStartTime((Long)value);
+ } else {
+ unsetStartTime();
+ }
+ break;
+
+ }
+ }
+
+ /**
+ * Generic getter: returns the current value of the given field.
+ * Primitive fields are returned boxed (Double for progressedPercentage, Long for startTime).
+ */
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case HEADER_NAMES:
+ return getHeaderNames();
+ case ROWS:
+ return getRows();
+ case PROGRESSED_PERCENTAGE:
+ return Double.valueOf(getProgressedPercentage());
+ case STATUS:
+ return getStatus();
+ case FOOTER_SUMMARY:
+ return getFooterSummary();
+ case START_TIME:
+ return Long.valueOf(getStartTime());
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ /**
+ * Reports whether the given field has been assigned a value.
+ * @throws IllegalArgumentException if field is null.
+ */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case HEADER_NAMES:
+ return isSetHeaderNames();
+ case ROWS:
+ return isSetRows();
+ case PROGRESSED_PERCENTAGE:
+ return isSetProgressedPercentage();
+ case STATUS:
+ return isSetStatus();
+ case FOOTER_SUMMARY:
+ return isSetFooterSummary();
+ case START_TIME:
+ return isSetStartTime();
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ /** Returns true only when that is a TProgressUpdateResp whose fields all match this struct's. */
+ @Override
+ public boolean equals(Object that) {
+ // instanceof is false for null, so no separate null check is needed.
+ return that instanceof TProgressUpdateResp && this.equals((TProgressUpdateResp)that);
+ }
+
+ /**
+ * Field-by-field equality against another struct.
+ * Reference fields must agree on presence and, when present, on value.
+ * The two primitive fields are compared by value only; their isset bits are not consulted.
+ */
+ public boolean equals(TProgressUpdateResp that) {
+ if (that == null) {
+ return false;
+ }
+
+ boolean haveHeaderNames = this.isSetHeaderNames();
+ if (haveHeaderNames != that.isSetHeaderNames()) {
+ return false;
+ }
+ if (haveHeaderNames && !this.headerNames.equals(that.headerNames)) {
+ return false;
+ }
+
+ boolean haveRows = this.isSetRows();
+ if (haveRows != that.isSetRows()) {
+ return false;
+ }
+ if (haveRows && !this.rows.equals(that.rows)) {
+ return false;
+ }
+
+ if (this.progressedPercentage != that.progressedPercentage) {
+ return false;
+ }
+
+ boolean haveStatus = this.isSetStatus();
+ if (haveStatus != that.isSetStatus()) {
+ return false;
+ }
+ if (haveStatus && !this.status.equals(that.status)) {
+ return false;
+ }
+
+ boolean haveFooterSummary = this.isSetFooterSummary();
+ if (haveFooterSummary != that.isSetFooterSummary()) {
+ return false;
+ }
+ if (haveFooterSummary && !this.footerSummary.equals(that.footerSummary)) {
+ return false;
+ }
+
+ return this.startTime == that.startTime;
+ }
+
+ /**
+ * Hashes the struct by collecting, per field and in declaration order, a presence flag followed by
+ * the value when present, then hashing that list. Primitive fields always count as present.
+ */
+ @Override
+ public int hashCode() {
+ List<Object> parts = new ArrayList<Object>();
+
+ boolean hasHeaderNames = isSetHeaderNames();
+ parts.add(hasHeaderNames);
+ if (hasHeaderNames) {
+ parts.add(headerNames);
+ }
+
+ boolean hasRows = isSetRows();
+ parts.add(hasRows);
+ if (hasRows) {
+ parts.add(rows);
+ }
+
+ parts.add(true);
+ parts.add(progressedPercentage);
+
+ boolean hasStatus = isSetStatus();
+ parts.add(hasStatus);
+ if (hasStatus) {
+ // The enum contributes its Thrift integer code rather than its identity hash.
+ parts.add(status.getValue());
+ }
+
+ boolean hasFooterSummary = isSetFooterSummary();
+ parts.add(hasFooterSummary);
+ if (hasFooterSummary) {
+ parts.add(footerSummary);
+ }
+
+ parts.add(true);
+ parts.add(startTime);
+
+ return parts.hashCode();
+ }
+
+ /**
+ * Orders structs field by field in declaration order. For each field, presence is compared first
+ * (unset sorts before set); values are compared only when this struct has the field set.
+ * Instances of different classes are ordered by class name.
+ */
+ @Override
+ public int compareTo(TProgressUpdateResp other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int order = Boolean.valueOf(isSetHeaderNames()).compareTo(other.isSetHeaderNames());
+ if (order == 0 && isSetHeaderNames()) {
+ order = org.apache.thrift.TBaseHelper.compareTo(this.headerNames, other.headerNames);
+ }
+ if (order != 0) {
+ return order;
+ }
+
+ order = Boolean.valueOf(isSetRows()).compareTo(other.isSetRows());
+ if (order == 0 && isSetRows()) {
+ order = org.apache.thrift.TBaseHelper.compareTo(this.rows, other.rows);
+ }
+ if (order != 0) {
+ return order;
+ }
+
+ order = Boolean.valueOf(isSetProgressedPercentage()).compareTo(other.isSetProgressedPercentage());
+ if (order == 0 && isSetProgressedPercentage()) {
+ order = org.apache.thrift.TBaseHelper.compareTo(this.progressedPercentage, other.progressedPercentage);
+ }
+ if (order != 0) {
+ return order;
+ }
+
+ order = Boolean.valueOf(isSetStatus()).compareTo(other.isSetStatus());
+ if (order == 0 && isSetStatus()) {
+ order = org.apache.thrift.TBaseHelper.compareTo(this.status, other.status);
+ }
+ if (order != 0) {
+ return order;
+ }
+
+ order = Boolean.valueOf(isSetFooterSummary()).compareTo(other.isSetFooterSummary());
+ if (order == 0 && isSetFooterSummary()) {
+ order = org.apache.thrift.TBaseHelper.compareTo(this.footerSummary, other.footerSummary);
+ }
+ if (order != 0) {
+ return order;
+ }
+
+ order = Boolean.valueOf(isSetStartTime()).compareTo(other.isSetStartTime());
+ if (order == 0 && isSetStartTime()) {
+ order = org.apache.thrift.TBaseHelper.compareTo(this.startTime, other.startTime);
+ }
+ return order;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ /**
+ * Renders the struct as "TProgressUpdateResp(name:value, ...)" with the fields in declaration order.
+ * Unset reference fields print as "null"; StringBuilder.append already renders a null argument that way.
+ */
+ @Override
+ public String toString() {
+ StringBuilder text = new StringBuilder("TProgressUpdateResp(");
+ text.append("headerNames:").append((Object) this.headerNames);
+ text.append(", ");
+ text.append("rows:").append((Object) this.rows);
+ text.append(", ");
+ text.append("progressedPercentage:").append(this.progressedPercentage);
+ text.append(", ");
+ text.append("status:").append((Object) this.status);
+ text.append(", ");
+ text.append("footerSummary:").append(this.footerSummary);
+ text.append(", ");
+ text.append("startTime:").append(this.startTime);
+ text.append(")");
+ return text.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!isSetHeaderNames()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'headerNames' is unset! Struct:" + toString());
+ }
+
+ if (!isSetRows()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'rows' is unset! Struct:" + toString());
+ }
+
+ if (!isSetProgressedPercentage()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'progressedPercentage' is unset! Struct:" + toString());
+ }
+
+ if (!isSetStatus()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'status' is unset! Struct:" + toString());
+ }
+
+ if (!isSetFooterSummary()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'footerSummary' is unset! Struct:" + toString());
+ }
+
+ if (!isSetStartTime()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'startTime' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ /** Java-serialization hook: writes this struct to the stream in Thrift compact-protocol form. */
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ org.apache.thrift.transport.TIOStreamTransport transport = new org.apache.thrift.transport.TIOStreamTransport(out);
+ write(new org.apache.thrift.protocol.TCompactProtocol(transport));
+ } catch (org.apache.thrift.TException cause) {
+ // Surface Thrift failures as the exception type the serialization contract declares.
+ throw new java.io.IOException(cause);
+ }
+ }
+
+ /** Java-serialization hook: restores this struct from Thrift compact-protocol bytes in the stream. */
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // Java deserialization does not run the default constructor, so the isset bits are reset by hand.
+ __isset_bitfield = 0;
+ org.apache.thrift.transport.TIOStreamTransport transport = new org.apache.thrift.transport.TIOStreamTransport(in);
+ read(new org.apache.thrift.protocol.TCompactProtocol(transport));
+ } catch (org.apache.thrift.TException cause) {
+ throw new java.io.IOException(cause);
+ }
+ }
+
+ /** Factory registered under StandardScheme in the schemes map; returns a new scheme instance per call. */
+ private static class TProgressUpdateRespStandardSchemeFactory implements SchemeFactory {
+ public TProgressUpdateRespStandardScheme getScheme() {
+ return new TProgressUpdateRespStandardScheme();
+ }
+ }
+
+ /**
+ * Reader/writer for the field-tagged encoding: each field travels with its id and type,
+ * and a STOP marker ends the struct.
+ */
+ private static class TProgressUpdateRespStandardScheme extends StandardScheme<TProgressUpdateResp> {
+
+ /**
+ * Reads fields until the STOP marker. A field with an unknown id, or with a known id but an
+ * unexpected wire type, is skipped. The struct is validated once reading finishes.
+ */
+ public void read(org.apache.thrift.protocol.TProtocol iprot, TProgressUpdateResp struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // HEADER_NAMES
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list180 = iprot.readListBegin();
+ struct.headerNames = new ArrayList<String>(_list180.size);
+ String _elem181;
+ for (int _i182 = 0; _i182 < _list180.size; ++_i182)
+ {
+ _elem181 = iprot.readString();
+ struct.headerNames.add(_elem181);
+ }
+ iprot.readListEnd();
+ }
+ struct.setHeaderNamesIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // ROWS
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ // Outer list holds the rows; each row is itself a list of strings.
+ org.apache.thrift.protocol.TList _list183 = iprot.readListBegin();
+ struct.rows = new ArrayList<List<String>>(_list183.size);
+ List<String> _elem184;
+ for (int _i185 = 0; _i185 < _list183.size; ++_i185)
+ {
+ {
+ org.apache.thrift.protocol.TList _list186 = iprot.readListBegin();
+ _elem184 = new ArrayList<String>(_list186.size);
+ String _elem187;
+ for (int _i188 = 0; _i188 < _list186.size; ++_i188)
+ {
+ _elem187 = iprot.readString();
+ _elem184.add(_elem187);
+ }
+ iprot.readListEnd();
+ }
+ struct.rows.add(_elem184);
+ }
+ iprot.readListEnd();
+ }
+ struct.setRowsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // PROGRESSED_PERCENTAGE
+ if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) {
+ struct.progressedPercentage = iprot.readDouble();
+ struct.setProgressedPercentageIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 4: // STATUS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ // findByValue returns null for an unrecognized code, which leaves status unset.
+ struct.status = org.apache.hive.service.rpc.thrift.TJobExecutionStatus.findByValue(iprot.readI32());
+ struct.setStatusIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 5: // FOOTER_SUMMARY
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.footerSummary = iprot.readString();
+ struct.setFooterSummaryIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 6: // START_TIME
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.startTime = iprot.readI64();
+ struct.setStartTimeIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ /**
+ * Validates the struct, then writes its fields in id order followed by the STOP marker.
+ * Reference fields are written only when non-null; the two primitive fields are always written.
+ */
+ public void write(org.apache.thrift.protocol.TProtocol oprot, TProgressUpdateResp struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.headerNames != null) {
+ oprot.writeFieldBegin(HEADER_NAMES_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.headerNames.size()));
+ for (String _iter189 : struct.headerNames)
+ {
+ oprot.writeString(_iter189);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ if (struct.rows != null) {
+ oprot.writeFieldBegin(ROWS_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.LIST, struct.rows.size()));
+ for (List<String> _iter190 : struct.rows)
+ {
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, _iter190.size()));
+ for (String _iter191 : _iter190)
+ {
+ oprot.writeString(_iter191);
+ }
+ oprot.writeListEnd();
+ }
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(PROGRESSED_PERCENTAGE_FIELD_DESC);
+ oprot.writeDouble(struct.progressedPercentage);
+ oprot.writeFieldEnd();
+ if (struct.status != null) {
+ oprot.writeFieldBegin(STATUS_FIELD_DESC);
+ oprot.writeI32(struct.status.getValue());
+ oprot.writeFieldEnd();
+ }
+ if (struct.footerSummary != null) {
+ oprot.writeFieldBegin(FOOTER_SUMMARY_FIELD_DESC);
+ oprot.writeString(struct.footerSummary);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(START_TIME_FIELD_DESC);
+ oprot.writeI64(struct.startTime);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ /** Factory registered under TupleScheme in the schemes map; returns a new scheme instance per call. */
+ private static class TProgressUpdateRespTupleSchemeFactory implements SchemeFactory {
+ public TProgressUpdateRespTupleScheme getScheme() {
+ return new TProgressUpdateRespTupleScheme();
+ }
+ }
+
+ /**
+ * Reader/writer for the positional encoding: all six fields are written back to back in declaration
+ * order, with no field ids, type tags or presence flags. Reader and writer must therefore agree on order.
+ */
+ private static class TProgressUpdateRespTupleScheme extends TupleScheme<TProgressUpdateResp> {
+
+ /**
+ * Writes every field unconditionally. Lists are written as an element count followed by the elements.
+ * headerNames, rows and status are dereferenced without a null check.
+ */
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, TProgressUpdateResp struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ {
+ oprot.writeI32(struct.headerNames.size());
+ for (String _iter192 : struct.headerNames)
+ {
+ oprot.writeString(_iter192);
+ }
+ }
+ {
+ oprot.writeI32(struct.rows.size());
+ for (List<String> _iter193 : struct.rows)
+ {
+ {
+ oprot.writeI32(_iter193.size());
+ for (String _iter194 : _iter193)
+ {
+ oprot.writeString(_iter194);
+ }
+ }
+ }
+ }
+ oprot.writeDouble(struct.progressedPercentage);
+ oprot.writeI32(struct.status.getValue());
+ oprot.writeString(struct.footerSummary);
+ oprot.writeI64(struct.startTime);
+ }
+
+ /**
+ * Reads the six fields in the same order write() emits them and marks each as set.
+ * Unlike the standard scheme, this method does not call validate().
+ */
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, TProgressUpdateResp struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ {
+ // The element count is read as a bare i32; the TList only carries it together with the element type.
+ org.apache.thrift.protocol.TList _list195 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.headerNames = new ArrayList<String>(_list195.size);
+ String _elem196;
+ for (int _i197 = 0; _i197 < _list195.size; ++_i197)
+ {
+ _elem196 = iprot.readString();
+ struct.headerNames.add(_elem196);
+ }
+ }
+ struct.setHeaderNamesIsSet(true);
+ {
+ org.apache.thrift.protocol.TList _list198 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.LIST, iprot.readI32());
+ struct.rows = new ArrayList<List<String>>(_list198.size);
+ List<String> _elem199;
+ for (int _i200 = 0; _i200 < _list198.size; ++_i200)
+ {
+ {
+ org.apache.thrift.protocol.TList _list201 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ _elem199 = new ArrayList<String>(_list201.size);
+ String _elem202;
+ for (int _i203 = 0; _i203 < _list201.size; ++_i203)
+ {
+ _elem202 = iprot.readString();
+ _elem199.add(_elem202);
+ }
+ }
+ struct.rows.add(_elem199);
+ }
+ }
+ struct.setRowsIsSet(true);
+ struct.progressedPercentage = iprot.readDouble();
+ struct.setProgressedPercentageIsSet(true);
+ struct.status = org.apache.hive.service.rpc.thrift.TJobExecutionStatus.findByValue(iprot.readI32());
+ struct.setStatusIsSet(true);
+ struct.footerSummary = iprot.readString();
+ struct.setFooterSummaryIsSet(true);
+ struct.startTime = iprot.readI64();
+ struct.setStartTimeIsSet(true);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-php/Types.php b/service-rpc/src/gen/thrift/gen-php/Types.php
index 786c773..29b14d9 100644
--- a/service-rpc/src/gen/thrift/gen-php/Types.php
+++ b/service-rpc/src/gen/thrift/gen-php/Types.php
@@ -264,6 +264,17 @@ final class TFetchOrientation {
);
}
+final class TJobExecutionStatus { // enum-style holder: three integer status codes plus a value-to-name table
+ const IN_PROGRESS = 0;
+ const COMPLETE = 1;
+ const NOT_AVAILABLE = 2;
+ static public $__names = array( // maps each constant's integer value to its name
+ 0 => 'IN_PROGRESS',
+ 1 => 'COMPLETE',
+ 2 => 'NOT_AVAILABLE',
+ );
+}
+
class TTypeQualifierValue {
static $_TSPEC;
@@ -7976,6 +7987,10 @@ class TGetOperationStatusReq {
* @var \TOperationHandle
*/
public $operationHandle = null;
+ /**
+ * @var bool
+ */
+ public $getProgressUpdate = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -7985,12 +8000,19 @@ class TGetOperationStatusReq {
'type' => TType::STRUCT,
'class' => '\TOperationHandle',
),
+ 2 => array(
+ 'var' => 'getProgressUpdate',
+ 'type' => TType::BOOL,
+ ),
);
}
if (is_array($vals)) {
if (isset($vals['operationHandle'])) {
$this->operationHandle = $vals['operationHandle'];
}
+ if (isset($vals['getProgressUpdate'])) {
+ $this->getProgressUpdate = $vals['getProgressUpdate'];
+ }
}
}
@@ -8021,6 +8043,13 @@ class TGetOperationStatusReq {
$xfer += $input->skip($ftype);
}
break;
+ case 2:
+ if ($ftype == TType::BOOL) {
+ $xfer += $input->readBool($this->getProgressUpdate);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -8042,6 +8071,11 @@ class TGetOperationStatusReq {
$xfer += $this->operationHandle->write($output);
$xfer += $output->writeFieldEnd();
}
+ if ($this->getProgressUpdate !== null) {
+ $xfer += $output->writeFieldBegin('getProgressUpdate', TType::BOOL, 2);
+ $xfer += $output->writeBool($this->getProgressUpdate);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
@@ -8088,6 +8122,10 @@ class TGetOperationStatusResp {
* @var bool
*/
public $hasResultSet = null;
+ /**
+ * @var \TProgressUpdateResp
+ */
+ public $progressUpdateResponse = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -8129,6 +8167,11 @@ class TGetOperationStatusResp {
'var' => 'hasResultSet',
'type' => TType::BOOL,
),
+ 10 => array(
+ 'var' => 'progressUpdateResponse',
+ 'type' => TType::STRUCT,
+ 'class' => '\TProgressUpdateResp',
+ ),
);
}
if (is_array($vals)) {
@@ -8159,6 +8202,9 @@ class TGetOperationStatusResp {
if (isset($vals['hasResultSet'])) {
$this->hasResultSet = $vals['hasResultSet'];
}
+ if (isset($vals['progressUpdateResponse'])) {
+ $this->progressUpdateResponse = $vals['progressUpdateResponse'];
+ }
}
}
@@ -8245,6 +8291,14 @@ class TGetOperationStatusResp {
$xfer += $input->skip($ftype);
}
break;
+ case 10:
+ if ($ftype == TType::STRUCT) {
+ $this->progressUpdateResponse = new \TProgressUpdateResp();
+ $xfer += $this->progressUpdateResponse->read($input);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -8306,6 +8360,14 @@ class TGetOperationStatusResp {
$xfer += $output->writeBool($this->hasResultSet);
$xfer += $output->writeFieldEnd();
}
+ if ($this->progressUpdateResponse !== null) {
+ if (!is_object($this->progressUpdateResponse)) {
+ throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
+ }
+ $xfer += $output->writeFieldBegin('progressUpdateResponse', TType::STRUCT, 10);
+ $xfer += $this->progressUpdateResponse->write($output);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
@@ -9696,6 +9758,271 @@ class TRenewDelegationTokenResp {
}
+class TProgressUpdateResp { // Thrift struct with six fields (ids 1-6); wire types are listed in $_TSPEC
+ static $_TSPEC; // field-id => spec map, filled in by the first constructor call
+
+ /**
+ * @var string[]
+ */
+ public $headerNames = null;
+ /**
+ * @var (string[])[]
+ */
+ public $rows = null;
+ /**
+ * @var double
+ */
+ public $progressedPercentage = null;
+ /**
+ * @var int
+ */
+ public $status = null;
+ /**
+ * @var string
+ */
+ public $footerSummary = null;
+ /**
+ * @var int
+ */
+ public $startTime = null;
+
+ public function __construct($vals=null) { // $vals: optional array keyed by field name; only keys that are set are copied
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 1 => array(
+ 'var' => 'headerNames',
+ 'type' => TType::LST,
+ 'etype' => TType::STRING,
+ 'elem' => array(
+ 'type' => TType::STRING,
+ ),
+ ),
+ 2 => array(
+ 'var' => 'rows',
+ 'type' => TType::LST,
+ 'etype' => TType::LST,
+ 'elem' => array(
+ 'type' => TType::LST,
+ 'etype' => TType::STRING,
+ 'elem' => array(
+ 'type' => TType::STRING,
+ ),
+ ),
+ ),
+ 3 => array(
+ 'var' => 'progressedPercentage',
+ 'type' => TType::DOUBLE,
+ ),
+ 4 => array(
+ 'var' => 'status',
+ 'type' => TType::I32,
+ ),
+ 5 => array(
+ 'var' => 'footerSummary',
+ 'type' => TType::STRING,
+ ),
+ 6 => array(
+ 'var' => 'startTime',
+ 'type' => TType::I64,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['headerNames'])) {
+ $this->headerNames = $vals['headerNames'];
+ }
+ if (isset($vals['rows'])) {
+ $this->rows = $vals['rows'];
+ }
+ if (isset($vals['progressedPercentage'])) {
+ $this->progressedPercentage = $vals['progressedPercentage'];
+ }
+ if (isset($vals['status'])) {
+ $this->status = $vals['status'];
+ }
+ if (isset($vals['footerSummary'])) {
+ $this->footerSummary = $vals['footerSummary'];
+ }
+ if (isset($vals['startTime'])) {
+ $this->startTime = $vals['startTime'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'TProgressUpdateResp';
+ }
+
+ public function read($input) // decodes fields until TType::STOP and returns the accumulated $xfer
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 1:
+ if ($ftype == TType::LST) { // a field whose wire type does not match is skipped, not treated as an error
+ $this->headerNames = array();
+ $_size159 = 0;
+ $_etype162 = 0;
+ $xfer += $input->readListBegin($_etype162, $_size159);
+ for ($_i163 = 0; $_i163 < $_size159; ++$_i163)
+ {
+ $elem164 = null;
+ $xfer += $input->readString($elem164);
+ $this->headerNames []= $elem164;
+ }
+ $xfer += $input->readListEnd();
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
+ if ($ftype == TType::LST) {
+ $this->rows = array();
+ $_size165 = 0;
+ $_etype168 = 0;
+ $xfer += $input->readListBegin($_etype168, $_size165);
+ for ($_i169 = 0; $_i169 < $_size165; ++$_i169)
+ {
+ $elem170 = null;
+ $elem170 = array();
+ $_size171 = 0;
+ $_etype174 = 0;
+ $xfer += $input->readListBegin($_etype174, $_size171); // nested list: the strings of one row
+ for ($_i175 = 0; $_i175 < $_size171; ++$_i175)
+ {
+ $elem176 = null;
+ $xfer += $input->readString($elem176);
+ $elem170 []= $elem176;
+ }
+ $xfer += $input->readListEnd();
+ $this->rows []= $elem170;
+ }
+ $xfer += $input->readListEnd();
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 3:
+ if ($ftype == TType::DOUBLE) {
+ $xfer += $input->readDouble($this->progressedPercentage);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 4:
+ if ($ftype == TType::I32) {
+ $xfer += $input->readI32($this->status);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 5:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->footerSummary);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 6:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->startTime);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default: // unknown field ids are skipped
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) { // encodes every non-null field; throws TProtocolException when a list field is not an array
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('TProgressUpdateResp');
+ if ($this->headerNames !== null) {
+ if (!is_array($this->headerNames)) {
+ throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
+ }
+ $xfer += $output->writeFieldBegin('headerNames', TType::LST, 1);
+ {
+ $output->writeListBegin(TType::STRING, count($this->headerNames)); // NOTE(review): return value is not added to $xfer, unlike the writeString calls below
+ {
+ foreach ($this->headerNames as $iter177)
+ {
+ $xfer += $output->writeString($iter177);
+ }
+ }
+ $output->writeListEnd();
+ }
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->rows !== null) {
+ if (!is_array($this->rows)) {
+ throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
+ }
+ $xfer += $output->writeFieldBegin('rows', TType::LST, 2);
+ {
+ $output->writeListBegin(TType::LST, count($this->rows)); // NOTE(review): list begin/end results are likewise left out of $xfer here
+ {
+ foreach ($this->rows as $iter178)
+ {
+ {
+ $output->writeListBegin(TType::STRING, count($iter178));
+ {
+ foreach ($iter178 as $iter179)
+ {
+ $xfer += $output->writeString($iter179);
+ }
+ }
+ $output->writeListEnd();
+ }
+ }
+ }
+ $output->writeListEnd();
+ }
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->progressedPercentage !== null) {
+ $xfer += $output->writeFieldBegin('progressedPercentage', TType::DOUBLE, 3);
+ $xfer += $output->writeDouble($this->progressedPercentage);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->status !== null) {
+ $xfer += $output->writeFieldBegin('status', TType::I32, 4);
+ $xfer += $output->writeI32($this->status);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->footerSummary !== null) {
+ $xfer += $output->writeFieldBegin('footerSummary', TType::STRING, 5);
+ $xfer += $output->writeString($this->footerSummary);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->startTime !== null) {
+ $xfer += $output->writeFieldBegin('startTime', TType::I64, 6);
+ $xfer += $output->writeI64($this->startTime);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
final class Constant extends \Thrift\Type\TConstant {
static protected $PRIMITIVE_TYPES;
static protected $COMPLEX_TYPES;
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
index fdf6b1f..4ed2091 100644
--- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
+++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
@@ -393,6 +393,23 @@ class TFetchOrientation:
"FETCH_LAST": 5,
}
+class TJobExecutionStatus:  # enum-style holder: three integer status codes and two lookup tables
+ IN_PROGRESS = 0
+ COMPLETE = 1
+ NOT_AVAILABLE = 2
+
+ _VALUES_TO_NAMES = {  # integer value -> constant name
+ 0: "IN_PROGRESS",
+ 1: "COMPLETE",
+ 2: "NOT_AVAILABLE",
+ }
+
+ _NAMES_TO_VALUES = {  # constant name -> integer value (inverse of the table above)
+ "IN_PROGRESS": 0,
+ "COMPLETE": 1,
+ "NOT_AVAILABLE": 2,
+ }
+
class TTypeQualifierValue:
"""
@@ -6025,15 +6042,18 @@ class TGetOperationStatusReq:
"""
Attributes:
- operationHandle
+ - getProgressUpdate
"""
thrift_spec = (
None, # 0
(1, TType.STRUCT, 'operationHandle', (TOperationHandle, TOperationHandle.thrift_spec), None, ), # 1
+ (2, TType.BOOL, 'getProgressUpdate', None, None, ), # 2
)
- def __init__(self, operationHandle=None,):
+ def __init__(self, operationHandle=None, getProgressUpdate=None,):
self.operationHandle = operationHandle
+ self.getProgressUpdate = getProgressUpdate
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -6050,6 +6070,11 @@ class TGetOperationStatusReq:
self.operationHandle.read(iprot)
else:
iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.BOOL:
+ self.getProgressUpdate = iprot.readBool()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -6064,6 +6089,10 @@ class TGetOperationStatusReq:
oprot.writeFieldBegin('operationHandle', TType.STRUCT, 1)
self.operationHandle.write(oprot)
oprot.writeFieldEnd()
+ if self.getProgressUpdate is not None:
+ oprot.writeFieldBegin('getProgressUpdate', TType.BOOL, 2)
+ oprot.writeBool(self.getProgressUpdate)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -6076,6 +6105,7 @@ class TGetOperationStatusReq:
def __hash__(self):
value = 17
value = (value * 31) ^ hash(self.operationHandle)
+ value = (value * 31) ^ hash(self.getProgressUpdate)
return value
def __repr__(self):
@@ -6101,6 +6131,7 @@ class TGetOperationStatusResp:
- operationStarted
- operationCompleted
- hasResultSet
+ - progressUpdateResponse
"""
thrift_spec = (
@@ -6114,9 +6145,10 @@ class TGetOperationStatusResp:
(7, TType.I64, 'operationStarted', None, None, ), # 7
(8, TType.I64, 'operationCompleted', None, None, ), # 8
(9, TType.BOOL, 'hasResultSet', None, None, ), # 9
+ (10, TType.STRUCT, 'progressUpdateResponse', (TProgressUpdateResp, TProgressUpdateResp.thrift_spec), None, ), # 10
)
- def __init__(self, status=None, operationState=None, sqlState=None, errorCode=None, errorMessage=None, taskStatus=None, operationStarted=None, operationCompleted=None, hasResultSet=None,):
+ def __init__(self, status=None, operationState=None, sqlState=None, errorCode=None, errorMessage=None, taskStatus=None, operationStarted=None, operationCompleted=None, hasResultSet=None, progressUpdateResponse=None,):
self.status = status
self.operationState = operationState
self.sqlState = sqlState
@@ -6126,6 +6158,7 @@ class TGetOperationStatusResp:
self.operationStarted = operationStarted
self.operationCompleted = operationCompleted
self.hasResultSet = hasResultSet
+ self.progressUpdateResponse = progressUpdateResponse
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -6182,6 +6215,12 @@ class TGetOperationStatusResp:
self.hasResultSet = iprot.readBool()
else:
iprot.skip(ftype)
+ elif fid == 10:
+ if ftype == TType.STRUCT:
+ self.progressUpdateResponse = TProgressUpdateResp()
+ self.progressUpdateResponse.read(iprot)
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -6228,6 +6267,10 @@ class TGetOperationStatusResp:
oprot.writeFieldBegin('hasResultSet', TType.BOOL, 9)
oprot.writeBool(self.hasResultSet)
oprot.writeFieldEnd()
+ if self.progressUpdateResponse is not None:
+ oprot.writeFieldBegin('progressUpdateResponse', TType.STRUCT, 10)
+ self.progressUpdateResponse.write(oprot)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -6248,6 +6291,7 @@ class TGetOperationStatusResp:
value = (value * 31) ^ hash(self.operationStarted)
value = (value * 31) ^ hash(self.operationCompleted)
value = (value * 31) ^ hash(self.hasResultSet)
+ value = (value * 31) ^ hash(self.progressUpdateResponse)
return value
def __repr__(self):
@@ -7369,3 +7413,169 @@ class TRenewDelegationTokenResp:
def __ne__(self, other):
return not (self == other)
+
+class TProgressUpdateResp:  # Thrift struct with six fields (ids 1-6 in thrift_spec)
+ """
+ Attributes:
+ - headerNames
+ - rows
+ - progressedPercentage
+ - status
+ - footerSummary
+ - startTime
+ """
+
+ thrift_spec = (  # indexed by field id; slot 0 is unused
+ None, # 0
+ (1, TType.LIST, 'headerNames', (TType.STRING,None), None, ), # 1
+ (2, TType.LIST, 'rows', (TType.LIST,(TType.STRING,None)), None, ), # 2
+ (3, TType.DOUBLE, 'progressedPercentage', None, None, ), # 3
+ (4, TType.I32, 'status', None, None, ), # 4
+ (5, TType.STRING, 'footerSummary', None, None, ), # 5
+ (6, TType.I64, 'startTime', None, None, ), # 6
+ )
+
+ def __init__(self, headerNames=None, rows=None, progressedPercentage=None, status=None, footerSummary=None, startTime=None,):
+ self.headerNames = headerNames
+ self.rows = rows
+ self.progressedPercentage = progressedPercentage
+ self.status = status
+ self.footerSummary = footerSummary
+ self.startTime = startTime
+
+ def read(self, iprot):  # decodes fields until TType.STOP; mismatched or unknown fields are skipped
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # accelerated path: hand decoding to fastbinary
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.LIST:
+ self.headerNames = []
+ (_etype162, _size159) = iprot.readListBegin()
+ for _i163 in xrange(_size159):  # xrange: this generated module is Python 2 code
+ _elem164 = iprot.readString()
+ self.headerNames.append(_elem164)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.LIST:
+ self.rows = []
+ (_etype168, _size165) = iprot.readListBegin()
+ for _i169 in xrange(_size165):
+ _elem170 = []  # one row: a list of strings
+ (_etype174, _size171) = iprot.readListBegin()
+ for _i175 in xrange(_size171):
+ _elem176 = iprot.readString()
+ _elem170.append(_elem176)
+ iprot.readListEnd()
+ self.rows.append(_elem170)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.DOUBLE:
+ self.progressedPercentage = iprot.readDouble()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.I32:
+ self.status = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.STRING:
+ self.footerSummary = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.I64:
+ self.startTime = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # encodes every field that is not None
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # accelerated path: hand encoding to fastbinary
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('TProgressUpdateResp')
+ if self.headerNames is not None:
+ oprot.writeFieldBegin('headerNames', TType.LIST, 1)
+ oprot.writeListBegin(TType.STRING, len(self.headerNames))
+ for iter177 in self.headerNames:
+ oprot.writeString(iter177)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.rows is not None:
+ oprot.writeFieldBegin('rows', TType.LIST, 2)
+ oprot.writeListBegin(TType.LIST, len(self.rows))
+ for iter178 in self.rows:
+ oprot.writeListBegin(TType.STRING, len(iter178))
+ for iter179 in iter178:
+ oprot.writeString(iter179)
+ oprot.writeListEnd()
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.progressedPercentage is not None:
+ oprot.writeFieldBegin('progressedPercentage', TType.DOUBLE, 3)
+ oprot.writeDouble(self.progressedPercentage)
+ oprot.writeFieldEnd()
+ if self.status is not None:
+ oprot.writeFieldBegin('status', TType.I32, 4)
+ oprot.writeI32(self.status)
+ oprot.writeFieldEnd()
+ if self.footerSummary is not None:
+ oprot.writeFieldBegin('footerSummary', TType.STRING, 5)
+ oprot.writeString(self.footerSummary)
+ oprot.writeFieldEnd()
+ if self.startTime is not None:
+ oprot.writeFieldBegin('startTime', TType.I64, 6)
+ oprot.writeI64(self.startTime)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):  # all six fields are treated as required; raises on the first one that is None
+ if self.headerNames is None:
+ raise TProtocol.TProtocolException(message='Required field headerNames is unset!')
+ if self.rows is None:
+ raise TProtocol.TProtocolException(message='Required field rows is unset!')
+ if self.progressedPercentage is None:
+ raise TProtocol.TProtocolException(message='Required field progressedPercentage is unset!')
+ if self.status is None:
+ raise TProtocol.TProtocolException(message='Required field status is unset!')
+ if self.footerSummary is None:
+ raise TProtocol.TProtocolException(message='Required field footerSummary is unset!')
+ if self.startTime is None:
+ raise TProtocol.TProtocolException(message='Required field startTime is unset!')
+ return
+
+
+ def __hash__(self):  # NOTE(review): headerNames/rows are lists after read(); hash() of a list raises TypeError
+ value = 17
+ value = (value * 31) ^ hash(self.headerNames)
+ value = (value * 31) ^ hash(self.rows)
+ value = (value * 31) ^ hash(self.progressedPercentage)
+ value = (value * 31) ^ hash(self.status)
+ value = (value * 31) ^ hash(self.footerSummary)
+ value = (value * 31) ^ hash(self.startTime)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):  # equal when other is an instance of this class with an identical attribute dict
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
index 4b1854c..c536a8a 100644
--- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
+++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
@@ -148,6 +148,14 @@ module TFetchOrientation
VALID_VALUES = Set.new([FETCH_NEXT, FETCH_PRIOR, FETCH_RELATIVE, FETCH_ABSOLUTE, FETCH_FIRST, FETCH_LAST]).freeze
end
+module TJobExecutionStatus # enum-style module: three integer status codes
+ IN_PROGRESS = 0
+ COMPLETE = 1
+ NOT_AVAILABLE = 2
+ VALUE_MAP = {0 => "IN_PROGRESS", 1 => "COMPLETE", 2 => "NOT_AVAILABLE"} # integer value => constant name
+ VALID_VALUES = Set.new([IN_PROGRESS, COMPLETE, NOT_AVAILABLE]).freeze # frozen set of the legal values
+end
+
class TTypeQualifierValue < ::Thrift::Union
include ::Thrift::Struct_Union
class << self
@@ -1548,9 +1556,11 @@ end
class TGetOperationStatusReq
include ::Thrift::Struct, ::Thrift::Struct_Union
OPERATIONHANDLE = 1
+ GETPROGRESSUPDATE = 2
FIELDS = {
- OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle', :class => ::TOperationHandle}
+ OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle', :class => ::TOperationHandle},
+ GETPROGRESSUPDATE => {:type => ::Thrift::Types::BOOL, :name => 'getProgressUpdate', :optional => true}
}
def struct_fields; FIELDS; end
@@ -1573,6 +1583,7 @@ class TGetOperationStatusResp
OPERATIONSTARTED = 7
OPERATIONCOMPLETED = 8
HASRESULTSET = 9
+ PROGRESSUPDATERESPONSE = 10
FIELDS = {
STATUS => {:type => ::Thrift::Types::STRUCT, :name => 'status', :class => ::TStatus},
@@ -1583,7 +1594,8 @@ class TGetOperationStatusResp
TASKSTATUS => {:type => ::Thrift::Types::STRING, :name => 'taskStatus', :optional => true},
OPERATIONSTARTED => {:type => ::Thrift::Types::I64, :name => 'operationStarted', :optional => true},
OPERATIONCOMPLETED => {:type => ::Thrift::Types::I64, :name => 'operationCompleted', :optional => true},
- HASRESULTSET => {:type => ::Thrift::Types::BOOL, :name => 'hasResultSet', :optional => true}
+ HASRESULTSET => {:type => ::Thrift::Types::BOOL, :name => 'hasResultSet', :optional => true},
+ PROGRESSUPDATERESPONSE => {:type => ::Thrift::Types::STRUCT, :name => 'progressUpdateResponse', :class => ::TProgressUpdateResp, :optional => true}
}
def struct_fields; FIELDS; end
@@ -1867,3 +1879,38 @@ class TRenewDelegationTokenResp
::Thrift::Struct.generate_accessors self
end
+class TProgressUpdateResp # Thrift struct; none of its six fields is marked :optional
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ HEADERNAMES = 1
+ ROWS = 2
+ PROGRESSEDPERCENTAGE = 3
+ STATUS = 4
+ FOOTERSUMMARY = 5
+ STARTTIME = 6
+
+ FIELDS = { # field id => type descriptor
+ HEADERNAMES => {:type => ::Thrift::Types::LIST, :name => 'headerNames', :element => {:type => ::Thrift::Types::STRING}},
+ ROWS => {:type => ::Thrift::Types::LIST, :name => 'rows', :element => {:type => ::Thrift::Types::LIST, :element => {:type => ::Thrift::Types::STRING}}},
+ PROGRESSEDPERCENTAGE => {:type => ::Thrift::Types::DOUBLE, :name => 'progressedPercentage'},
+ STATUS => {:type => ::Thrift::Types::I32, :name => 'status', :enum_class => ::TJobExecutionStatus},
+ FOOTERSUMMARY => {:type => ::Thrift::Types::STRING, :name => 'footerSummary'},
+ STARTTIME => {:type => ::Thrift::Types::I64, :name => 'startTime'}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate # raises ProtocolException for the first unset field, then checks status against VALID_VALUES
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field headerNames is unset!') unless @headerNames
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field rows is unset!') unless @rows
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field progressedPercentage is unset!') unless @progressedPercentage
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field status is unset!') unless @status
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field footerSummary is unset!') unless @footerSummary
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field startTime is unset!') unless @startTime
+ unless @status.nil? || ::TJobExecutionStatus::VALID_VALUES.include?(@status) # @status is already non-nil here because of the required-field check above
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field status!')
+ end
+ end
+
+ ::Thrift::Struct.generate_accessors self
+end
+
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index de44ecb..b842f37 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -46,6 +46,8 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.server.HiveServer2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hive.service.cli.session.HiveSession;
/**
* CLIService.
@@ -255,8 +257,11 @@ public class CLIService extends CompositeService implements ICLIService {
 @Override
 public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
 Map<String, String> confOverlay) throws HiveSQLException {
- OperationHandle opHandle =
- sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay);
+ HiveSession session = sessionManager.getSession(sessionHandle);
+ // Reset the session's progress monitor before running the statement. The operation handle is not
+ // available downstream; ideally the monitor would be associated with the operation handle instead.
+ session.getSessionState().updateProgressMonitor(null);
+ OperationHandle opHandle = session.executeStatement(statement, confOverlay);
 LOG.debug(sessionHandle + ": executeStatement()");
 return opHandle;
 }
@@ -267,9 +272,11 @@ public class CLIService extends CompositeService implements ICLIService {
 @Override
 public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
 Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
- OperationHandle opHandle =
- sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay,
- queryTimeout);
+ HiveSession session = sessionManager.getSession(sessionHandle);
+ // Reset the session's progress monitor before running the statement. The operation handle is not
+ // available downstream; ideally the monitor would be associated with the operation handle instead.
+ session.getSessionState().updateProgressMonitor(null);
+ OperationHandle opHandle = session.executeStatement(statement, confOverlay, queryTimeout);
 LOG.debug(sessionHandle + ": executeStatement()");
 return opHandle;
 }
@@ -280,8 +287,11 @@ public class CLIService extends CompositeService implements ICLIService {
 @Override
 public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
 Map<String, String> confOverlay) throws HiveSQLException {
- OperationHandle opHandle =
- sessionManager.getSession(sessionHandle).executeStatementAsync(statement, confOverlay);
+ HiveSession session = sessionManager.getSession(sessionHandle);
+ // Reset the session's progress monitor before running the statement. The operation handle is not
+ // available downstream; ideally the monitor would be associated with the operation handle instead.
+ session.getSessionState().updateProgressMonitor(null);
+ OperationHandle opHandle = session.executeStatementAsync(statement, confOverlay);
 LOG.debug(sessionHandle + ": executeStatementAsync()");
 return opHandle;
 }
@@ -292,9 +302,11 @@ public class CLIService extends CompositeService implements ICLIService {
 @Override
 public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
 Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
- OperationHandle opHandle =
- sessionManager.getSession(sessionHandle).executeStatementAsync(statement, confOverlay,
- queryTimeout);
+ HiveSession session = sessionManager.getSession(sessionHandle);
+ // Reset the session's progress monitor before running the statement. The operation handle is not
+ // available downstream; ideally the monitor would be associated with the operation handle instead.
+ session.getSessionState().updateProgressMonitor(null);
+ OperationHandle opHandle = session.executeStatementAsync(statement, confOverlay, queryTimeout);
 LOG.debug(sessionHandle + ": executeStatementAsync()");
 return opHandle;
 }
@@ -410,18 +422,18 @@ public class CLIService extends CompositeService implements ICLIService {
String foreignSchema, String foreignTable)
throws HiveSQLException {
OperationHandle opHandle = sessionManager.getSession(sessionHandle)
- .getCrossReference(primaryCatalog, primarySchema, primaryTable,
+ .getCrossReference(primaryCatalog, primarySchema, primaryTable,
foreignCatalog,
foreignSchema, foreignTable);
LOG.debug(sessionHandle + ": getCrossReference()");
return opHandle;
}
-
+
/* (non-Javadoc)
* @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle)
*/
@Override
- public OperationStatus getOperationStatus(OperationHandle opHandle)
+ public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate)
throws HiveSQLException {
Operation operation = sessionManager.getOperationManager().getOperation(opHandle);
/**
@@ -457,9 +469,34 @@ public class CLIService extends CompositeService implements ICLIService {
}
OperationStatus opStatus = operation.getStatus();
LOG.debug(opHandle + ": getOperationStatus()");
+ opStatus.setJobProgressUpdate(progressUpdateLog(getProgressUpdate, operation));
return opStatus;
}
+ private JobProgressUpdate progressUpdateLog(boolean isProgressLogRequested, Operation operation) {
+ if (isProgressLogRequested && canProvideProgressLog()) {
+ if (OperationType.EXECUTE_STATEMENT.equals(operation.getType())) {
+ SessionState sessionState = operation.getParentSession().getSessionState();
+ try {
+ while (sessionState.getProgressMonitor() == null && !operation.isFinished()) {
+ Thread.sleep(10L); // sleep for 10 ms
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Error while getting progress update", e);
+ }
+ if (sessionState.getProgressMonitor() != null) {
+ return new JobProgressUpdate(sessionState.getProgressMonitor());
+ }
+ }
+ }
+ return new JobProgressUpdate(ProgressMonitor.NULL);
+ }
+
+ private boolean canProvideProgressLog() { // true only with the Tez engine and the in-place progress flag
+ boolean onTez = "tez".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE));
+ return onTez && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_INPLACE_PROGRESS);
+ }
+
/* (non-Javadoc)
* @see org.apache.hive.service.cli.ICLIService#cancelOperation(org.apache.hive.service.cli.OperationHandle)
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
index 2a3bcca..43fbb00 100644
--- a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
@@ -159,8 +159,8 @@ public class EmbeddedCLIServiceClient extends CLIServiceClient {
* @see org.apache.hive.service.cli.CLIServiceClient#getOperationStatus(org.apache.hive.service.cli.OperationHandle)
*/
 @Override
- public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
- return cliService.getOperationStatus(opHandle);
+ public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate) throws HiveSQLException {
+ return cliService.getOperationStatus(opHandle, getProgressUpdate); // forwards both arguments unchanged to the embedded service
 }
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/ICLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/ICLIService.java b/service/src/java/org/apache/hive/service/cli/ICLIService.java
index fef772d..9f2039c 100644
--- a/service/src/java/org/apache/hive/service/cli/ICLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/ICLIService.java
@@ -75,7 +75,7 @@ public interface ICLIService {
String catalogName, String schemaName, String functionName)
throws HiveSQLException;
- OperationStatus getOperationStatus(OperationHandle opHandle)
+ OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate)
throws HiveSQLException;
void cancelOperation(OperationHandle opHandle)