You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by ma...@apache.org on 2013/03/01 08:02:48 UTC
svn commit: r1451494 - in /incubator/ambari/trunk: CHANGES.txt
ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
ambari-server/src/main/python/bootstrap.py
Author: mahadev
Date: Fri Mar 1 07:02:47 2013
New Revision: 1451494
URL: http://svn.apache.org/r1451494
Log:
AMBARI-1299. Bootstrap can hang indefinitely. (mahadev)
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1451494&r1=1451493&r2=1451494&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri Mar 1 07:02:47 2013
@@ -688,6 +688,8 @@ Trunk (unreleased changes):
AMBARI-1498. Hive service check fails on secure HDP cluster. (Siddharth Wagle
via mahadev)
+ AMBARI-1299. Bootstrap can hang indefinitely. (mahadev)
+
AMBARI-1.2.0 branch:
INCOMPATIBLE CHANGES
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java?rev=1451494&r1=1451493&r2=1451494&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java Fri Mar 1 07:02:47 2013
@@ -273,16 +273,21 @@ public class WorkflowJsonService {
@Produces(MediaType.APPLICATION_JSON)
@Path("/tasklocality")
public TaskLocalityData getTaskLocalitySummary(@QueryParam("jobId") String jobId, @DefaultValue("4") @QueryParam("minr") int minr,
- @DefaultValue("24") @QueryParam("maxr") int maxr) {
+ @DefaultValue("24") @QueryParam("maxr") int maxr, @QueryParam("workflowId") String workflowId,
+ @DefaultValue("-1") @QueryParam("startTime") long startTime, @DefaultValue("-1") @QueryParam("endTime") long endTime) {
if (maxr < minr)
maxr = minr;
TaskLocalityData data = new TaskLocalityData();
PostgresConnector conn = null;
try {
conn = getConnector();
- long[] times = conn.fetchJobStartStopTimes(jobId);
- if (times != null) {
- getTaskAttemptsByLocality(conn, jobId, times[0], times[1], data, minr, maxr);
+ if (jobId != null) {
+ long[] times = conn.fetchJobStartStopTimes(jobId);
+ if (times != null) {
+ getTaskAttemptsByLocality(conn.fetchJobTaskAttempts(jobId), times[0], times[1], data, minr, maxr);
+ }
+ } else if (workflowId != null) {
+ getTaskAttemptsByLocality(conn.fetchWorkflowTaskAttempts(workflowId), startTime, endTime, data, minr, maxr);
}
} catch (IOException e) {
e.printStackTrace();
@@ -324,11 +329,10 @@ public class WorkflowJsonService {
points.setReduceData(reducePoints);
}
- private static void getTaskAttemptsByLocality(PostgresConnector conn, String jobId, long submitTime, long finishTime, TaskLocalityData data, int minr,
+ private static void getTaskAttemptsByLocality(List<TaskAttempt> taskAttempts, long submitTime, long finishTime, TaskLocalityData data, int minr,
int maxr) throws IOException {
long submitTimeX = transformX(submitTime);
long finishTimeX = transformX(finishTime);
- List<TaskAttempt> taskAttempts = conn.fetchJobTaskAttempts(jobId);
Set<Long> xPoints = getXPoints(taskAttempts, submitTimeX, finishTimeX);
Long[] xList = xPoints.toArray(new Long[xPoints.size()]);
MinMax io = new MinMax();
Modified: incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py?rev=1451494&r1=1451493&r2=1451494&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py (original)
+++ incubator/ambari/trunk/ambari-server/src/main/python/bootstrap.py Fri Mar 1 07:02:47 2013
@@ -30,6 +30,7 @@ import traceback
from pprint import pformat
AMBARI_PASSPHRASE_VAR_NAME = "AMBARI_PASSPHRASE"
+HOST_BOOTSTRAP_TIMEOUT = 300
class SCP(threading.Thread):
""" SCP implementation that is thread based. The status can be returned using
@@ -42,6 +43,7 @@ class SCP(threading.Thread):
self.bootdir = bootdir
self.ret = {"exitstatus" : -1, "log" : "FAILED"}
threading.Thread.__init__(self)
+ self.daemon = True
pass
def getStatus(self):
@@ -80,6 +82,7 @@ class SSH(threading.Thread):
self.bootdir = bootdir
self.ret = {"exitstatus" : -1, "log": "FAILED"}
threading.Thread.__init__(self)
+ self.daemon = True
pass
def getHost(self):
@@ -173,8 +176,14 @@ class PSSH:
chunkstats.append(ssh)
pass
# wait for the ssh's to complete
+ starttime = time.time()
for chunkstat in chunkstats:
- chunkstat.join()
+ elapsedtime = time.time() - starttime
+ if elapsedtime < HOST_BOOTSTRAP_TIMEOUT:
+ timeout = HOST_BOOTSTRAP_TIMEOUT - elapsedtime
+ else:
+ timeout = 0.0
+ chunkstat.join(timeout)
self.ret[chunkstat.getHost()] = chunkstat.getStatus()
pass
pass
@@ -205,8 +214,14 @@ class PSCP:
chunkstats.append(scp)
pass
# wait for the scp's to complete
+ starttime = time.time()
for chunkstat in chunkstats:
- chunkstat.join()
+ elapsedtime = time.time() - starttime
+ if elapsedtime < HOST_BOOTSTRAP_TIMEOUT:
+ timeout = HOST_BOOTSTRAP_TIMEOUT - elapsedtime
+ else:
+ timeout = 0.0
+ chunkstat.join(timeout)
self.ret[chunkstat.getHost()] = chunkstat.getStatus()
pass