Posted to commits@hive.apache.org by dm...@apache.org on 2020/05/07 15:35:46 UTC
[hive] branch master updated: HIVE-23124: Review of SQLOperation Class (David Mollitor, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
dmollitor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new bbfb0f8 HIVE-23124: Review of SQLOperation Class (David Mollitor, reviewed by Peter Vary)
bbfb0f8 is described below
commit bbfb0f804202db8c8423a61ffb7c9fe8d888309b
Author: David Mollitor <dm...@apache.org>
AuthorDate: Thu May 7 11:35:21 2020 -0400
HIVE-23124: Review of SQLOperation Class (David Mollitor, reviewed by Peter Vary)
---
.../hive/service/cli/operation/SQLOperation.java | 291 ++++++++++-----------
1 file changed, 137 insertions(+), 154 deletions(-)
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 25b6ab3..75b84d3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -19,7 +19,6 @@
package org.apache.hive.service.cli.operation;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
@@ -27,18 +26,21 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.io.SessionStream;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -78,22 +80,19 @@ import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
/**
* SQLOperation.
- *
*/
public class SQLOperation extends ExecuteStatementOperation {
private IDriver driver = null;
- private TableSchema resultSchema;
+ private Optional<TableSchema> resultSchema;
private AbstractSerDe serde = null;
private boolean fetchStarted = false;
private volatile MetricsScope currentSQLStateScope;
- private QueryInfo queryInfo;
- private long queryTimeout;
+ private final QueryInfo queryInfo;
+ private final long queryTimeout;
private ScheduledExecutorService timeoutExecutor;
private final boolean runAsync;
private final long operationLogCleanupDelayMs;
@@ -102,21 +101,25 @@ public class SQLOperation extends ExecuteStatementOperation {
/**
* A map to track query count running by each user
*/
- private static Map<String, AtomicInteger> userQueries = new HashMap<String, AtomicInteger>();
+ private static final Map<String, AtomicInteger> USER_QUERIES = new ConcurrentHashMap<>();
private static final String ACTIVE_SQL_USER = MetricsConstant.SQL_OPERATION_PREFIX + "active_user";
- private MetricsScope submittedQryScp;
+ private final Optional<MetricsScope> submittedQryScp;
public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay,
boolean runInBackground, long queryTimeout) {
// TODO: call setRemoteUser in ExecuteStatementOperation or higher.
super(parentSession, statement, confOverlay, runInBackground);
this.runAsync = runInBackground;
- this.queryTimeout = queryTimeout;
- long timeout = HiveConf.getTimeVar(queryState.getConf(),
- HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ this.resultSchema = Optional.empty();
+
+ final long timeout =
+ HiveConf.getTimeVar(queryState.getConf(), HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (timeout > 0 && (queryTimeout <= 0 || timeout < queryTimeout)) {
this.queryTimeout = timeout;
+ } else {
+ this.queryTimeout = queryTimeout;
}
+
this.operationLogCleanupDelayMs = HiveConf.getTimeVar(queryState.getConf(),
HiveConf.ConfVars.HIVE_SERVER2_OPERATION_LOG_CLEANUP_DELAY, TimeUnit.MILLISECONDS);
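For readers skimming the hunk above: the constructor now resolves the effective timeout up front, letting the server-side HIVE_QUERY_TIMEOUT_SECONDS limit override the client-requested value whenever it is stricter, with values <= 0 meaning "no timeout". A standalone sketch of that rule (resolveTimeout is an illustrative name, not part of the patch):

    // Illustrative sketch of the timeout-resolution rule in the constructor.
    // A value <= 0 means "no timeout"; the stricter positive limit wins.
    static long resolveTimeout(long serverTimeout, long clientTimeout) {
      if (serverTimeout > 0 && (clientTimeout <= 0 || serverTimeout < clientTimeout)) {
        return serverTimeout;
      }
      return clientTimeout;
    }

For example, resolveTimeout(60, 300) returns 60 (the server cap wins) and resolveTimeout(0, 300) returns 300 (no server cap configured).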
@@ -125,10 +128,9 @@ public class SQLOperation extends ExecuteStatementOperation {
queryInfo = new QueryInfo(getState().toString(), getParentSession().getUserName(),
getExecutionEngine(), getHandle().getHandleIdentifier().toString());
- Metrics metrics = MetricsFactory.getInstance();
- if (metrics != null) {
- submittedQryScp = metrics.createScope(MetricsConstant.HS2_SUBMITTED_QURIES);
- }
+ final Metrics metrics = MetricsFactory.getInstance();
+ this.submittedQryScp =
+ (metrics == null) ? Optional.empty() : Optional.of(metrics.createScope(MetricsConstant.HS2_SUBMITTED_QURIES));
}
@Override
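The constructor hunk above also swaps the nullable submittedQryScp field for an Optional, since MetricsFactory.getInstance() may return null when metrics are disabled. A minimal self-contained sketch of the pattern (the Metrics/MetricsScope interfaces below are stand-ins for the real Hive types in org.apache.hadoop.hive.common.metrics.common):

    import java.util.Optional;

    // Stand-in interfaces so the sketch compiles on its own.
    interface MetricsScope {}
    interface Metrics {
      MetricsScope createScope(String name);
      void endScope(MetricsScope scope);
    }

    class SubmittedScopeSketch {
      private final Optional<MetricsScope> submittedScope;

      SubmittedScopeSketch(Metrics metrics) {
        // Capture the nullable metrics result once at construction time
        // instead of re-checking for null at every use site.
        this.submittedScope = (metrics == null)
            ? Optional.empty()
            : Optional.of(metrics.createScope("hs2_submitted_queries"));
      }

      void end(Metrics metrics) {
        if (metrics != null) {
          submittedScope.ifPresent(metrics::endScope);
        }
      }
    }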
@@ -158,32 +160,25 @@ public class SQLOperation extends ExecuteStatementOperation {
*
* @throws HiveSQLException
*/
- public void prepare(QueryState queryState) throws HiveSQLException {
+ private void prepare(QueryState queryState) throws HiveSQLException {
setState(OperationState.RUNNING);
try {
driver = DriverFactory.newDriver(queryState, queryInfo);
- // Start the timer thread for cancelling the query when query timeout is reached
+ // Start the timer thread for canceling the query when query timeout is reached
// queryTimeout == 0 means no timeout
- if (queryTimeout > 0) {
- timeoutExecutor = new ScheduledThreadPoolExecutor(1);
- Runnable timeoutTask = new Runnable() {
- @Override
- public void run() {
- try {
- String queryId = queryState.getQueryId();
- LOG.info("Query timed out after: " + queryTimeout
- + " seconds. Cancelling the execution now: " + queryId);
- SQLOperation.this.cancel(OperationState.TIMEDOUT);
- } catch (HiveSQLException e) {
- LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);
- } finally {
- // Stop
- timeoutExecutor.shutdown();
- }
+ if (queryTimeout > 0L) {
+ timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+ timeoutExecutor.schedule(() -> {
+ try {
+ final String queryId = queryState.getQueryId();
+ LOG.info("Query timed out after: " + queryTimeout + " seconds. Cancelling the execution now: " + queryId);
+ SQLOperation.this.cancel(OperationState.TIMEDOUT);
+ } catch (HiveSQLException e) {
+ LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);
}
- };
- timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS);
+ return null;
+ }, queryTimeout, TimeUnit.SECONDS);
}
queryInfo.setQueryDisplay(driver.getQueryDisplay());
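The hunk above trades the anonymous Runnable on a ScheduledThreadPoolExecutor for a lambda on a single-thread scheduled executor. One detail worth calling out: the trailing "return null;" makes the lambda a Callable, which is what selects the schedule(Callable, long, TimeUnit) overload. A minimal sketch of the same pattern, with cancelQuery() standing in for SQLOperation.cancel(OperationState.TIMEDOUT):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class TimeoutSketch {
      private ScheduledExecutorService timeoutExecutor;

      void armTimeout(long queryTimeoutSeconds) {
        if (queryTimeoutSeconds > 0L) {
          timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
          // Returning a value turns the lambda into a Callable, matching the
          // schedule(Callable, long, TimeUnit) overload used in the patch.
          timeoutExecutor.schedule(() -> {
            cancelQuery();
            return null;
          }, queryTimeoutSeconds, TimeUnit.SECONDS);
        }
      }

      private void cancelQuery() {
        // Hypothetical stand-in for SQLOperation.cancel(OperationState.TIMEDOUT).
      }
    }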
@@ -211,7 +206,7 @@ public class SQLOperation extends ExecuteStatementOperation {
throw toSQLException("Error while compiling statement", e);
} catch (Throwable e) {
setState(OperationState.ERROR);
- throw new HiveSQLException("Error running query: " + e.toString(), e);
+ throw new HiveSQLException("Error running query", e);
}
}
@@ -247,7 +242,7 @@ public class SQLOperation extends ExecuteStatementOperation {
} else if (e instanceof HiveSQLException) {
throw (HiveSQLException) e;
} else {
- throw new HiveSQLException("Error running query: " + e.toString(), e);
+ throw new HiveSQLException("Error running query", e);
}
}
setState(OperationState.FINISHED);
@@ -257,14 +252,14 @@ public class SQLOperation extends ExecuteStatementOperation {
public void runInternal() throws HiveSQLException {
setState(OperationState.PENDING);
- boolean runAsync = shouldRunAsync();
- final boolean asyncPrepare = runAsync
+ final boolean doRunAsync = shouldRunAsync();
+ final boolean asyncPrepare = doRunAsync
&& HiveConf.getBoolVar(queryState.getConf(),
HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);
if (!asyncPrepare) {
prepare(queryState);
}
- if (!runAsync) {
+ if (!doRunAsync) {
runQuery();
} else {
// We'll pass ThreadLocals in the background thread from the foreground (handler) thread.
@@ -329,7 +324,7 @@ public class SQLOperation extends ExecuteStatementOperation {
} catch (HiveSQLException e) {
// TODO: why do we invent our own error path op top of the one from Future.get?
setOperationException(e);
- LOG.error("Error running hive query: ", e);
+ LOG.error("Error running hive query", e);
} finally {
LogUtils.unregisterLoggingContext();
@@ -440,10 +435,10 @@ public class SQLOperation extends ExecuteStatementOperation {
// Since compilation is always a blocking RPC call, and schema is ready after compilation,
// we can return when are in the RUNNING state.
assertState(Arrays.asList(OperationState.RUNNING, OperationState.FINISHED));
- if (resultSchema == null) {
- resultSchema = new TableSchema(driver.getSchema());
+ if (!resultSchema.isPresent()) {
+ resultSchema = Optional.of(new TableSchema(driver.getSchema()));
}
- return resultSchema;
+ return resultSchema.get();
}
@Override
@@ -493,26 +488,11 @@ public class SQLOperation extends ExecuteStatementOperation {
if (driver != null) {
List<QueryDisplay.TaskDisplay> statuses = driver.getQueryDisplay().getTaskDisplays();
if (statuses != null) {
- ByteArrayOutputStream out = null;
- try {
- ObjectMapper mapper = new ObjectMapper();
- out = new ByteArrayOutputStream();
- mapper.writeValue(out, statuses);
- return out.toString("UTF-8");
- } catch (JsonGenerationException e) {
+ try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ new ObjectMapper().writeValue(out, statuses);
+ return out.toString(StandardCharsets.UTF_8.name());
+ } catch (Exception e) {
throw new HiveSQLException(e);
- } catch (JsonMappingException e) {
- throw new HiveSQLException(e);
- } catch (IOException e) {
- throw new HiveSQLException(e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- throw new HiveSQLException(e);
- }
- }
}
}
}
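The JSON hunk above collapses three separate Jackson/IO catch blocks into a single catch (Exception) and replaces the manual finally/close bookkeeping with try-with-resources. A compact sketch of the resulting shape (toJson is an illustrative name, not from the patch):

    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.codehaus.jackson.map.ObjectMapper;

    class TaskStatusJsonSketch {
      static String toJson(List<?> statuses) throws Exception {
        // try-with-resources closes the stream on every exit path,
        // including when writeValue throws.
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
          new ObjectMapper().writeValue(out, statuses);
          return out.toString(StandardCharsets.UTF_8.name());
        }
      }
    }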
@@ -520,18 +500,13 @@ public class SQLOperation extends ExecuteStatementOperation {
return null;
}
- private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception {
- if (driver.isFetchingTable()) {
- return prepareFromRow(rows, rowSet);
- }
- return decodeFromString(rows, rowSet);
+ private RowSet decode(final List<Object> rows, final RowSet rowSet) throws Exception {
+ return (driver.isFetchingTable()) ? prepareFromRow(rows, rowSet) : decodeFromString(rows, rowSet);
}
// already encoded to thrift-able object in ThriftFormatter
- private RowSet prepareFromRow(List<Object> rows, RowSet rowSet) throws Exception {
- for (Object row : rows) {
- rowSet.addRow((Object[]) row);
- }
+ private RowSet prepareFromRow(final List<Object> rows, final RowSet rowSet) throws Exception {
+ rows.forEach(row -> rowSet.addRow((Object[]) row));
return rowSet;
}
@@ -560,43 +535,30 @@ public class SQLOperation extends ExecuteStatementOperation {
}
private AbstractSerDe getSerDe() throws SQLException {
- if (serde != null) {
- return serde;
- }
- try {
- Schema mResultSchema = driver.getSchema();
+ if (serde == null) {
+ try {
+ this.serde = new LazySimpleSerDe();
- List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
- StringBuilder namesSb = new StringBuilder();
- StringBuilder typesSb = new StringBuilder();
+ final Schema mResultSchema = driver.getSchema();
+ final List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
+ final Properties props = new Properties();
- if (fieldSchemas != null && !fieldSchemas.isEmpty()) {
- for (int pos = 0; pos < fieldSchemas.size(); pos++) {
- if (pos != 0) {
- namesSb.append(",");
- typesSb.append(",");
- }
- namesSb.append(fieldSchemas.get(pos).getName());
- typesSb.append(fieldSchemas.get(pos).getType());
+ if (!fieldSchemas.isEmpty()) {
+
+ final String names = fieldSchemas.stream().map(i -> i.getName()).collect(Collectors.joining(","));
+ final String types = fieldSchemas.stream().map(i -> i.getType()).collect(Collectors.joining(","));
+
+ LOG.debug("Column names: " + names);
+ LOG.debug("Column types: " + types);
+
+ props.setProperty(serdeConstants.LIST_COLUMNS, names);
+ props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
}
- }
- String names = namesSb.toString();
- String types = typesSb.toString();
-
- serde = new LazySimpleSerDe();
- Properties props = new Properties();
- if (names.length() > 0) {
- LOG.debug("Column names: " + names);
- props.setProperty(serdeConstants.LIST_COLUMNS, names);
- }
- if (types.length() > 0) {
- LOG.debug("Column types: " + types);
- props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
- }
- SerDeUtils.initializeSerDe(serde, queryState.getConf(), props, null);
- } catch (Exception ex) {
- throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
+ SerDeUtils.initializeSerDe(serde, queryState.getConf(), props, null);
+ } catch (Exception ex) {
+ throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
+ }
}
return serde;
}
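In the getSerDe() hunk above, two StringBuilder loops with manual comma handling become two Collectors.joining streams over the field schemas. A self-contained sketch (Column stands in for Hive's FieldSchema, which exposes getName()/getType() the same way):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ColumnJoinSketch {
      static class Column {
        private final String name;
        private final String type;
        Column(String name, String type) { this.name = name; this.type = type; }
        String getName() { return name; }
        String getType() { return type; }
      }

      public static void main(String[] args) {
        List<Column> cols = Arrays.asList(new Column("id", "int"), new Column("name", "string"));
        // joining(",") inserts the delimiter only between elements, so the
        // old "skip the comma on the first element" bookkeeping disappears.
        String names = cols.stream().map(Column::getName).collect(Collectors.joining(","));
        String types = cols.stream().map(Column::getType).collect(Collectors.joining(","));
        System.out.println(names); // id,name
        System.out.println(types); // int,string
      }
    }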
@@ -609,82 +571,103 @@ public class SQLOperation extends ExecuteStatementOperation {
}
@Override
- protected void onNewState(OperationState state, OperationState prevState) {
-
+ protected void onNewState(final OperationState state, final OperationState prevState) {
super.onNewState(state, prevState);
+
currentSQLStateScope = updateOperationStateMetrics(currentSQLStateScope,
MetricsConstant.SQL_OPERATION_PREFIX,
MetricsConstant.COMPLETED_SQL_OPERATION_PREFIX, state);
- Metrics metrics = MetricsFactory.getInstance();
- if (metrics != null) {
+ final Optional<Metrics> metrics = Optional.ofNullable(MetricsFactory.getInstance());
+ if (metrics.isPresent()) {
// New state is changed to running from something else (user is active)
if (state == OperationState.RUNNING && prevState != state) {
- incrementUserQueries(metrics);
+ incrementUserQueries(metrics.get());
}
// New state is not running (user not active) any more
if (prevState == OperationState.RUNNING && prevState != state) {
- decrementUserQueries(metrics);
+ decrementUserQueries(metrics.get());
}
}
- if (state == OperationState.FINISHED || state == OperationState.CANCELED || state == OperationState.ERROR) {
- //update runtime
+ switch (state) {
+ case CANCELED:
queryInfo.setRuntime(getOperationComplete() - getOperationStart());
- if (metrics != null && submittedQryScp != null) {
- metrics.endScope(submittedQryScp);
+ if (metrics.isPresent() && submittedQryScp.isPresent()) {
+ metrics.get().endScope(submittedQryScp.get());
}
- }
-
- if (state == OperationState.CLOSED) {
- queryInfo.setEndTime();
- } else {
- //CLOSED state not interesting, state before (FINISHED, ERROR) is.
queryInfo.updateState(state.toString());
- }
-
- if (state == OperationState.ERROR) {
+ break;
+ case CLOSED:
+ queryInfo.setEndTime();
+ break;
+ case ERROR:
+ queryInfo.setRuntime(getOperationComplete() - getOperationStart());
+ if (metrics.isPresent() && submittedQryScp.isPresent()) {
+ metrics.get().endScope(submittedQryScp.get());
+ }
markQueryMetric(MetricsFactory.getInstance(), MetricsConstant.HS2_FAILED_QUERIES);
- }
- if (state == OperationState.FINISHED) {
+ queryInfo.updateState(state.toString());
+ break;
+ case FINISHED:
+ queryInfo.setRuntime(getOperationComplete() - getOperationStart());
+ if (metrics.isPresent() && submittedQryScp.isPresent()) {
+ metrics.get().endScope(submittedQryScp.get());
+ }
markQueryMetric(MetricsFactory.getInstance(), MetricsConstant.HS2_SUCCEEDED_QUERIES);
+ queryInfo.updateState(state.toString());
+ break;
+ case INITIALIZED:
+ /* fall through */
+ case PENDING:
+ /* fall through */
+ case RUNNING:
+ /* fall through */
+ case TIMEDOUT:
+ /* fall through */
+ case UNKNOWN:
+ /* fall through */
+ default:
+ queryInfo.updateState(state.toString());
+ break;
}
}
- private void incrementUserQueries(Metrics metrics) {
- String username = parentSession.getUserName();
- if (username != null) {
- synchronized (userQueries) {
- AtomicInteger count = userQueries.get(username);
- if (count == null) {
- count = new AtomicInteger(0);
- AtomicInteger prev = userQueries.put(username, count);
- if (prev == null) {
- metrics.incrementCounter(ACTIVE_SQL_USER);
- } else {
- count = prev;
- }
+ private void incrementUserQueries(final Metrics metrics) {
+ final String username = parentSession.getUserName();
+ if (StringUtils.isNotBlank(username)) {
+ USER_QUERIES.compute(username, (key, value) -> {
+ if (value == null) {
+ metrics.incrementCounter(ACTIVE_SQL_USER);
+ return new AtomicInteger(1);
+ } else {
+ value.incrementAndGet();
+ return value;
}
- count.incrementAndGet();
- }
+ });
}
}
- private void decrementUserQueries(Metrics metrics) {
- String username = parentSession.getUserName();
- if (username != null) {
- synchronized (userQueries) {
- AtomicInteger count = userQueries.get(username);
- if (count != null && count.decrementAndGet() <= 0) {
- metrics.decrementCounter(ACTIVE_SQL_USER);
- userQueries.remove(username);
+ private void decrementUserQueries(final Metrics metrics) {
+ final String username = parentSession.getUserName();
+ if (StringUtils.isNotBlank(username)) {
+ USER_QUERIES.compute(username, (key, value) -> {
+ if (value == null) {
+ return null;
+ } else {
+ final int newValue = value.decrementAndGet();
+ if (newValue == 0) {
+ metrics.decrementCounter(ACTIVE_SQL_USER);
+ return null;
+ }
+ return value;
}
- }
+ });
}
}
private void markQueryMetric(Metrics metric, String name) {
- if(metric != null) {
+ if (metric != null) {
metric.markMeter(name);
}
}
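The final hunks replace the synchronized HashMap of per-user query counts with ConcurrentHashMap.compute(), whose per-key atomicity makes the null check, counter update, and entry removal a single indivisible step. A self-contained sketch of the pattern (the onFirstQuery/onLastQuery callbacks stand in for incrementing and decrementing the ACTIVE_SQL_USER metric):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class UserQueryCounters {
      private final Map<String, AtomicInteger> userQueries = new ConcurrentHashMap<>();
      private final Runnable onFirstQuery; // user goes from 0 to 1 running queries
      private final Runnable onLastQuery;  // user goes from 1 to 0 running queries

      public UserQueryCounters(Runnable onFirstQuery, Runnable onLastQuery) {
        this.onFirstQuery = onFirstQuery;
        this.onLastQuery = onLastQuery;
      }

      public void increment(String user) {
        // compute() runs atomically per key, so no external lock is needed.
        userQueries.compute(user, (key, count) -> {
          if (count == null) {
            onFirstQuery.run();
            return new AtomicInteger(1);
          }
          count.incrementAndGet();
          return count;
        });
      }

      public void decrement(String user) {
        userQueries.compute(user, (key, count) -> {
          if (count == null) {
            return null; // unmatched decrement; leave the map unchanged
          }
          if (count.decrementAndGet() == 0) {
            onLastQuery.run();
            return null; // returning null removes the entry for this user
          }
          return count;
        });
      }
    }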