You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/03/24 18:56:01 UTC

svn commit: r1580940 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/ exec/mr/ exec/tez/ io/rcfile/merge/ io/rcfile/stats/ io/rcfile/truncate/

Author: hashutosh
Date: Mon Mar 24 17:56:00 2014
New Revision: 1580940

URL: http://svn.apache.org/r1580940
Log:
HIVE-6635 : Heartbeats are not being sent when DbLockMgr is used and an operation holds locks (Alan Gates via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java?rev=1580940&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java Mon Mar 24 17:56:00 2014
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+
+import java.io.IOException;
+
+/**
+ * Class to handle heartbeats for MR and Tez tasks.
+ *
+ * <p>Job monitors call {@link #heartbeat()} from their progress-polling loops. This class
+ * forwards to {@link HiveTxnManager#heartbeat()} at most once per heartbeat interval, where
+ * the interval is half of {@link HiveConf.ConfVars#HIVE_TXN_TIMEOUT} (treated as seconds).
+ *
+ * <p>If no transaction manager was supplied, or the timeout is configured as 0, heartbeating
+ * is permanently disabled for this instance and {@link #heartbeat()} becomes a no-op.
+ *
+ * <p>This class performs no synchronization; drive each instance from a single thread.
+ */
+public class Heartbeater {
+  private long lastHeartbeat = 0;
+  private long heartbeatInterval = 0;
+  private boolean dontHeartbeat = false;
+  private final HiveTxnManager txnMgr;
+  private final Configuration conf;
+
+  static final private Log LOG = LogFactory.getLog(Heartbeater.class.getName());
+
+  /**
+   * Create a heartbeater for one operation.
+   *
+   * @param txnMgr transaction manager for this operation; may be null, in which case no
+   *               heartbeats are sent
+   * @param conf Configuration for this operation, used to read the transaction timeout
+   */
+  public Heartbeater(HiveTxnManager txnMgr, Configuration conf) {
+    this.txnMgr = txnMgr;
+    this.conf = conf;
+  }
+
+  /**
+   * Send a heartbeat to the metastore for locks and transactions, if more than one heartbeat
+   * interval has passed since the last successful heartbeat. Because the last-heartbeat time
+   * starts at 0, the first enabled call heartbeats immediately.
+   *
+   * @throws IOException if the transaction manager fails to heartbeat; the underlying
+   *         {@link LockException} is kept as the cause
+   */
+  public void heartbeat() throws IOException {
+    if (dontHeartbeat) {
+      return;
+    }
+
+    if (txnMgr == null) {
+      LOG.debug("txnMgr null, not heartbeating");
+      dontHeartbeat = true;
+      return;
+    }
+
+    if (heartbeatInterval == 0) {
+      // Multiply the timeout by 1000 to convert seconds to milliseconds, but divide by 2 to
+      // give us a safety factor. Use long arithmetic so a large timeout cannot overflow int.
+      heartbeatInterval = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT) * 500L;
+      if (heartbeatInterval == 0) {
+        // The interval comes from HIVE_TXN_TIMEOUT, so that is the setting to report.
+        LOG.warn(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.toString()
+            + " not set, heartbeats won't be sent");
+        dontHeartbeat = true;
+        LOG.debug("heartbeat interval 0, not heartbeating");
+        return;
+      }
+    }
+    long now = System.currentTimeMillis();
+    if (now - lastHeartbeat > heartbeatInterval) {
+      try {
+        LOG.debug("heartbeating");
+        txnMgr.heartbeat();
+      } catch (LockException e) {
+        LOG.warn("Failed trying to heartbeat " + e.getMessage());
+        throw new IOException(e);
+      }
+      // Only record the time after a successful heartbeat.
+      lastHeartbeat = now;
+    }
+  }
+
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1580940&r1=1580939&r2=1580940&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Mon Mar 24 17:56:00 2014
@@ -423,7 +423,7 @@ public class ExecDriver extends Task<Map
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
       }
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager());
       success = (returnVal == 0);
     } catch (Exception e) {
       e.printStackTrace();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1580940&r1=1580939&r2=1580940&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Mon Mar 24 17:56:00 2014
@@ -37,11 +37,14 @@ import org.apache.hadoop.hive.common.Jav
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.Heartbeater;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -73,6 +76,7 @@ public class HadoopJobExecHelper {
   private final LogHelper console;
   private final HadoopJobExecHook callBackObj;
 
+
   /**
    * Update counters relevant to this task.
    */
@@ -224,11 +228,14 @@ public class HadoopJobExecHelper {
     int numReduce = -1;
     List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
 
+    Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job);
+
     while (!rj.isComplete()) {
       try {
         Thread.sleep(pullInterval);
       } catch (InterruptedException e) {
       }
+      heartbeater.heartbeat();
 
       if (initializing && rj.getJobState() == JobStatus.PREP) {
         // No reason to poll untill the job is initialized
@@ -408,6 +415,7 @@ public class HadoopJobExecHelper {
     return mapRedStats;
   }
 
+
   private String getId() {
     return this.task.getId();
   }
@@ -437,6 +445,7 @@ public class HadoopJobExecHelper {
   private static class ExecDriverTaskHandle extends TaskHandle {
     JobClient jc;
     RunningJob rj;
+    HiveTxnManager txnMgr;
 
     JobClient getJobClient() {
       return jc;
@@ -446,9 +455,14 @@ public class HadoopJobExecHelper {
       return rj;
     }
 
-    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
+    HiveTxnManager getTxnManager() {
+      return txnMgr;
+    }
+
+    public ExecDriverTaskHandle(JobClient jc, RunningJob rj, HiveTxnManager txnMgr) {
       this.jc = jc;
       this.rj = rj;
+      this.txnMgr = txnMgr;
     }
 
     public void setRunningJob(RunningJob job) {
@@ -501,7 +515,7 @@ public class HadoopJobExecHelper {
   }
 
 
-  public int progress(RunningJob rj, JobClient jc) throws IOException {
+  public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws IOException {
     jobId = rj.getID();
 
     int returnVal = 0;
@@ -522,7 +536,7 @@ public class HadoopJobExecHelper {
 
     runningJobs.add(rj);
 
-    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
+    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, txnMgr);
     jobInfo(rj);
     MapRedStats mapRedStats = progress(th);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1580940&r1=1580939&r2=1580940&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Mar 24 17:56:00 2014
@@ -32,6 +32,9 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Heartbeater;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.tez.dag.api.TezException;
@@ -93,9 +96,12 @@ public class TezJobMonitor {
    * status retrieval.
    *
    * @param dagClient client that was used to kick off the job
+   * @param txnMgr transaction manager for this operation
+   * @param conf configuration for this operation
    * @return int 0 - success, 1 - killed, 2 - failed
    */
-  public int monitorExecution(final DAGClient dagClient) throws InterruptedException {
+  public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
+                              HiveConf conf) throws InterruptedException {
     DAGStatus status = null;
     completed = new HashSet<String>();
 
@@ -106,6 +112,7 @@ public class TezJobMonitor {
     DAGStatus.State lastState = null;
     String lastReport = null;
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
+    Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
 
     shutdownList.add(dagClient);
 
@@ -119,6 +126,7 @@ public class TezJobMonitor {
         status = dagClient.getDAGStatus(opts);
         Map<String, Progress> progressMap = status.getVertexProgress();
         DAGStatus.State state = status.getState();
+        heartbeater.heartbeat();
 
         if (state != lastState || state == RUNNING) {
           lastState = state;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1580940&r1=1580939&r2=1580940&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Mar 24 17:56:00 2014
@@ -147,7 +147,7 @@ public class TezTask extends Task<TezWor
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
-      rc = monitor.monitorExecution(client);
+      rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf);
 
       // fetch the counters
       Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1580940&r1=1580939&r2=1580940&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Mon Mar 24 17:56:00 2014
@@ -213,7 +213,7 @@ public class BlockMergeTask extends Task
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, null);
       success = (returnVal == 0);
 
     } catch (Exception e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1580940&r1=1580939&r2=1580940&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Mon Mar 24 17:56:00 2014
@@ -220,7 +220,7 @@ public class PartialScanTask extends Tas
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, null);
       success = (returnVal == 0);
 
     } catch (Exception e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java?rev=1580940&r1=1580939&r2=1580940&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java Mon Mar 24 17:56:00 2014
@@ -189,7 +189,7 @@ public class ColumnTruncateTask extends 
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, null);
       success = (returnVal == 0);
 
     } catch (Exception e) {