You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2013/03/01 08:02:48 UTC

svn commit: r1451494 - in /incubator/ambari/trunk: CHANGES.txt ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java ambari-server/src/main/python/bootstrap.py

Author: mahadev
Date: Fri Mar  1 07:02:47 2013
New Revision: 1451494

URL: http://svn.apache.org/r1451494
Log:
AMBARI-1299. Bootstrap can hang indefinitely. (mahadev)

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
    incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1451494&r1=1451493&r2=1451494&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri Mar  1 07:02:47 2013
@@ -688,6 +688,8 @@ Trunk (unreleased changes):
  AMBARI-1498. Hive service check fails on secure HDP cluster. (Siddharth Wagle
  via mahadev)
 
+ AMBARI-1299. Bootstrap can hang indefinitely. (mahadev)
+
 AMBARI-1.2.0 branch:
 
  INCOMPATIBLE CHANGES

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java?rev=1451494&r1=1451493&r2=1451494&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java Fri Mar  1 07:02:47 2013
@@ -273,16 +273,21 @@ public class WorkflowJsonService {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tasklocality")
   public TaskLocalityData getTaskLocalitySummary(@QueryParam("jobId") String jobId, @DefaultValue("4") @QueryParam("minr") int minr,
-      @DefaultValue("24") @QueryParam("maxr") int maxr) {
+      @DefaultValue("24") @QueryParam("maxr") int maxr, @QueryParam("workflowId") String workflowId,
+      @DefaultValue("-1") @QueryParam("startTime") long startTime, @DefaultValue("-1") @QueryParam("endTime") long endTime) {
     if (maxr < minr)
       maxr = minr;
     TaskLocalityData data = new TaskLocalityData();
     PostgresConnector conn = null;
     try {
       conn = getConnector();
-      long[] times = conn.fetchJobStartStopTimes(jobId);
-      if (times != null) {
-        getTaskAttemptsByLocality(conn, jobId, times[0], times[1], data, minr, maxr);
+      if (jobId != null) {
+        long[] times = conn.fetchJobStartStopTimes(jobId);
+        if (times != null) {
+          getTaskAttemptsByLocality(conn.fetchJobTaskAttempts(jobId), times[0], times[1], data, minr, maxr);
+        }
+      } else if (workflowId != null) {
+        getTaskAttemptsByLocality(conn.fetchWorkflowTaskAttempts(workflowId), startTime, endTime, data, minr, maxr);
       }
     } catch (IOException e) {
       e.printStackTrace();
@@ -324,11 +329,10 @@ public class WorkflowJsonService {
     points.setReduceData(reducePoints);
   }
   
-  private static void getTaskAttemptsByLocality(PostgresConnector conn, String jobId, long submitTime, long finishTime, TaskLocalityData data, int minr,
+  private static void getTaskAttemptsByLocality(List<TaskAttempt> taskAttempts, long submitTime, long finishTime, TaskLocalityData data, int minr,
       int maxr) throws IOException {
     long submitTimeX = transformX(submitTime);
     long finishTimeX = transformX(finishTime);
-    List<TaskAttempt> taskAttempts = conn.fetchJobTaskAttempts(jobId);
     Set<Long> xPoints = getXPoints(taskAttempts, submitTimeX, finishTimeX);
     Long[] xList = xPoints.toArray(new Long[xPoints.size()]);
     MinMax io = new MinMax();

Modified: incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py?rev=1451494&r1=1451493&r2=1451494&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py (original)
+++ incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py Fri Mar  1 07:02:47 2013
@@ -30,6 +30,7 @@ import traceback
 from pprint import pformat
 
 AMBARI_PASSPHRASE_VAR_NAME = "AMBARI_PASSPHRASE"
+HOST_BOOTSTRAP_TIMEOUT = 300
 
 class SCP(threading.Thread):
   """ SCP implementation that is thread based. The status can be returned using
@@ -42,6 +43,7 @@ class SCP(threading.Thread):
     self.bootdir = bootdir
     self.ret = {"exitstatus" : -1, "log" : "FAILED"}
     threading.Thread.__init__(self)
+    self.daemon = True
     pass
   
   def getStatus(self):
@@ -80,6 +82,7 @@ class SSH(threading.Thread):
     self.bootdir = bootdir
     self.ret = {"exitstatus" : -1, "log": "FAILED"}
     threading.Thread.__init__(self)
+    self.daemon = True
     pass
   
   def getHost(self):
@@ -173,8 +176,14 @@ class PSSH:
         chunkstats.append(ssh)
         pass
       # wait for the ssh's to complete
+      starttime = time.time()
       for chunkstat in chunkstats:
-        chunkstat.join()
+        elapsedtime = time.time() - starttime
+        if elapsedtime < HOST_BOOTSTRAP_TIMEOUT:
+          timeout = HOST_BOOTSTRAP_TIMEOUT - elapsedtime
+        else:
+          timeout = 0.0
+        chunkstat.join(timeout)
         self.ret[chunkstat.getHost()] = chunkstat.getStatus()
       pass
     pass
@@ -205,8 +214,14 @@ class PSCP:
         chunkstats.append(scp)
         pass
       # wait for the scp's to complete
+      starttime = time.time()
       for chunkstat in chunkstats:
-        chunkstat.join()
+        elapsedtime = time.time() - starttime
+        if elapsedtime < HOST_BOOTSTRAP_TIMEOUT:
+          timeout = HOST_BOOTSTRAP_TIMEOUT - elapsedtime
+        else:
+          timeout = 0.0
+        chunkstat.join(timeout)
         self.ret[chunkstat.getHost()] = chunkstat.getStatus()
       pass