Posted to commits@hive.apache.org by br...@apache.org on 2014/12/23 16:41:29 UTC

svn commit: r1647595 - in /hive/trunk: itests/hive-unit/src/test/java/org/apache/hive/beeline/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/session/ service/src/java/org/apache...

Author: brock
Date: Tue Dec 23 15:41:29 2014
New Revision: 1647595

URL: http://svn.apache.org/r1647595
Log:
HIVE-9120 - Hive Query log does not work when hive.exec.parallel is true (Dong Chen via Brock)
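
Context for the fix: with hive.exec.parallel=true each task runs on its own TaskRunner thread, and the
per-operation log is kept in a ThreadLocal, so those worker threads never saw the OperationLog that
LogDivertAppender writes to and the query log did not show up. The commit moves OperationLog into
o.a.h.h.ql.session and propagates it explicitly from the Driver thread to every TaskRunner thread. Below
is a minimal, self-contained sketch of that propagation pattern (plain Java; the sketch class and its
CURRENT_LOG field are illustrative stand-ins, the real classes are in the diff that follows):

    import java.util.concurrent.CountDownLatch;

    public class ThreadLocalPropagationSketch {
      // Stand-in for the ThreadLocal held by OperationLog.
      private static final ThreadLocal<String> CURRENT_LOG = new ThreadLocal<String>();

      public static void main(String[] args) throws InterruptedException {
        // Parent (Driver) thread: the operation log is visible only on this thread.
        CURRENT_LOG.set("operation-log-for-query-42");

        // Capture it before starting the worker, the way Driver now calls
        // OperationLog.getCurrentOperationLog() and hands it to TaskRunner.setOperationLog().
        final String parentLog = CURRENT_LOG.get();
        final CountDownLatch done = new CountDownLatch(1);

        Thread taskRunner = new Thread(new Runnable() {
          @Override
          public void run() {
            // Worker (TaskRunner) thread: install the parent's value first, mirroring
            // OperationLog.setCurrentOperationLog(operationLog) in TaskRunner.run().
            CURRENT_LOG.set(parentLog);
            System.out.println("worker thread sees: " + CURRENT_LOG.get());
            done.countDown();
          }
        });
        taskRunner.start();
        done.await();
      }
    }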

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
Removed:
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java
Modified:
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java Tue Dec 23 15:41:29 2014
@@ -534,6 +534,20 @@ public class TestBeeLineWithArgs {
   }
 
   /**
+   * Test that Beeline can show the query progress for a time-consuming query when
+   * hive.exec.parallel is true.
+   * @throws Throwable
+   */
+  @Test
+  public void testQueryProgressParallel() throws Throwable {
+    final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" +
+        "set hive.exec.parallel = true;\n" +
+        "select count(*) from " + tableName + ";\n";
+    final String EXPECTED_PATTERN = "number of splits";
+    testScriptFile(SCRIPT_TEXT, EXPECTED_PATTERN, true, getBaseArgs(miniHS2.getBaseJdbcURL()));
+  }
+
+  /**
    * Test Beeline will hide the query progress when silent option is set.
    * @throws Throwable
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Dec 23 15:41:29 2014
@@ -111,6 +111,7 @@ import org.apache.hadoop.hive.ql.securit
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
+import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
@@ -1623,6 +1624,7 @@ public class Driver implements CommandPr
       if (LOG.isInfoEnabled()){
         LOG.info("Starting task [" + tsk + "] in parallel");
       }
+      tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
       tskRun.start();
     } else {
       if (LOG.isInfoEnabled()){

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Tue Dec 23 15:41:29 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
@@ -32,6 +33,7 @@ public class TaskRunner extends Thread {
   protected Task<? extends Serializable> tsk;
   protected TaskResult result;
   protected SessionState ss;
+  private OperationLog operationLog;
   private static AtomicLong taskCounter = new AtomicLong(0);
   private static ThreadLocal<Long> taskRunnerID = new ThreadLocal<Long>() {
     @Override
@@ -68,6 +70,7 @@ public class TaskRunner extends Thread {
   public void run() {
     runner = Thread.currentThread();
     try {
+      OperationLog.setCurrentOperationLog(operationLog);
       SessionState.start(ss);
       runSequential();
     } finally {
@@ -95,4 +98,8 @@ public class TaskRunner extends Thread {
   public static long getTaskRunnerID () {
     return taskRunnerID.get();
   }
+
+  public void setOperationLog(OperationLog operationLog) {
+    this.operationLog = operationLog;
+  }
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java?rev=1647595&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java Tue Dec 23 15:41:29 2014
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.session;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.*;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * OperationLog wraps the actual operation log file and provides an interface
+ * for accessing, reading, writing, and removing the file.
+ */
+public class OperationLog {
+  private static final Log LOG = LogFactory.getLog(OperationLog.class.getName());
+
+  private final String operationName;
+  private final LogFile logFile;
+
+  public OperationLog(String name, File file) throws FileNotFoundException{
+    operationName = name;
+    logFile = new LogFile(file);
+  }
+
+  /**
+   * One OperationLog instance per thread, held in a ThreadLocal.
+   */
+  private static final ThreadLocal<OperationLog> THREAD_LOCAL_OPERATION_LOG = new
+      ThreadLocal<OperationLog>() {
+    @Override
+    protected synchronized OperationLog initialValue() {
+      return null;
+    }
+  };
+
+  public static void setCurrentOperationLog(OperationLog operationLog) {
+    THREAD_LOCAL_OPERATION_LOG.set(operationLog);
+  }
+
+  public static OperationLog getCurrentOperationLog() {
+    return THREAD_LOCAL_OPERATION_LOG.get();
+  }
+
+  public static void removeCurrentOperationLog() {
+    THREAD_LOCAL_OPERATION_LOG.remove();
+  }
+
+  /**
+   * Write operation execution logs into the log file.
+   * @param operationLogMessage one line of log output emitted by log4j
+   */
+  public void writeOperationLog(String operationLogMessage) {
+    logFile.write(operationLogMessage);
+  }
+
+  /**
+   * Read operation execution logs from the log file.
+   * @param isFetchFirst true if the FetchOrientation value is FETCH_FIRST
+   * @param maxRows the maximum number of lines to fetch from the log; a non-positive value reads all lines
+   * @return the fetched log lines
+   * @throws java.sql.SQLException
+   */
+  public List<String> readOperationLog(boolean isFetchFirst, long maxRows)
+      throws SQLException{
+    return logFile.read(isFetchFirst, maxRows);
+  }
+
+  /**
+   * Close this OperationLog when the operation is closed. The log file will be removed.
+   */
+  public void close() {
+    logFile.remove();
+  }
+
+  /**
+   * Wrapper for reading and writing the operation log file.
+   */
+  private class LogFile {
+    private File file;
+    private BufferedReader in;
+    private PrintStream out;
+    private volatile boolean isRemoved;
+
+    LogFile(File file) throws FileNotFoundException {
+      this.file = file;
+      in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+      out = new PrintStream(new FileOutputStream(file));
+      isRemoved = false;
+    }
+
+    synchronized void write(String msg) {
+      // write log to the file
+      out.print(msg);
+    }
+
+    synchronized List<String> read(boolean isFetchFirst, long maxRows)
+        throws SQLException{
+      // reset the BufferedReader if fetching from the beginning of the file
+      if (isFetchFirst) {
+        resetIn();
+      }
+
+      return readResults(maxRows);
+    }
+
+    void remove() {
+      try {
+        if (in != null) {
+          in.close();
+        }
+        if (out != null) {
+          out.close();
+        }
+        FileUtils.forceDelete(file);
+        isRemoved = true;
+      } catch (Exception e) {
+        LOG.error("Failed to remove corresponding log file of operation: " + operationName, e);
+      }
+    }
+
+    private void resetIn() {
+      if (in != null) {
+        IOUtils.cleanup(LOG, in);
+        in = null;
+      }
+    }
+
+    private List<String> readResults(long nLines) throws SQLException {
+      if (in == null) {
+        try {
+          in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+        } catch (FileNotFoundException e) {
+          if (isRemoved) {
+            throw new SQLException("The operation has been closed and its log file " +
+                file.getAbsolutePath() + " has been removed.", e);
+          } else {
+            throw new SQLException("Operation Log file " + file.getAbsolutePath() +
+                " is not found.", e);
+          }
+        }
+      }
+
+      List<String> logs = new ArrayList<String>();
+      String line = "";
+      // if nLines <= 0, read all lines in log file.
+      for (int i = 0; i < nLines || nLines <= 0; i++) {
+        try {
+          line = in.readLine();
+          if (line == null) {
+            break;
+          } else {
+            logs.add(line);
+          }
+        } catch (IOException e) {
+          if (isRemoved) {
+            throw new SQLException("The operation has been closed and its log file " +
+                file.getAbsolutePath() + " has been removed.", e);
+          } else {
+            throw new SQLException("Reading operation log file encountered an exception: ", e);
+          }
+        }
+      }
+      return logs;
+    }
+  }
+}
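
For reference, a minimal standalone sketch of how the new OperationLog class above is used end to end
(illustrative only; in the real flow the HiveServer2 operation layer creates the log file and
LogDivertAppender performs the writes):

    import java.io.File;

    import org.apache.hadoop.hive.ql.session.OperationLog;

    public class OperationLogUsageSketch {
      public static void main(String[] args) throws Exception {
        // Scratch file standing in for the per-operation log file that the server
        // would normally create; the name and location are illustrative.
        File logFile = File.createTempFile("operation", ".log");

        OperationLog opLog = new OperationLog("example-operation", logFile);
        OperationLog.setCurrentOperationLog(opLog);       // make it visible to this thread

        opLog.writeOperationLog("number of splits:1\n");  // normally written by LogDivertAppender
        opLog.writeOperationLog("Launching Job 1 out of 1\n");

        // Fetch from the beginning of the file; a non-positive maxRows reads all lines.
        for (String line : opLog.readOperationLog(true, -1)) {
          System.out.println(line);
        }

        OperationLog.removeCurrentOperationLog();
        opLog.close();                                    // closes the streams and deletes the file
      }
    }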

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java Tue Dec 23 15:41:29 2014
@@ -21,6 +21,7 @@ import java.io.CharArrayWriter;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.WriterAppender;

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Tue Dec 23 15:41:29 2014
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Dec 23 15:41:29 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hive.service.cli.operation;
 
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -29,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -258,7 +260,13 @@ public class OperationManager extends Ab
     }
 
     // read logs
-    List<String> logs = operationLog.readOperationLog(orientation, maxRows);
+    List<String> logs;
+    try {
+      logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows);
+    } catch (SQLException e) {
+      throw new HiveSQLException(e.getMessage(), e.getCause());
+    }
+
 
     // convert logs to RowSet
     TableSchema tableSchema = new TableSchema(getLogSchema());
@@ -270,6 +278,15 @@ public class OperationManager extends Ab
     return rowSet;
   }
 
+  private boolean isFetchFirst(FetchOrientation fetchOrientation) {
+    //TODO: Since OperationLog is moved to package o.a.h.h.ql.session,
+    // we may add an enum there and map FetchOrientation to it.
+    if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) {
+      return true;
+    }
+    return false;
+  }
+
   private Schema getLogSchema() {
     Schema schema = new Schema();
     FieldSchema fieldSchema = new FieldSchema();

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Tue Dec 23 15:41:29 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDe;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.security.UserGroupInformation;