You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2011/06/15 19:37:35 UTC

svn commit: r1136131 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java

Author: heyongqiang
Date: Wed Jun 15 17:37:34 2011
New Revision: 1136131

URL: http://svn.apache.org/viewvc?rev=1136131&view=rev
Log:
HIVE-2222: runnable queue in Driver and DriverContext is not thread safe (namit via He Yongqiang)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1136131&r1=1136130&r2=1136131&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Jun 15 17:37:34 2011
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -166,20 +167,20 @@ public class Driver implements CommandPr
         hiveLockMgr.setContext(new HiveLockManagerCtx(conf));
       } catch (Exception e) {
         // set hiveLockMgr to null just in case this invalid manager got set to
-        // next query's ctx.  
+        // next query's ctx.
         if (hiveLockMgr != null) {
           try {
             hiveLockMgr.close();
           } catch (LockException e1) {
             //nothing can do here
           }
-          hiveLockMgr = null;          
+          hiveLockMgr = null;
         }
         throw new SemanticException(ErrorMsg.LOCKMGR_NOT_INITIALIZED.getMsg() + e.getMessage());
       }
     }
   }
-  
+
   public void init() {
     Operator.resetId();
   }
@@ -336,9 +337,9 @@ public class Driver implements CommandPr
 
   /**
    * Compile a new query, but potentially reset taskID counter.  Not resetting task counter
-   * is useful for generating re-entrant QL queries.  
+   * is useful for generating re-entrant QL queries.
    * @param command  The HiveQL query to compile
-   * @param resetTaskIds Resets taskID counter if true.  
+   * @param resetTaskIds Resets taskID counter if true.
    * @return
    */
   public int compile(String command, boolean resetTaskIds) {
@@ -864,16 +865,16 @@ public class Driver implements CommandPr
             taskQueue.addAll(((ConditionalTask)tsk).getListTasks());
           }
           if(tsk.getChildTasks()!= null) {
-            taskQueue.addAll(tsk.getChildTasks());        
+            taskQueue.addAll(tsk.getChildTasks());
           }
           // does not add back up task here, because back up task should be the same
-          // type of the original task. 
+          // type of the original task.
         }
       } else {
         requireLock = true;
       }
     }
-    
+
     if (requireLock) {
       ret = acquireReadWriteLocks();
       if (ret != 0) {
@@ -1030,7 +1031,7 @@ public class Driver implements CommandPr
       // At any time, at most maxthreads tasks can be running
       // The main thread polls the TaskRunners to check if they have finished.
 
-      Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
+      Queue<Task<? extends Serializable>> runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
       Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner>();
 
       DriverContext driverCxt = new DriverContext(runnable, ctx);