Posted to commits@hive.apache.org by nz...@apache.org on 2011/05/18 01:21:21 UTC
svn commit: r1104625 - in /hive/trunk: cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/common/ ql/src/java/org/apache/hadoop/hive/ql/exec/
Author: nzhang
Date: Tue May 17 23:21:20 2011
New Revision: 1104625
URL: http://svn.apache.org/viewvc?rev=1104625&view=rev
Log:
HIVE-243. ^C breaks out of running query, but not whole CLI (George Djabarov via Ning Zhang)
Added:
hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java
hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java
Modified:
hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Modified: hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java?rev=1104625&r1=1104624&r2=1104625&view=diff
==============================================================================
--- hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (original)
+++ hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java Tue May 17 23:21:20 2011
@@ -43,12 +43,14 @@ import org.apache.commons.lang.StringUti
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.HadoopJobExecHelper;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
@@ -61,6 +63,10 @@ import org.apache.hadoop.hive.service.Hi
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.thrift.TException;
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
+
/**
* CliDriver.
*
@@ -155,48 +161,48 @@ public class CliDriver {
}
}
} else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server
- HiveClient client = ss.getClient();
- PrintStream out = ss.out;
- PrintStream err = ss.err;
+ HiveClient client = ss.getClient();
+ PrintStream out = ss.out;
+ PrintStream err = ss.err;
- try {
- client.execute(cmd_trimmed);
- List<String> results;
- do {
- results = client.fetchN(LINES_TO_FETCH);
- for (String line: results) {
- out.println(line);
- }
- } while (results.size() == LINES_TO_FETCH);
- } catch (HiveServerException e) {
- ret = e.getErrorCode();
- if (ret != 0) { // OK if ret == 0 -- reached the EOF
- String errMsg = e.getMessage();
- if (errMsg == null) {
- errMsg = e.toString();
- }
- ret = e.getErrorCode();
- err.println("[Hive Error]: " + errMsg);
+ try {
+ client.execute(cmd_trimmed);
+ List<String> results;
+ do {
+ results = client.fetchN(LINES_TO_FETCH);
+ for (String line : results) {
+ out.println(line);
}
- } catch (TException e) {
+ } while (results.size() == LINES_TO_FETCH);
+ } catch (HiveServerException e) {
+ ret = e.getErrorCode();
+ if (ret != 0) { // OK if ret == 0 -- reached the EOF
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
- ret = -10002;
- err.println("[Thrift Error]: " + errMsg);
- } finally {
- try {
- client.clean();
- } catch (TException e) {
- String errMsg = e.getMessage();
- if (errMsg == null) {
- errMsg = e.toString();
- }
- err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
- + errMsg);
+ ret = e.getErrorCode();
+ err.println("[Hive Error]: " + errMsg);
+ }
+ } catch (TException e) {
+ String errMsg = e.getMessage();
+ if (errMsg == null) {
+ errMsg = e.toString();
+ }
+ ret = -10002;
+ err.println("[Thrift Error]: " + errMsg);
+ } finally {
+ try {
+ client.clean();
+ } catch (TException e) {
+ String errMsg = e.getMessage();
+ if (errMsg == null) {
+ errMsg = e.toString();
}
+ err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
+ + errMsg);
}
+ }
} else { // local mode
CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf);
int tryCount = 0;
@@ -284,32 +290,88 @@ public class CliDriver {
}
public int processLine(String line) {
- int lastRet = 0, ret = 0;
+ return processLine(line, false);
+ }
- String command = "";
- for (String oneCmd : line.split(";")) {
+ /**
+ * Processes a line of semicolon-separated commands
+ *
+ * @param line
+ * The commands to process
+ * @param allowInterrupting
+ * When true the function will handle SIGINT (Ctrl+C) by interrupting the processing and
+ * returning -1
+ * @return
+ */
+ public int processLine(String line, boolean allowInterrupting) {
+ SignalHandler oldSignal = null;
+ Signal interruptSignal = null;
+
+ if (allowInterrupting) {
+ // Remember the CLI thread so the handler can interrupt it, and hook up
+ // the custom Ctrl+C handler while processing this line
+ interruptSignal = new Signal("INT");
+ oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
+ private final Thread cliThread = Thread.currentThread();
+ private boolean interruptRequested;
+
+ @Override
+ public void handle(Signal signal) {
+ boolean initialRequest = !interruptRequested;
+ interruptRequested = true;
+
+ // Kill the VM on second ctrl+c
+ if (!initialRequest) {
+ console.printInfo("Exiting the JVM");
+ System.exit(127);
+ }
- if (StringUtils.endsWith(oneCmd, "\\")) {
- command += StringUtils.chop(oneCmd) + ";";
- continue;
- } else {
- command += oneCmd;
- }
- if (StringUtils.isBlank(command)) {
- continue;
- }
+ // Interrupt the CLI thread to stop the current statement and return
+ // to prompt
+ console.printInfo("Interrupting... Be patient, this might take some time.");
+ console.printInfo("Press Ctrl+C again to kill JVM");
+
+ // First, kill any running MR jobs
+ HadoopJobExecHelper.killRunningJobs();
+ HiveInterruptUtils.interrupt();
+ this.cliThread.interrupt();
+ }
+ });
+ }
- ret = processCmd(command);
- command = "";
- lastRet = ret;
- boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
- if (ret != 0 && !ignoreErrors) {
- CommandProcessorFactory.clean((HiveConf)conf);
- return ret;
+ try {
+ int lastRet = 0, ret = 0;
+
+ String command = "";
+ for (String oneCmd : line.split(";")) {
+
+ if (StringUtils.endsWith(oneCmd, "\\")) {
+ command += StringUtils.chop(oneCmd) + ";";
+ continue;
+ } else {
+ command += oneCmd;
+ }
+ if (StringUtils.isBlank(command)) {
+ continue;
+ }
+
+ ret = processCmd(command);
+ command = "";
+ lastRet = ret;
+ boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
+ if (ret != 0 && !ignoreErrors) {
+ CommandProcessorFactory.clean((HiveConf) conf);
+ return ret;
+ }
+ }
+ CommandProcessorFactory.clean((HiveConf) conf);
+ return lastRet;
+ } finally {
+ // Once we are done processing the line, restore the old handler
+ if (oldSignal != null && interruptSignal != null) {
+ Signal.handle(interruptSignal, oldSignal);
}
}
- CommandProcessorFactory.clean((HiveConf)conf);
- return lastRet;
}
public int processReader(BufferedReader r) throws IOException {
@@ -528,7 +590,7 @@ public class CliDriver {
}
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line;
- ret = cli.processLine(line);
+ ret = cli.processLine(line, true);
prefix = "";
curPrompt = prompt;
} else {
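
The CliDriver change above is the heart of the fix: processLine(line, true) installs a SIGINT handler for the duration of a single statement, interrupts the CLI thread (and kills running MR jobs) on the first Ctrl+C, and exits the JVM only on the second. A minimal standalone sketch of the same save/install/restore pattern, using only the sun.misc.Signal API that appears in the diff (the class name, sleep, and messages are illustrative, not part of the patch):

    import sun.misc.Signal;
    import sun.misc.SignalHandler;

    public class SignalSketch {
      public static void main(String[] args) {
        final Thread worker = Thread.currentThread();
        Signal intSignal = new Signal("INT");
        // Signal.handle installs the new handler and returns the previous one,
        // which is what lets processLine restore it in its finally block.
        SignalHandler previous = Signal.handle(intSignal, new SignalHandler() {
          public void handle(Signal signal) {
            worker.interrupt(); // cancel the current statement, not the JVM
          }
        });
        try {
          Thread.sleep(60000); // stands in for a long-running query
        } catch (InterruptedException e) {
          System.out.println("statement cancelled, back to prompt");
        } finally {
          Signal.handle(intSignal, previous); // restore the old behavior
        }
      }
    }

Note that sun.misc.Signal is an unsupported, JDK-internal API; the patch accepts that trade-off because the JDK offers no portable SIGINT hook.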
Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java?rev=1104625&view=auto
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java (added)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java Tue May 17 23:21:20 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+public interface HiveInterruptCallback {
+ /**
+ * Request interruption of the processing
+ */
+ void interrupt();
+}
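
HiveInterruptCallback is deliberately minimal: anything that can abort blocking work can implement it and register itself. A hypothetical implementation (not part of this commit) that closes an I/O resource when the command is canceled:

    import java.io.Closeable;
    import java.io.IOException;
    import org.apache.hadoop.hive.common.HiveInterruptCallback;

    public class CloseOnInterrupt implements HiveInterruptCallback {
      private final Closeable resource;

      public CloseOnInterrupt(Closeable resource) {
        this.resource = resource;
      }

      @Override
      public void interrupt() {
        try {
          resource.close(); // unblocks any thread stuck reading from it
        } catch (IOException ignored) {
          // best effort: the command is being torn down anyway
        }
      }
    }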
Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java?rev=1104625&view=auto
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java (added)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java Tue May 17 23:21:20 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HiveInterruptUtils {
+
+ /**
+ * A list of callbacks for currently running commands that need cleanup when the command is canceled
+ */
+ private static List<HiveInterruptCallback> interruptCallbacks = new ArrayList<HiveInterruptCallback>();
+
+ public static HiveInterruptCallback add(HiveInterruptCallback command) {
+ synchronized (interruptCallbacks) {
+ interruptCallbacks.add(command);
+ }
+ return command;
+ }
+
+ public static HiveInterruptCallback remove(HiveInterruptCallback command) {
+ synchronized (interruptCallbacks) {
+ interruptCallbacks.remove(command);
+ }
+ return command;
+ }
+
+ /**
+ * Request interruption of current hive command
+ */
+ public static void interrupt() {
+ synchronized (interruptCallbacks) {
+ for (HiveInterruptCallback resource : new ArrayList<HiveInterruptCallback>(interruptCallbacks)) {
+ resource.interrupt();
+ }
+ }
+ }
+
+ /**
+ * Checks if the current thread has been interrupted and throws RuntimeException if it has.
+ */
+ public static void checkInterrupted() {
+ if (Thread.currentThread().isInterrupted()) {
+ InterruptedException interrupt = null;
+ try {
+ Thread.sleep(0);
+ } catch (InterruptedException e) {
+ interrupt = e;
+ }
+ throw new RuntimeException("Interrupted", interrupt);
+ }
+ }
+}
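
The intended usage pattern is register, work, unregister-in-finally, which is exactly the shape the Utilities.java change below takes. A compilable sketch under that assumption (the executor and all names are illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.hadoop.hive.common.HiveInterruptCallback;
    import org.apache.hadoop.hive.common.HiveInterruptUtils;

    public class InterruptAwareTask {
      public static void runWithCleanup() {
        final ExecutorService worker = Executors.newSingleThreadExecutor();
        // Register cleanup that HiveInterruptUtils.interrupt() will invoke
        // when the SIGINT handler fires.
        HiveInterruptCallback callback = HiveInterruptUtils.add(new HiveInterruptCallback() {
          @Override
          public void interrupt() {
            worker.shutdownNow(); // stop in-flight work owned by this command
          }
        });
        try {
          worker.submit(new Runnable() {
            public void run() { /* long-running work */ }
          });
          // Surface a pending thread interrupt as a RuntimeException.
          HiveInterruptUtils.checkInterrupted();
        } finally {
          HiveInterruptUtils.remove(callback); // never leak the callback
          worker.shutdown();
        }
      }
    }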
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1104625&r1=1104624&r2=1104625&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Tue May 17 23:21:20 2011
@@ -158,29 +158,33 @@ public class HadoopJobExecHelper {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- synchronized (runningJobKillURIs) {
- for (String uri : runningJobKillURIs.values()) {
- try {
- System.err.println("killing job with: " + uri);
- java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
- .openConnection();
- conn.setRequestMethod("POST");
- int retCode = conn.getResponseCode();
- if (retCode != 200) {
- System.err.println("Got an error trying to kill job with URI: " + uri + " = "
- + retCode);
- }
- } catch (Exception e) {
- System.err.println("trying to kill job, caught: " + e);
- // do nothing
- }
- }
- }
+ killRunningJobs();
}
});
}
}
-
+
+ public static void killRunningJobs() {
+ synchronized (runningJobKillURIs) {
+ for (String uri : runningJobKillURIs.values()) {
+ try {
+ System.err.println("killing job with: " + uri);
+ java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
+ .openConnection();
+ conn.setRequestMethod("POST");
+ int retCode = conn.getResponseCode();
+ if (retCode != 200) {
+ System.err.println("Got an error trying to kill job with URI: " + uri + " = "
+ + retCode);
+ }
+ } catch (Exception e) {
+ System.err.println("trying to kill job, caught: " + e);
+ // do nothing
+ }
+ }
+ }
+ }
+
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
if (ctrs == null) {
// hadoop might return null if it cannot locate the job.
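
The HadoopJobExecHelper change is a pure extraction: the body of the JVM shutdown hook moves into the static killRunningJobs(), so the CLI's SIGINT handler can reuse the same kill logic without waiting for JVM exit. The shape of the refactoring, sketched with a hypothetical stand-in for the kill logic:

    public class KillHookSketch {
      // One static method shared by the shutdown hook and the Ctrl+C handler.
      public static void killRunningJobs() {
        System.err.println("posting kill requests for all tracked jobs");
      }

      public static void install() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            killRunningJobs(); // a normal JVM exit still kills the jobs
          }
        });
      }
    }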
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1104625&r1=1104624&r2=1104625&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue May 17 23:21:20 2011
@@ -85,6 +85,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.HiveInterruptCallback;
+import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -1619,99 +1621,98 @@ public final class Utilities {
// Process the case when name node call is needed
final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
ArrayList<Future<?>> results = new ArrayList<Future<?>>();
- ThreadPoolExecutor executor = null;
+ final ThreadPoolExecutor executor;
int maxThreads = ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0);
if (pathNeedProcess.size() > 1 && maxThreads > 1) {
int numExecutors = Math.min(pathNeedProcess.size(), maxThreads);
LOG.info("Using " + numExecutors + " threads for getContentSummary");
executor = new ThreadPoolExecutor(numExecutors, numExecutors, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
+ } else {
+ executor = null;
}
- //
- Configuration conf = ctx.getConf();
- JobConf jobConf = new JobConf(conf);
- for (String path : pathNeedProcess) {
- final Path p = new Path(path);
- final String pathStr = path;
- // All threads share the same Configuration and JobConf based on the
- // assumption that they are thread safe if only read operations are
- // executed. It is not stated in Hadoop's javadoc, but the source code
- // clearly shows that they made efforts for it and we believe it is
- // thread safe. Will revisit this piece of code if we find the assumption
- // is not correct.
- final Configuration myConf = conf;
- final JobConf myJobConf = jobConf;
- final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
- p.toString());
- Runnable r = new Runnable() {
- public void run() {
- try {
- ContentSummary resultCs;
-
- Class<? extends InputFormat> inputFormatCls = partDesc
- .getInputFileFormatClass();
- InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
- inputFormatCls, myJobConf);
- if (inputFormatObj instanceof ContentSummaryInputFormat) {
- resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
- myJobConf);
- } else {
- FileSystem fs = p.getFileSystem(myConf);
- resultCs = fs.getContentSummary(p);
+ HiveInterruptCallback interruptCallback = HiveInterruptUtils.add(new HiveInterruptCallback() {
+ @Override
+ public void interrupt() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+ });
+ try {
+ Configuration conf = ctx.getConf();
+ JobConf jobConf = new JobConf(conf);
+ for (String path : pathNeedProcess) {
+ final Path p = new Path(path);
+ final String pathStr = path;
+ // All threads share the same Configuration and JobConf based on the
+ // assumption that they are thread safe if only read operations are
+ // executed. It is not stated in Hadoop's javadoc, but the source code
+ // clearly shows that they made efforts for it and we believe it is
+ // thread safe. Will revisit this piece of code if we find the assumption
+ // is not correct.
+ final Configuration myConf = conf;
+ final JobConf myJobConf = jobConf;
+ final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
+ p.toString());
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ ContentSummary resultCs;
+
+ Class<? extends InputFormat> inputFormatCls = partDesc
+ .getInputFileFormatClass();
+ InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
+ inputFormatCls, myJobConf);
+ if (inputFormatObj instanceof ContentSummaryInputFormat) {
+ resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
+ myJobConf);
+ } else {
+ FileSystem fs = p.getFileSystem(myConf);
+ resultCs = fs.getContentSummary(p);
+ }
+ resultMap.put(pathStr, resultCs);
+ } catch (IOException e) {
+ // We safely ignore this exception for summary data.
+ // We don't update the cache to protect it from polluting other
+ // usages. The worst case is that IOException will always be
+ // retried for another getInputSummary(), which is fine as
+ // IOException is not considered as a common case.
+ LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
}
- resultMap.put(pathStr, resultCs);
- } catch (IOException e) {
- // We safely ignore this exception for summary data.
- // We don't update the cache to protect it from polluting other
- // usages. The worst case is that IOException will always be
- // retried for another getInputSummary(), which is fine as
- // IOException is not considered as a common case.
- LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
}
- }
- };
+ };
- if (executor == null) {
- r.run();
- } else {
- Future<?> result = executor.submit(r);
- results.add(result);
+ if (executor == null) {
+ r.run();
+ } else {
+ Future<?> result = executor.submit(r);
+ results.add(result);
+ }
}
- }
- if (executor != null) {
- for (Future<?> result : results) {
- boolean executorDone = false;
- do {
- try {
- result.get();
- executorDone = true;
- } catch (InterruptedException e) {
- LOG.info("Interrupted when waiting threads: ", e);
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- throw new IOException(e);
- }
- } while (!executorDone);
+ if (executor != null) {
+ executor.shutdown();
}
- executor.shutdown();
- }
+ HiveInterruptUtils.checkInterrupted();
+ for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
+ ContentSummary cs = entry.getValue();
- for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
- ContentSummary cs = entry.getValue();
+ summary[0] += cs.getLength();
+ summary[1] += cs.getFileCount();
+ summary[2] += cs.getDirectoryCount();
- summary[0] += cs.getLength();
- summary[1] += cs.getFileCount();
- summary[2] += cs.getDirectoryCount();
+ ctx.addCS(entry.getKey(), cs);
+ LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
+ + " file count: "
+ + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
+ }
- ctx.addCS(entry.getKey(), cs);
- LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
- + " file count: "
- + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
+ return new ContentSummary(summary[0], summary[1], summary[2]);
+ } finally {
+ HiveInterruptUtils.remove(interruptCallback);
}
-
- return new ContentSummary(summary[0], summary[1], summary[2]);
}
}
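
Two details of the Utilities.java change deserve a note. First, the callback is removed in a finally block, so an exception inside getInputSummary cannot leave a stale callback registered. Second, executor changes from a mutable local to a blank final with an explicit else branch, because anonymous inner classes (such as the new HiveInterruptCallback) may only capture local variables declared final in pre-Java-8 source. A compilable sketch of that capture rule, with illustrative names:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class FinalCaptureSketch {
      static Runnable interruptAction(boolean parallel) {
        // Blank final: assigned exactly once on every path, so the
        // anonymous class below is allowed to capture it.
        final ThreadPoolExecutor executor;
        if (parallel) {
          executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS,
              new LinkedBlockingQueue<Runnable>());
        } else {
          executor = null;
        }
        return new Runnable() {
          public void run() {
            if (executor != null) {
              executor.shutdownNow(); // drop queued tasks, interrupt running ones
            }
          }
        };
      }
    }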