You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/12/23 16:41:29 UTC
svn commit: r1647595 - in /hive/trunk:
itests/hive-unit/src/test/java/org/apache/hive/beeline/
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/session/ service/src/java/org/apache/hive/service/cli/operation/
Author: brock
Date: Tue Dec 23 15:41:29 2014
New Revision: 1647595
URL: http://svn.apache.org/r1647595
Log:
HIVE-9120 - Hive Query log does not work when hive.exec.parallel is true (Dong Chen via Brock)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
Removed:
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java
Modified:
hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java Tue Dec 23 15:41:29 2014
@@ -534,6 +534,20 @@ public class TestBeeLineWithArgs {
}
/**
+ * Test Beeline could show the query progress for time-consuming query when hive.exec.parallel
+ * is true
+ * @throws Throwable
+ */
+ @Test
+ public void testQueryProgressParallel() throws Throwable {
+ final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" +
+ "set hive.exec.parallel = true;\n" +
+ "select count(*) from " + tableName + ";\n";
+ final String EXPECTED_PATTERN = "number of splits";
+ testScriptFile(SCRIPT_TEXT, EXPECTED_PATTERN, true, getBaseArgs(miniHS2.getBaseJdbcURL()));
+ }
+
+ /**
* Test Beeline will hide the query progress when silent option is set.
* @throws Throwable
*/
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Dec 23 15:41:29 2014
@@ -111,6 +111,7 @@ import org.apache.hadoop.hive.ql.securit
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
+import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ByteStream;
@@ -1623,6 +1624,7 @@ public class Driver implements CommandPr
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in parallel");
}
+ tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
tskRun.start();
} else {
if (LOG.isInfoEnabled()){
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Tue Dec 23 15:41:29 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hadoop.hive.ql.session.SessionState;
/**
@@ -32,6 +33,7 @@ public class TaskRunner extends Thread {
protected Task<? extends Serializable> tsk;
protected TaskResult result;
protected SessionState ss;
+ private OperationLog operationLog;
private static AtomicLong taskCounter = new AtomicLong(0);
private static ThreadLocal<Long> taskRunnerID = new ThreadLocal<Long>() {
@Override
@@ -68,6 +70,7 @@ public class TaskRunner extends Thread {
public void run() {
runner = Thread.currentThread();
try {
+ OperationLog.setCurrentOperationLog(operationLog);
SessionState.start(ss);
runSequential();
} finally {
@@ -95,4 +98,8 @@ public class TaskRunner extends Thread {
public static long getTaskRunnerID () {
return taskRunnerID.get();
}
+
+ public void setOperationLog(OperationLog operationLog) {
+ this.operationLog = operationLog;
+ }
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java?rev=1647595&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java Tue Dec 23 15:41:29 2014
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.session;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.*;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * OperationLog wraps the actual operation log file, and provides interface
+ * for accessing, reading, writing, and removing the file.
+ */
+public class OperationLog {
+ private static final Log LOG = LogFactory.getLog(OperationLog.class.getName());
+
+ private final String operationName;
+ private final LogFile logFile;
+
+ public OperationLog(String name, File file) throws FileNotFoundException{
+ operationName = name;
+ logFile = new LogFile(file);
+ }
+
+ /**
+ * Singleton OperationLog object per thread.
+ */
+ private static final ThreadLocal<OperationLog> THREAD_LOCAL_OPERATION_LOG = new
+ ThreadLocal<OperationLog>() {
+ @Override
+ protected synchronized OperationLog initialValue() {
+ return null;
+ }
+ };
+
+ public static void setCurrentOperationLog(OperationLog operationLog) {
+ THREAD_LOCAL_OPERATION_LOG.set(operationLog);
+ }
+
+ public static OperationLog getCurrentOperationLog() {
+ return THREAD_LOCAL_OPERATION_LOG.get();
+ }
+
+ public static void removeCurrentOperationLog() {
+ THREAD_LOCAL_OPERATION_LOG.remove();
+ }
+
+ /**
+ * Write operation execution logs into log file
+ * @param operationLogMessage one line of log emitted from log4j
+ */
+ public void writeOperationLog(String operationLogMessage) {
+ logFile.write(operationLogMessage);
+ }
+
+ /**
+ * Read operation execution logs from log file
+ * @param isFetchFirst true if the Enum FetchOrientation value is Fetch_First
+ * @param maxRows the max number of fetched lines from log
+ * @return
+ * @throws java.sql.SQLException
+ */
+ public List<String> readOperationLog(boolean isFetchFirst, long maxRows)
+ throws SQLException{
+ return logFile.read(isFetchFirst, maxRows);
+ }
+
+ /**
+ * Close this OperationLog when operation is closed. The log file will be removed.
+ */
+ public void close() {
+ logFile.remove();
+ }
+
+ /**
+ * Wrapper for read/write the operation log file
+ */
+ private class LogFile {
+ private File file;
+ private BufferedReader in;
+ private PrintStream out;
+ private volatile boolean isRemoved;
+
+ LogFile(File file) throws FileNotFoundException {
+ this.file = file;
+ in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+ out = new PrintStream(new FileOutputStream(file));
+ isRemoved = false;
+ }
+
+ synchronized void write(String msg) {
+ // write log to the file
+ out.print(msg);
+ }
+
+ synchronized List<String> read(boolean isFetchFirst, long maxRows)
+ throws SQLException{
+ // reset the BufferReader, if fetching from the beginning of the file
+ if (isFetchFirst) {
+ resetIn();
+ }
+
+ return readResults(maxRows);
+ }
+
+ void remove() {
+ try {
+ if (in != null) {
+ in.close();
+ }
+ if (out != null) {
+ out.close();
+ }
+ FileUtils.forceDelete(file);
+ isRemoved = true;
+ } catch (Exception e) {
+ LOG.error("Failed to remove corresponding log file of operation: " + operationName, e);
+ }
+ }
+
+ private void resetIn() {
+ if (in != null) {
+ IOUtils.cleanup(LOG, in);
+ in = null;
+ }
+ }
+
+ private List<String> readResults(long nLines) throws SQLException {
+ if (in == null) {
+ try {
+ in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+ } catch (FileNotFoundException e) {
+ if (isRemoved) {
+ throw new SQLException("The operation has been closed and its log file " +
+ file.getAbsolutePath() + " has been removed.", e);
+ } else {
+ throw new SQLException("Operation Log file " + file.getAbsolutePath() +
+ " is not found.", e);
+ }
+ }
+ }
+
+ List<String> logs = new ArrayList<String>();
+ String line = "";
+ // if nLines <= 0, read all lines in log file.
+ for (int i = 0; i < nLines || nLines <= 0; i++) {
+ try {
+ line = in.readLine();
+ if (line == null) {
+ break;
+ } else {
+ logs.add(line);
+ }
+ } catch (IOException e) {
+ if (isRemoved) {
+ throw new SQLException("The operation has been closed and its log file " +
+ file.getAbsolutePath() + " has been removed.", e);
+ } else {
+ throw new SQLException("Reading operation log file encountered an exception: ", e);
+ }
+ }
+ }
+ return logs;
+ }
+ }
+}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java Tue Dec 23 15:41:29 2014
@@ -21,6 +21,7 @@ import java.io.CharArrayWriter;
import java.util.regex.Pattern;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.WriterAppender;
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Tue Dec 23 15:41:29 2014
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationHandle;
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Dec 23 15:41:29 2014
@@ -18,6 +18,7 @@
package org.apache.hive.service.cli.operation;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
@@ -29,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.HiveSQLException;
@@ -258,7 +260,13 @@ public class OperationManager extends Ab
}
// read logs
- List<String> logs = operationLog.readOperationLog(orientation, maxRows);
+ List<String> logs;
+ try {
+ logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows);
+ } catch (SQLException e) {
+ throw new HiveSQLException(e.getMessage(), e.getCause());
+ }
+
// convert logs to RowSet
TableSchema tableSchema = new TableSchema(getLogSchema());
@@ -270,6 +278,15 @@ public class OperationManager extends Ab
return rowSet;
}
+ private boolean isFetchFirst(FetchOrientation fetchOrientation) {
+ //TODO: Since OperationLog is moved to package o.a.h.h.ql.session,
+ // we may add a Enum there and map FetchOrientation to it.
+ if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) {
+ return true;
+ }
+ return false;
+ }
+
private Schema getLogSchema() {
Schema schema = new Schema();
FieldSchema fieldSchema = new FieldSchema();
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1647595&r1=1647594&r2=1647595&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Tue Dec 23 15:41:29 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDe;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.security.UserGroupInformation;