You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/05/18 01:21:21 UTC

svn commit: r1104625 - in /hive/trunk: cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/common/ ql/src/java/org/apache/hadoop/hive/ql/exec/

Author: nzhang
Date: Tue May 17 23:21:20 2011
New Revision: 1104625

URL: http://svn.apache.org/viewvc?rev=1104625&view=rev
Log:
HIVE-243. ^C breaks out of running query, but not whole CLI (George Djabarov via Ning Zhang)

Added:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java
Modified:
    hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

Modified: hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java?rev=1104625&r1=1104624&r2=1104625&view=diff
==============================================================================
--- hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (original)
+++ hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java Tue May 17 23:21:20 2011
@@ -43,12 +43,14 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.HadoopJobExecHelper;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.parse.ParseDriver;
@@ -61,6 +63,10 @@ import org.apache.hadoop.hive.service.Hi
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.thrift.TException;
 
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
+
 /**
  * CliDriver.
  *
@@ -155,48 +161,48 @@ public class CliDriver {
         }
       }
     } else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server
-        HiveClient client = ss.getClient();
-        PrintStream out = ss.out;
-        PrintStream err = ss.err;
+      HiveClient client = ss.getClient();
+      PrintStream out = ss.out;
+      PrintStream err = ss.err;
 
-        try {
-          client.execute(cmd_trimmed);
-          List<String> results;
-          do {
-            results = client.fetchN(LINES_TO_FETCH);
-            for (String line: results) {
-              out.println(line);
-            }
-          } while (results.size() == LINES_TO_FETCH);
-        } catch (HiveServerException e) {
-          ret = e.getErrorCode();
-          if (ret != 0) { // OK if ret == 0 -- reached the EOF
-            String errMsg = e.getMessage();
-            if (errMsg == null) {
-              errMsg = e.toString();
-            }
-            ret = e.getErrorCode();
-            err.println("[Hive Error]: " + errMsg);
+      try {
+        client.execute(cmd_trimmed);
+        List<String> results;
+        do {
+          results = client.fetchN(LINES_TO_FETCH);
+          for (String line : results) {
+            out.println(line);
           }
-        } catch (TException e) {
+        } while (results.size() == LINES_TO_FETCH);
+      } catch (HiveServerException e) {
+        ret = e.getErrorCode();
+        if (ret != 0) { // OK if ret == 0 -- reached the EOF
           String errMsg = e.getMessage();
           if (errMsg == null) {
             errMsg = e.toString();
           }
-          ret = -10002;
-          err.println("[Thrift Error]: " + errMsg);
-        } finally {
-          try {
-            client.clean();
-          } catch (TException e) {
-            String errMsg = e.getMessage();
-            if (errMsg == null) {
-              errMsg = e.toString();
-            }
-            err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
-                + errMsg);
+          ret = e.getErrorCode();
+          err.println("[Hive Error]: " + errMsg);
+        }
+      } catch (TException e) {
+        String errMsg = e.getMessage();
+        if (errMsg == null) {
+          errMsg = e.toString();
+        }
+        ret = -10002;
+        err.println("[Thrift Error]: " + errMsg);
+      } finally {
+        try {
+          client.clean();
+        } catch (TException e) {
+          String errMsg = e.getMessage();
+          if (errMsg == null) {
+            errMsg = e.toString();
           }
+          err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
+              + errMsg);
         }
+      }
     } else { // local mode
       CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf);
       int tryCount = 0;
@@ -284,32 +290,88 @@ public class CliDriver {
   }
 
   public int processLine(String line) {
-    int lastRet = 0, ret = 0;
+    return processLine(line, false);
+  }
 
-    String command = "";
-    for (String oneCmd : line.split(";")) {
+  /**
+   * Processes a line of semicolon separated commands
+   *
+   * @param line
+   *          The commands to process
+   * @param allowInterupting
+   *          When true the function will handle SIG_INT (Ctrl+C) by interrupting the processing and
+   *          returning -1
+   * @return
+   */
+  public int processLine(String line, boolean allowInterupting) {
+    SignalHandler oldSignal = null;
+    Signal interupSignal = null;
+
+    if (allowInterupting) {
+      // Remember all threads that were running at the time we started line processing.
+      // Hook up the custom Ctrl+C handler while processing this line
+      interupSignal = new Signal("INT");
+      oldSignal = Signal.handle(interupSignal, new SignalHandler() {
+        private final Thread cliThread = Thread.currentThread();
+        private boolean interruptRequested;
+
+        @Override
+        public void handle(Signal signal) {
+          boolean initialRequest = !interruptRequested;
+          interruptRequested = true;
+
+          // Kill the VM on second ctrl+c
+          if (!initialRequest) {
+            console.printInfo("Exiting the JVM");
+            System.exit(127);
+          }
 
-      if (StringUtils.endsWith(oneCmd, "\\")) {
-        command += StringUtils.chop(oneCmd) + ";";
-        continue;
-      } else {
-        command += oneCmd;
-      }
-      if (StringUtils.isBlank(command)) {
-        continue;
-      }
+          // Interrupt the CLI thread to stop the current statement and return
+          // to prompt
+          console.printInfo("Interrupting... Be patient, this might take some time.");
+          console.printInfo("Press Ctrl+C again to kill JVM");
+
+          // First, kill any running MR jobs
+          HadoopJobExecHelper.killRunningJobs();
+          HiveInterruptUtils.interrupt();
+          this.cliThread.interrupt();
+        }
+      });
+    }
 
-      ret = processCmd(command);
-      command = "";
-      lastRet = ret;
-      boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
-      if (ret != 0 && !ignoreErrors) {
-        CommandProcessorFactory.clean((HiveConf)conf);
-        return ret;
+    try {
+      int lastRet = 0, ret = 0;
+
+      String command = "";
+      for (String oneCmd : line.split(";")) {
+
+        if (StringUtils.endsWith(oneCmd, "\\")) {
+          command += StringUtils.chop(oneCmd) + ";";
+          continue;
+        } else {
+          command += oneCmd;
+        }
+        if (StringUtils.isBlank(command)) {
+          continue;
+        }
+
+        ret = processCmd(command);
+        command = "";
+        lastRet = ret;
+        boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
+        if (ret != 0 && !ignoreErrors) {
+          CommandProcessorFactory.clean((HiveConf) conf);
+          return ret;
+        }
+      }
+      CommandProcessorFactory.clean((HiveConf) conf);
+      return lastRet;
+    } finally {
+      // Once we are done processing the line, restore the old handler
+      if (oldSignal != null && interupSignal != null) {
+        Signal.handle(interupSignal, oldSignal);
       }
     }
-    CommandProcessorFactory.clean((HiveConf)conf);
-    return lastRet;
   }
 
   public int processReader(BufferedReader r) throws IOException {
@@ -528,7 +590,7 @@ public class CliDriver {
       }
       if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
         line = prefix + line;
-        ret = cli.processLine(line);
+        ret = cli.processLine(line, true);
         prefix = "";
         curPrompt = prompt;
       } else {

Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java?rev=1104625&view=auto
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java (added)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java Tue May 17 23:21:20 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+public interface HiveInterruptCallback {
+  /**
+   * Request interrupting of the processing
+   */
+  void interrupt();
+}

Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java?rev=1104625&view=auto
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java (added)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java Tue May 17 23:21:20 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HiveInterruptUtils {
+
+  /**
+   * A list of currently running comments that needs cleanup when the command is canceled
+   */
+  private static List<HiveInterruptCallback> interruptCallbacks = new ArrayList<HiveInterruptCallback>();
+
+  public static HiveInterruptCallback add(HiveInterruptCallback command) {
+    synchronized (interruptCallbacks) {
+      interruptCallbacks.add(command);
+    }
+    return command;
+  }
+
+  public static HiveInterruptCallback remove(HiveInterruptCallback command) {
+    synchronized (interruptCallbacks) {
+      interruptCallbacks.remove(command);
+    }
+    return command;
+  }
+
+  /**
+   * Request interruption of current hive command
+   */
+  public static void interrupt() {
+    synchronized (interruptCallbacks) {
+      for (HiveInterruptCallback resource : new ArrayList<HiveInterruptCallback>(interruptCallbacks)) {
+        resource.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Checks if the current thread has been interrupted and throws RuntimeException is it has.
+   */
+  public static void checkInterrupted() {
+    if (Thread.currentThread().isInterrupted()) {
+      InterruptedException interrupt = null;
+      try {
+        Thread.sleep(0);
+      } catch (InterruptedException e) {
+        interrupt = e;
+      }
+      throw new RuntimeException("Interuppted", interrupt);
+    }
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1104625&r1=1104624&r2=1104625&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Tue May 17 23:21:20 2011
@@ -158,29 +158,33 @@ public class HadoopJobExecHelper {
       Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
-          synchronized (runningJobKillURIs) {
-            for (String uri : runningJobKillURIs.values()) {
-              try {
-                System.err.println("killing job with: " + uri);
-                java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
-                    .openConnection();
-                conn.setRequestMethod("POST");
-                int retCode = conn.getResponseCode();
-                if (retCode != 200) {
-                  System.err.println("Got an error trying to kill job with URI: " + uri + " = "
-                      + retCode);
-                }
-              } catch (Exception e) {
-                System.err.println("trying to kill job, caught: " + e);
-                // do nothing
-              }
-            }
-          }
+          killRunningJobs();
         }
       });
     }
   }
-  
+
+  public static void killRunningJobs() {
+    synchronized (runningJobKillURIs) {
+      for (String uri : runningJobKillURIs.values()) {
+        try {
+          System.err.println("killing job with: " + uri);
+          java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
+               .openConnection();
+          conn.setRequestMethod("POST");
+          int retCode = conn.getResponseCode();
+          if (retCode != 200) {
+            System.err.println("Got an error trying to kill job with URI: " + uri + " = "
+                + retCode);
+          }
+        } catch (Exception e) {
+          System.err.println("trying to kill job, caught: " + e);
+          // do nothing
+        }
+      }
+    }
+  }
+
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     if (ctrs == null) {
       // hadoop might return null if it cannot locate the job.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1104625&r1=1104624&r2=1104625&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue May 17 23:21:20 2011
@@ -85,6 +85,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.HiveInterruptCallback;
+import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -1619,99 +1621,98 @@ public final class Utilities {
       // Process the case when name node call is needed
       final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
       ArrayList<Future<?>> results = new ArrayList<Future<?>>();
-      ThreadPoolExecutor executor = null;
+      final ThreadPoolExecutor executor;
       int maxThreads = ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0);
       if (pathNeedProcess.size() > 1 && maxThreads > 1) {
         int numExecutors = Math.min(pathNeedProcess.size(), maxThreads);
         LOG.info("Using " + numExecutors + " threads for getContentSummary");
         executor = new ThreadPoolExecutor(numExecutors, numExecutors, 60, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>());
+      } else {
+        executor = null;
       }
 
-      //
-      Configuration conf = ctx.getConf();
-      JobConf jobConf = new JobConf(conf);
-      for (String path : pathNeedProcess) {
-        final Path p = new Path(path);
-        final String pathStr = path;
-        // All threads share the same Configuration and JobConf based on the
-        // assumption that they are thread safe if only read operations are
-        // executed. It is not stated in Hadoop's javadoc, the sourcce codes
-        // clearly showed that they made efforts for it and we believe it is
-        // thread safe. Will revisit this piece of codes if we find the assumption
-        // is not correct.
-        final Configuration myConf = conf;
-        final JobConf myJobConf = jobConf;
-        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
-            p.toString());
-        Runnable r = new Runnable() {
-          public void run() {
-            try {
-              ContentSummary resultCs;
-
-              Class<? extends InputFormat> inputFormatCls = partDesc
-                  .getInputFileFormatClass();
-              InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
-                  inputFormatCls, myJobConf);
-              if (inputFormatObj instanceof ContentSummaryInputFormat) {
-                resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
-                    myJobConf);
-              } else {
-                FileSystem fs = p.getFileSystem(myConf);
-                resultCs = fs.getContentSummary(p);
+      HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() {
+        @Override
+        public void interrupt() {
+          if (executor != null) {
+            executor.shutdownNow();
+          }
+        }
+      });
+      try {
+        Configuration conf = ctx.getConf();
+        JobConf jobConf = new JobConf(conf);
+        for (String path : pathNeedProcess) {
+          final Path p = new Path(path);
+          final String pathStr = path;
+          // All threads share the same Configuration and JobConf based on the
+          // assumption that they are thread safe if only read operations are
+          // executed. It is not stated in Hadoop's javadoc, the sourcce codes
+          // clearly showed that they made efforts for it and we believe it is
+          // thread safe. Will revisit this piece of codes if we find the assumption
+          // is not correct.
+          final Configuration myConf = conf;
+          final JobConf myJobConf = jobConf;
+          final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
+              p.toString());
+          Runnable r = new Runnable() {
+            public void run() {
+              try {
+                ContentSummary resultCs;
+
+                Class<? extends InputFormat> inputFormatCls = partDesc
+                    .getInputFileFormatClass();
+                InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
+                    inputFormatCls, myJobConf);
+                if (inputFormatObj instanceof ContentSummaryInputFormat) {
+                  resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
+                      myJobConf);
+                } else {
+                  FileSystem fs = p.getFileSystem(myConf);
+                  resultCs = fs.getContentSummary(p);
+                }
+                resultMap.put(pathStr, resultCs);
+              } catch (IOException e) {
+                // We safely ignore this exception for summary data.
+                // We don't update the cache to protect it from polluting other
+                // usages. The worst case is that IOException will always be
+                // retried for another getInputSummary(), which is fine as
+                // IOException is not considered as a common case.
+                LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
               }
-              resultMap.put(pathStr, resultCs);
-            } catch (IOException e) {
-              // We safely ignore this exception for summary data.
-              // We don't update the cache to protect it from polluting other
-              // usages. The worst case is that IOException will always be
-              // retried for another getInputSummary(), which is fine as
-              // IOException is not considered as a common case.
-              LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
             }
-          }
-        };
+          };
 
-        if (executor == null) {
-          r.run();
-        } else {
-          Future<?> result = executor.submit(r);
-          results.add(result);
+          if (executor == null) {
+            r.run();
+          } else {
+            Future<?> result = executor.submit(r);
+            results.add(result);
+          }
         }
-      }
 
-      if (executor != null) {
-        for (Future<?> result : results) {
-          boolean executorDone = false;
-          do {
-            try {
-              result.get();
-              executorDone = true;
-            } catch (InterruptedException e) {
-              LOG.info("Interrupted when waiting threads: ", e);
-              Thread.currentThread().interrupt();
-            } catch (ExecutionException e) {
-              throw new IOException(e);
-            }
-          } while (!executorDone);
+        if (executor != null) {
+          executor.shutdown();
         }
-        executor.shutdown();
-      }
+        HiveInterruptUtils.checkInterrupted();
+        for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
+          ContentSummary cs = entry.getValue();
 
-      for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
-        ContentSummary cs = entry.getValue();
+          summary[0] += cs.getLength();
+          summary[1] += cs.getFileCount();
+          summary[2] += cs.getDirectoryCount();
 
-        summary[0] += cs.getLength();
-        summary[1] += cs.getFileCount();
-        summary[2] += cs.getDirectoryCount();
+          ctx.addCS(entry.getKey(), cs);
+          LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
+              + " file count: "
+              + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
+        }
 
-        ctx.addCS(entry.getKey(), cs);
-        LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
-            + " file count: "
-            + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
+        return new ContentSummary(summary[0], summary[1], summary[2]);
+      } finally {
+        HiveInterruptUtils.remove(interrup);
       }
-
-      return new ContentSummary(summary[0], summary[1], summary[2]);
     }
   }