Posted to commits@hive.apache.org by dm...@apache.org on 2020/05/07 15:35:46 UTC

[hive] branch master updated: HIVE-23124: Review of SQLOperation Class (David Mollitor, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

dmollitor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bbfb0f8  HIVE-23124: Review of SQLOperation Class (David Mollitor, reviewed by Peter Vary)
bbfb0f8 is described below

commit bbfb0f804202db8c8423a61ffb7c9fe8d888309b
Author: David Mollitor <dm...@apache.org>
AuthorDate: Thu May 7 11:35:21 2020 -0400

    HIVE-23124: Review of SQLOperation Class (David Mollitor, reviewed by Peter Vary)
---
 .../hive/service/cli/operation/SQLOperation.java   | 291 ++++++++++-----------
 1 file changed, 137 insertions(+), 154 deletions(-)

diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 25b6ab3..75b84d3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -19,7 +19,6 @@
 package org.apache.hive.service.cli.operation;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
@@ -27,18 +26,21 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.io.SessionStream;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -78,22 +80,19 @@ import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * SQLOperation.
- *
  */
 public class SQLOperation extends ExecuteStatementOperation {
   private IDriver driver = null;
-  private TableSchema resultSchema;
+  private Optional<TableSchema> resultSchema;
   private AbstractSerDe serde = null;
   private boolean fetchStarted = false;
   private volatile MetricsScope currentSQLStateScope;
-  private QueryInfo queryInfo;
-  private long queryTimeout;
+  private final QueryInfo queryInfo;
+  private final long queryTimeout;
   private ScheduledExecutorService timeoutExecutor;
   private final boolean runAsync;
   private final long operationLogCleanupDelayMs;
@@ -102,21 +101,25 @@ public class SQLOperation extends ExecuteStatementOperation {
   /**
    * A map to track query count running by each user
    */
-  private static Map<String, AtomicInteger> userQueries = new HashMap<String, AtomicInteger>();
+  private static final Map<String, AtomicInteger> USER_QUERIES = new ConcurrentHashMap<>();
   private static final String ACTIVE_SQL_USER = MetricsConstant.SQL_OPERATION_PREFIX + "active_user";
-  private MetricsScope submittedQryScp;
+  private final Optional<MetricsScope> submittedQryScp;
 
   public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay,
       boolean runInBackground, long queryTimeout) {
     // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
     super(parentSession, statement, confOverlay, runInBackground);
     this.runAsync = runInBackground;
-    this.queryTimeout = queryTimeout;
-    long timeout = HiveConf.getTimeVar(queryState.getConf(),
-        HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    this.resultSchema = Optional.empty();
+
+    final long timeout =
+        HiveConf.getTimeVar(queryState.getConf(), HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
     if (timeout > 0 && (queryTimeout <= 0 || timeout < queryTimeout)) {
       this.queryTimeout = timeout;
+    } else {
+      this.queryTimeout = queryTimeout;
     }
+
     this.operationLogCleanupDelayMs = HiveConf.getTimeVar(queryState.getConf(),
       HiveConf.ConfVars.HIVE_SERVER2_OPERATION_LOG_CLEANUP_DELAY, TimeUnit.MILLISECONDS);
 
@@ -125,10 +128,9 @@ public class SQLOperation extends ExecuteStatementOperation {
     queryInfo = new QueryInfo(getState().toString(), getParentSession().getUserName(),
             getExecutionEngine(), getHandle().getHandleIdentifier().toString());
 
-    Metrics metrics = MetricsFactory.getInstance();
-    if (metrics != null) {
-      submittedQryScp = metrics.createScope(MetricsConstant.HS2_SUBMITTED_QURIES);
-    }
+    final Metrics metrics = MetricsFactory.getInstance();
+    this.submittedQryScp =
+        (metrics == null) ? Optional.empty() : Optional.of(metrics.createScope(MetricsConstant.HS2_SUBMITTED_QURIES));
   }
 
   @Override
@@ -158,32 +160,25 @@ public class SQLOperation extends ExecuteStatementOperation {
    *
    * @throws HiveSQLException
    */
-  public void prepare(QueryState queryState) throws HiveSQLException {
+  private void prepare(QueryState queryState) throws HiveSQLException {
     setState(OperationState.RUNNING);
     try {
       driver = DriverFactory.newDriver(queryState, queryInfo);
 
-      // Start the timer thread for cancelling the query when query timeout is reached
+      // Start the timer thread for canceling the query when query timeout is reached
       // queryTimeout == 0 means no timeout
-      if (queryTimeout > 0) {
-        timeoutExecutor = new ScheduledThreadPoolExecutor(1);
-        Runnable timeoutTask = new Runnable() {
-          @Override
-          public void run() {
-            try {
-              String queryId = queryState.getQueryId();
-              LOG.info("Query timed out after: " + queryTimeout
-                  + " seconds. Cancelling the execution now: " + queryId);
-              SQLOperation.this.cancel(OperationState.TIMEDOUT);
-            } catch (HiveSQLException e) {
-              LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);
-            } finally {
-              // Stop
-              timeoutExecutor.shutdown();
-            }
+      if (queryTimeout > 0L) {
+        timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+        timeoutExecutor.schedule(() -> {
+          try {
+            final String queryId = queryState.getQueryId();
+            LOG.info("Query timed out after: " + queryTimeout + " seconds. Cancelling the execution now: " + queryId);
+            SQLOperation.this.cancel(OperationState.TIMEDOUT);
+          } catch (HiveSQLException e) {
+            LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);
           }
-        };
-        timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS);
+          return null;
+        }, queryTimeout, TimeUnit.SECONDS);
       }
 
       queryInfo.setQueryDisplay(driver.getQueryDisplay());
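
The hunk above swaps the hand-rolled ScheduledThreadPoolExecutor and anonymous Runnable for a lambda scheduled on Executors.newSingleThreadScheduledExecutor(). Because the lambda ends with "return null;", it resolves to the Callable overload of schedule(), which would also let the body throw checked exceptions directly. A minimal standalone sketch of the same one-shot timeout pattern, with a hypothetical class name and timeout value not taken from the patch:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class QueryTimeoutSketch {
      public static void main(String[] args) throws Exception {
        final long queryTimeout = 2L; // hypothetical timeout in seconds
        final ScheduledExecutorService timeoutExecutor =
            Executors.newSingleThreadScheduledExecutor();
        // "return null;" makes this a Callable<Object>, matching the patch.
        final ScheduledFuture<?> task = timeoutExecutor.schedule(() -> {
          System.out.println("Query timed out after: " + queryTimeout
              + " seconds. Cancelling the execution now.");
          return null; // one-shot task; nothing meaningful to return
        }, queryTimeout, TimeUnit.SECONDS);
        task.get(); // block until the timeout fires, just for the demo
        timeoutExecutor.shutdown();
      }
    }
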
@@ -211,7 +206,7 @@ public class SQLOperation extends ExecuteStatementOperation {
       throw toSQLException("Error while compiling statement", e);
     } catch (Throwable e) {
       setState(OperationState.ERROR);
-      throw new HiveSQLException("Error running query: " + e.toString(), e);
+      throw new HiveSQLException("Error running query", e);
     }
   }
 
@@ -247,7 +242,7 @@ public class SQLOperation extends ExecuteStatementOperation {
       } else if (e instanceof HiveSQLException) {
         throw (HiveSQLException) e;
       } else {
-        throw new HiveSQLException("Error running query: " + e.toString(), e);
+        throw new HiveSQLException("Error running query", e);
       }
     }
     setState(OperationState.FINISHED);
@@ -257,14 +252,14 @@ public class SQLOperation extends ExecuteStatementOperation {
   public void runInternal() throws HiveSQLException {
     setState(OperationState.PENDING);
 
-    boolean runAsync = shouldRunAsync();
-    final boolean asyncPrepare = runAsync
+    final boolean doRunAsync = shouldRunAsync();
+    final boolean asyncPrepare = doRunAsync
       && HiveConf.getBoolVar(queryState.getConf(),
         HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);
     if (!asyncPrepare) {
       prepare(queryState);
     }
-    if (!runAsync) {
+    if (!doRunAsync) {
       runQuery();
     } else {
       // We'll pass ThreadLocals in the background thread from the foreground (handler) thread.
@@ -329,7 +324,7 @@ public class SQLOperation extends ExecuteStatementOperation {
           } catch (HiveSQLException e) {
             // TODO: why do we invent our own error path op top of the one from Future.get?
             setOperationException(e);
-            LOG.error("Error running hive query: ", e);
+            LOG.error("Error running hive query", e);
           } finally {
             LogUtils.unregisterLoggingContext();
 
@@ -440,10 +435,10 @@ public class SQLOperation extends ExecuteStatementOperation {
     // Since compilation is always a blocking RPC call, and schema is ready after compilation,
     // we can return when are in the RUNNING state.
     assertState(Arrays.asList(OperationState.RUNNING, OperationState.FINISHED));
-    if (resultSchema == null) {
-      resultSchema = new TableSchema(driver.getSchema());
+    if (!resultSchema.isPresent()) {
+      resultSchema = Optional.of(new TableSchema(driver.getSchema()));
     }
-    return resultSchema;
+    return resultSchema.get();
   }
 
   @Override
@@ -493,26 +488,11 @@ public class SQLOperation extends ExecuteStatementOperation {
     if (driver != null) {
       List<QueryDisplay.TaskDisplay> statuses = driver.getQueryDisplay().getTaskDisplays();
       if (statuses != null) {
-        ByteArrayOutputStream out = null;
-        try {
-          ObjectMapper mapper = new ObjectMapper();
-          out = new ByteArrayOutputStream();
-          mapper.writeValue(out, statuses);
-          return out.toString("UTF-8");
-        } catch (JsonGenerationException e) {
+        try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+          new ObjectMapper().writeValue(out, statuses);
+          return out.toString(StandardCharsets.UTF_8.name());
+        } catch (Exception e) {
           throw new HiveSQLException(e);
-        } catch (JsonMappingException e) {
-          throw new HiveSQLException(e);
-        } catch (IOException e) {
-          throw new HiveSQLException(e);
-        } finally {
-          if (out != null) {
-            try {
-              out.close();
-            } catch (IOException e) {
-              throw new HiveSQLException(e);
-            }
-          }
         }
       }
     }
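
The rewrite above collapses three separate catch blocks and a hand-written finally into a single try-with-resources statement. Closing a ByteArrayOutputStream is documented as a no-op, so nothing is lost by making the resource management implicit, and JsonGenerationException, JsonMappingException, and IOException are all covered by the one remaining catch. A minimal standalone sketch of the same pattern, assuming Jackson 1.x (org.codehaus.jackson) on the classpath as in this file, with a stand-in payload:

    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    import org.codehaus.jackson.map.ObjectMapper;

    public class TaskStatusJsonSketch {
      public static void main(String[] args) throws Exception {
        final List<String> statuses = Arrays.asList("RUNNING", "FINISHED"); // stand-in payload
        // close() on a ByteArrayOutputStream has no effect, so the
        // try-with-resources block is only for uniform resource handling.
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
          new ObjectMapper().writeValue(out, statuses);
          System.out.println(out.toString(StandardCharsets.UTF_8.name()));
        }
      }
    }
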
@@ -520,18 +500,13 @@ public class SQLOperation extends ExecuteStatementOperation {
     return null;
   }
 
-  private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception {
-    if (driver.isFetchingTable()) {
-      return prepareFromRow(rows, rowSet);
-    }
-    return decodeFromString(rows, rowSet);
+  private RowSet decode(final List<Object> rows, final RowSet rowSet) throws Exception {
+    return (driver.isFetchingTable()) ? prepareFromRow(rows, rowSet) : decodeFromString(rows, rowSet);
   }
 
   // already encoded to thrift-able object in ThriftFormatter
-  private RowSet prepareFromRow(List<Object> rows, RowSet rowSet) throws Exception {
-    for (Object row : rows) {
-      rowSet.addRow((Object[]) row);
-    }
+  private RowSet prepareFromRow(final List<Object> rows, final RowSet rowSet) throws Exception {
+    rows.forEach(row -> rowSet.addRow((Object[]) row));
     return rowSet;
   }
 
@@ -560,43 +535,30 @@ public class SQLOperation extends ExecuteStatementOperation {
   }
 
   private AbstractSerDe getSerDe() throws SQLException {
-    if (serde != null) {
-      return serde;
-    }
-    try {
-      Schema mResultSchema = driver.getSchema();
+    if (serde == null) {
+      try {
+        this.serde = new LazySimpleSerDe();
 
-      List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
-      StringBuilder namesSb = new StringBuilder();
-      StringBuilder typesSb = new StringBuilder();
+        final Schema mResultSchema = driver.getSchema();
+        final List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
+        final Properties props = new Properties();
 
-      if (fieldSchemas != null && !fieldSchemas.isEmpty()) {
-        for (int pos = 0; pos < fieldSchemas.size(); pos++) {
-          if (pos != 0) {
-            namesSb.append(",");
-            typesSb.append(",");
-          }
-          namesSb.append(fieldSchemas.get(pos).getName());
-          typesSb.append(fieldSchemas.get(pos).getType());
+        if (!fieldSchemas.isEmpty()) {
+
+          final String names = fieldSchemas.stream().map(i -> i.getName()).collect(Collectors.joining(","));
+          final String types = fieldSchemas.stream().map(i -> i.getType()).collect(Collectors.joining(","));
+
+          LOG.debug("Column names: " + names);
+          LOG.debug("Column types: " + types);
+
+          props.setProperty(serdeConstants.LIST_COLUMNS, names);
+          props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
         }
-      }
-      String names = namesSb.toString();
-      String types = typesSb.toString();
-
-      serde = new LazySimpleSerDe();
-      Properties props = new Properties();
-      if (names.length() > 0) {
-        LOG.debug("Column names: " + names);
-        props.setProperty(serdeConstants.LIST_COLUMNS, names);
-      }
-      if (types.length() > 0) {
-        LOG.debug("Column types: " + types);
-        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
-      }
-      SerDeUtils.initializeSerDe(serde, queryState.getConf(), props, null);
 
-    } catch (Exception ex) {
-      throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
+        SerDeUtils.initializeSerDe(serde, queryState.getConf(), props, null);
+      } catch (Exception ex) {
+        throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
+      }
     }
     return serde;
   }
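
In the getSerDe() rewrite above, two StringBuilder loops become Stream pipelines with Collectors.joining(","). A minimal standalone sketch of that joining step, with hypothetical (name, type) pairs standing in for the FieldSchema objects:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ColumnJoinSketch {
      public static void main(String[] args) {
        final List<String[]> fieldSchemas = Arrays.asList(
            new String[] {"id", "bigint"},
            new String[] {"name", "string"});
        final String names = fieldSchemas.stream().map(f -> f[0]).collect(Collectors.joining(","));
        final String types = fieldSchemas.stream().map(f -> f[1]).collect(Collectors.joining(","));
        System.out.println(names); // id,name
        System.out.println(types); // bigint,string
      }
    }
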
@@ -609,82 +571,103 @@ public class SQLOperation extends ExecuteStatementOperation {
   }
 
   @Override
-  protected void onNewState(OperationState state, OperationState prevState) {
-
+  protected void onNewState(final OperationState state, final OperationState prevState) {
     super.onNewState(state, prevState);
+
     currentSQLStateScope = updateOperationStateMetrics(currentSQLStateScope,
         MetricsConstant.SQL_OPERATION_PREFIX,
         MetricsConstant.COMPLETED_SQL_OPERATION_PREFIX, state);
 
-    Metrics metrics = MetricsFactory.getInstance();
-    if (metrics != null) {
+    final Optional<Metrics> metrics = Optional.ofNullable(MetricsFactory.getInstance());
+    if (metrics.isPresent()) {
       // New state is changed to running from something else (user is active)
       if (state == OperationState.RUNNING && prevState != state) {
-        incrementUserQueries(metrics);
+        incrementUserQueries(metrics.get());
       }
       // New state is not running (user not active) any more
       if (prevState == OperationState.RUNNING && prevState != state) {
-        decrementUserQueries(metrics);
+        decrementUserQueries(metrics.get());
       }
     }
 
-    if (state == OperationState.FINISHED || state == OperationState.CANCELED || state == OperationState.ERROR) {
-      //update runtime
+    switch (state) {
+    case CANCELED:
       queryInfo.setRuntime(getOperationComplete() - getOperationStart());
-      if (metrics != null && submittedQryScp != null) {
-        metrics.endScope(submittedQryScp);
+      if (metrics.isPresent() && submittedQryScp.isPresent()) {
+        metrics.get().endScope(submittedQryScp.get());
       }
-    }
-
-    if (state == OperationState.CLOSED) {
-      queryInfo.setEndTime();
-    } else {
-      //CLOSED state not interesting, state before (FINISHED, ERROR) is.
       queryInfo.updateState(state.toString());
-    }
-
-    if (state == OperationState.ERROR) {
+      break;
+    case CLOSED:
+      queryInfo.setEndTime();
+      break;
+    case ERROR:
+      queryInfo.setRuntime(getOperationComplete() - getOperationStart());
+      if (metrics.isPresent() && submittedQryScp.isPresent()) {
+        metrics.get().endScope(submittedQryScp.get());
+      }
       markQueryMetric(MetricsFactory.getInstance(), MetricsConstant.HS2_FAILED_QUERIES);
-    }
-    if (state == OperationState.FINISHED) {
+      queryInfo.updateState(state.toString());
+      break;
+    case FINISHED:
+      queryInfo.setRuntime(getOperationComplete() - getOperationStart());
+      if (metrics.isPresent() && submittedQryScp.isPresent()) {
+        metrics.get().endScope(submittedQryScp.get());
+      }
       markQueryMetric(MetricsFactory.getInstance(), MetricsConstant.HS2_SUCCEEDED_QUERIES);
+      queryInfo.updateState(state.toString());
+      break;
+    case INITIALIZED:
+      /* fall through */
+    case PENDING:
+      /* fall through */
+    case RUNNING:
+      /* fall through */
+    case TIMEDOUT:
+      /* fall through */
+    case UNKNOWN:
+      /* fall through */
+    default:
+      queryInfo.updateState(state.toString());
+      break;
     }
   }
 
-  private void incrementUserQueries(Metrics metrics) {
-    String username = parentSession.getUserName();
-    if (username != null) {
-      synchronized (userQueries) {
-        AtomicInteger count = userQueries.get(username);
-        if (count == null) {
-          count = new AtomicInteger(0);
-          AtomicInteger prev = userQueries.put(username, count);
-          if (prev == null) {
-            metrics.incrementCounter(ACTIVE_SQL_USER);
-          } else {
-            count = prev;
-          }
+  private void incrementUserQueries(final Metrics metrics) {
+    final String username = parentSession.getUserName();
+    if (StringUtils.isNotBlank(username)) {
+      USER_QUERIES.compute(username, (key, value) -> {
+        if (value == null) {
+          metrics.incrementCounter(ACTIVE_SQL_USER);
+          return new AtomicInteger(1);
+        } else {
+          value.incrementAndGet();
+          return value;
         }
-        count.incrementAndGet();
-      }
+      });
     }
   }
 
-  private void decrementUserQueries(Metrics metrics) {
-    String username = parentSession.getUserName();
-    if (username != null) {
-      synchronized (userQueries) {
-        AtomicInteger count = userQueries.get(username);
-        if (count != null && count.decrementAndGet() <= 0) {
-          metrics.decrementCounter(ACTIVE_SQL_USER);
-          userQueries.remove(username);
+  private void decrementUserQueries(final Metrics metrics) {
+    final String username = parentSession.getUserName();
+    if (StringUtils.isNotBlank(username)) {
+      USER_QUERIES.compute(username, (key, value) -> {
+        if (value == null) {
+          return null;
+        } else {
+          final int newValue = value.decrementAndGet();
+          if (newValue == 0) {
+            metrics.decrementCounter(ACTIVE_SQL_USER);
+            return null;
+          }
+          return value;
         }
-      }
+      });
     }
   }
 
   private void markQueryMetric(Metrics metric, String name) {
-    if(metric != null) {
+    if (metric != null) {
       metric.markMeter(name);
     }
   }
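
The last hunk also replaces the synchronized HashMap of per-user query counts with a ConcurrentHashMap and compute(), so the counter and the ACTIVE_SQL_USER metric are updated atomically per key, and returning null from the remapping function removes the entry once a user's count reaches zero. A minimal standalone sketch of that pattern, with a hypothetical AtomicInteger standing in for the metrics counter:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class UserQueryCounterSketch {
      private static final Map<String, AtomicInteger> USER_QUERIES = new ConcurrentHashMap<>();
      private static final AtomicInteger ACTIVE_USERS = new AtomicInteger(); // stand-in metric

      static void increment(String user) {
        // compute() runs atomically per key: the metric update and the counter
        // update cannot interleave with another thread working on the same user.
        USER_QUERIES.compute(user, (key, value) -> {
          if (value == null) {
            ACTIVE_USERS.incrementAndGet();
            return new AtomicInteger(1);
          }
          value.incrementAndGet();
          return value;
        });
      }

      static void decrement(String user) {
        USER_QUERIES.compute(user, (key, value) -> {
          if (value == null) {
            return null; // no queries recorded for this user
          }
          if (value.decrementAndGet() == 0) {
            ACTIVE_USERS.decrementAndGet();
            return null; // returning null removes the map entry
          }
          return value;
        });
      }

      public static void main(String[] args) {
        increment("alice");
        increment("alice");
        decrement("alice");
        decrement("alice");
        System.out.println(ACTIVE_USERS.get()); // prints 0
      }
    }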