You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2013/03/04 05:50:49 UTC

svn commit: r1452186 - in /incubator/ambari/trunk: ambari-agent/ ambari-agent/conf/unix/ ambari-agent/src/main/python/ambari_agent/ ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/ ambari-server/src/main/java/org/apache/ambari/server/c...

Author: mahadev
Date: Mon Mar  4 04:50:49 2013
New Revision: 1452186

URL: http://svn.apache.org/r1452186
Log:
AMBARI-1547. Fix ambari agent test cases that are failing due to missing directory. (mahadev)

Added:
    incubator/ambari/trunk/ambari-server/src/test/python/TestBootstrap.py
Modified:
    incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini
    incubator/ambari/trunk/ambari-agent/pom.xml
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Grep.py
    incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/NetUtil.py
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java

Modified: incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini (original)
+++ incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini Mon Mar  4 04:50:49 2013
@@ -22,6 +22,7 @@ prefix=/var/lib/ambari-agent/data
 
 [stack]
 installprefix=/var/ambari-agent/
+upgradeScriptsDir=/var/lib/ambari-agent/upgrade_stack
 
 [puppet]
 puppetmodules=/var/lib/ambari-agent/puppet

Modified: incubator/ambari/trunk/ambari-agent/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/pom.xml?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/pom.xml (original)
+++ incubator/ambari/trunk/ambari-agent/pom.xml Mon Mar  4 04:50:49 2013
@@ -204,6 +204,17 @@
               </sources>
             </mapping>
             <mapping>
+              <directory>/var/lib/${project.artifactId}/upgrade_scripts</directory>
+              <filemode>755</filemode>
+              <username>root</username>
+              <groupname>root</groupname>
+              <sources>
+                <source>
+                  <location>src/main/upgrade_scripts</location>
+                </source>
+              </sources>
+            </mapping>
+            <mapping>
               <directory>${package.conf.dir}</directory>
               <configuration>true</configuration>
               <filemode>755</filemode>

Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py Mon Mar  4 04:50:49 2013
@@ -33,7 +33,9 @@ import os
 import time
 import subprocess
 import copy
-import puppetExecutor
+import PuppetExecutor
+import UpgradeExecutor
+import PythonExecutor
 import tempfile
 from Grep import Grep
 
@@ -43,26 +45,31 @@ installScriptHash = -1
 class ActionQueue(threading.Thread):
   """ Action Queue for the agent. We pick one command at a time from the queue
   and execute that """
-  global commandQueue, resultQueue #, STATUS_COMMAND, EXECUTION_COMMAND
+  
   commandQueue = Queue.Queue()
   resultQueue = Queue.Queue()
 
   STATUS_COMMAND='STATUS_COMMAND'
   EXECUTION_COMMAND='EXECUTION_COMMAND'
+  UPGRADE_STATUS='UPGRADE'
+
   IDLE_SLEEP_TIME = 5
 
   def __init__(self, config):
     super(ActionQueue, self).__init__()
-    #threading.Thread.__init__(self)
     self.config = config
     self.sh = shellRunner()
     self._stop = threading.Event()
     self.maxRetries = config.getint('command', 'maxretries') 
     self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
-    self.executor = puppetExecutor.puppetExecutor(config.get('puppet', 'puppetmodules'),
+    self.puppetExecutor = PuppetExecutor.PuppetExecutor(
+                                   config.get('puppet', 'puppetmodules'),
                                    config.get('puppet', 'puppet_home'),
                                    config.get('puppet', 'facter_home'),
                                    config.get('agent', 'prefix'), config)
+    self.pythonExecutor = PythonExecutor.PythonExecutor()
+    self.upgradeExecutor = UpgradeExecutor.UpgradeExecutor(self.pythonExecutor,
+                                   PuppetExecutor, config)
     self.tmpdir = config.get('agent', 'prefix')
     self.commandInProgress = None
 
@@ -72,24 +79,16 @@ class ActionQueue(threading.Thread):
   def stopped(self):
     return self._stop.isSet()
 
-  def getshellinstance(self):
-    """ For Testing purpose only.""" 
-    return self.sh
-
   def put(self, command):
     logger.info("The " + command['commandType'] + " from the server is \n" + pprint.pformat(command))
-    commandQueue.put(command)
+    self.commandQueue.put(command)
     pass
 
-  def getCommandQueue(self):
-    """ For Testing purpose only."""
-    return commandQueue
-
   def run(self):
     result = []
     while not self.stopped():
-      while not commandQueue.empty():
-        command = commandQueue.get()
+      while not self.commandQueue.empty():
+        command = self.commandQueue.get()
         logger.info("Took an element of Queue: " + pprint.pformat(command))
         if command['commandType'] == self.EXECUTION_COMMAND:
           try:
@@ -103,8 +102,8 @@ class ActionQueue(threading.Thread):
             pass
 
           for entry in result:
-            resultQueue.put((ActionQueue.EXECUTION_COMMAND, entry))
-          pass
+            self.resultQueue.put((command['commandType'], entry))
+
         elif command['commandType'] == self.STATUS_COMMAND:
           cluster = command['clusterName']
           service = command['serviceName']
@@ -116,7 +115,7 @@ class ActionQueue(threading.Thread):
             logger.info("Got live status for component " + component + " of service " + str(service) +\
                         " of cluster " + str(cluster) + "\n" + pprint.pformat(result))
             if result is not None:
-              resultQueue.put((ActionQueue.STATUS_COMMAND, result))
+              self.resultQueue.put((ActionQueue.STATUS_COMMAND, result))
           except Exception, err:
             traceback.print_exc()
             logger.warn(err)
@@ -130,9 +129,9 @@ class ActionQueue(threading.Thread):
   def result(self):
     resultReports = []
     resultComponentStatus = []
-    while not resultQueue.empty():
-      res = resultQueue.get()
-      if res[0] == ActionQueue.EXECUTION_COMMAND:
+    while not self.resultQueue.empty():
+      res = self.resultQueue.get()
+      if res[0] == self.EXECUTION_COMMAND:
         resultReports.append(res[1])
       elif res[0] == ActionQueue.STATUS_COMMAND:
         resultComponentStatus.append(res[1])
@@ -147,7 +146,7 @@ class ActionQueue(threading.Thread):
         tmpout='...'
         tmperr='...'
       grep = Grep()
-      output = grep.tail(tmpout, puppetExecutor.puppetExecutor.OUTPUT_LAST_LINES)
+      output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
       inprogress = {
         'role' : self.commandInProgress['role'],
         'actionId' : self.commandInProgress['actionId'],
@@ -166,12 +165,6 @@ class ActionQueue(threading.Thread):
     }
     return result
 
-  def registerCommand(self, command):
-    return {}
-  
-  def statusCommand(self, command):
-    return {}
-  
   def executeCommand(self, command):
     logger.info("Executing command \n" + pprint.pformat(command))
     clusterName = command['clusterName']
@@ -196,7 +189,11 @@ class ActionQueue(threading.Thread):
       'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt'
     }
     # running command
-    commandresult = self.executor.runCommand(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
+    if command['commandType'] == ActionQueue.EXECUTION_COMMAND:
+      if command['roleCommand'] == ActionQueue.UPGRADE_STATUS:
+        commandresult = self.upgradeExecutor.perform_stack_upgrade(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
+      else:
+        commandresult = self.puppetExecutor.runCommand(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
     # dumping results
     self.commandInProgress = None
     status = "COMPLETED"
@@ -218,17 +215,8 @@ class ActionQueue(threading.Thread):
     if roleResult['stderr'] == '':
       roleResult['stderr'] = 'None'
     result.append(roleResult)
-    pass
     return result
 
-  def noOpCommand(self, command):
-    result = {'commandId' : command['Id']}
-    return result
-
-  def unknownAction(self, action):
-    logger.error('Unknown action: %s' % action['id'])
-    result = { 'id': action['id'] }
-    return result
 
   def isIdle(self):
-    return commandQueue.empty()
+    return self.commandQueue.empty()

Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py Mon Mar  4 04:50:49 2013
@@ -38,6 +38,7 @@ prefix=/tmp/ambari-agent
 
 [stack]
 installprefix=/tmp
+upgradeScriptsDir=/var/lib/ambari-agent/upgrade_stack
 
 [puppet]
 puppetmodules=/var/lib/ambari-agent/puppet/

Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Grep.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Grep.py?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Grep.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Grep.py Mon Mar  4 04:50:49 2013
@@ -17,6 +17,13 @@ import re
 
 class Grep:
 
+  # How many lines from command output send to server
+  OUTPUT_LAST_LINES = 10
+  # How many lines from command error output send to server (before Err phrase)
+  ERROR_LAST_LINES_BEFORE = 30
+  # How many lines from command error output send to server (after Err phrase)
+  ERROR_LAST_LINES_AFTER = 30
+
   def __init__(self):
     pass
 

Modified: incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/NetUtil.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/NetUtil.py?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/NetUtil.py (original)
+++ incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/NetUtil.py Mon Mar  4 04:50:49 2013
@@ -29,7 +29,6 @@ class NetUtil:
   CONNECT_SERVER_RETRY_INTERVAL_SEC = 10
   HEARTBEAT_IDDLE_INTERVAL_SEC = 10
   HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 5
-  HEARTBEAT_STATE_INTERVAL = 6 # default one per minute
 
   # Url within server to request during status check. This url
   # should return HTTP code 200

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java Mon Mar  4 04:50:49 2013
@@ -273,8 +273,7 @@ public class WorkflowJsonService {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tasklocality")
   public TaskLocalityData getTaskLocalitySummary(@QueryParam("jobId") String jobId, @DefaultValue("4") @QueryParam("minr") int minr,
-      @DefaultValue("24") @QueryParam("maxr") int maxr, @QueryParam("workflowId") String workflowId,
-      @DefaultValue("-1") @QueryParam("startTime") long startTime, @DefaultValue("-1") @QueryParam("endTime") long endTime) {
+      @DefaultValue("24") @QueryParam("maxr") int maxr, @QueryParam("workflowId") String workflowId) {
     if (maxr < minr)
       maxr = minr;
     TaskLocalityData data = new TaskLocalityData();
@@ -284,10 +283,10 @@ public class WorkflowJsonService {
       if (jobId != null) {
         long[] times = conn.fetchJobStartStopTimes(jobId);
         if (times != null) {
-          getTaskAttemptsByLocality(conn.fetchJobTaskAttempts(jobId), times[0], times[1], data, minr, maxr);
+          getApproxTaskAttemptsByLocality(conn.fetchJobTaskAttempts(jobId), times[0], times[1], data, minr, maxr);
         }
       } else if (workflowId != null) {
-        getTaskAttemptsByLocality(conn.fetchWorkflowTaskAttempts(workflowId), startTime, endTime, data, minr, maxr);
+        getExactTaskAttemptsByLocality(conn.fetchWorkflowTaskAttempts(workflowId), data, minr, maxr);
       }
     } catch (IOException e) {
       e.printStackTrace();
@@ -329,7 +328,19 @@ public class WorkflowJsonService {
     points.setReduceData(reducePoints);
   }
   
-  private static void getTaskAttemptsByLocality(List<TaskAttempt> taskAttempts, long submitTime, long finishTime, TaskLocalityData data, int minr,
+  private static void getExactTaskAttemptsByLocality(List<TaskAttempt> taskAttempts, TaskLocalityData data, int minr, int maxr) throws IOException {
+    MinMax io = new MinMax();
+    data.setMapNodeLocal(processExactLocalityData(taskAttempts, "MAP", "NODE_LOCAL", io));
+    data.setMapRackLocal(processExactLocalityData(taskAttempts, "MAP", "RACK_LOCAL", io));
+    data.setMapOffSwitch(processExactLocalityData(taskAttempts, "MAP", "OFF_SWITCH", io));
+    data.setReduceOffSwitch(processExactLocalityData(taskAttempts, "REDUCE", "OFF_SWITCH", io));
+    setRValues(data.getMapNodeLocal(), minr, maxr, io.max);
+    setRValues(data.getMapRackLocal(), minr, maxr, io.max);
+    setRValues(data.getMapOffSwitch(), minr, maxr, io.max);
+    setRValues(data.getReduceOffSwitch(), minr, maxr, io.max);
+  }
+
+  private static void getApproxTaskAttemptsByLocality(List<TaskAttempt> taskAttempts, long submitTime, long finishTime, TaskLocalityData data, int minr,
       int maxr) throws IOException {
     long submitTimeX = transformX(submitTime);
     long finishTimeX = transformX(finishTime);
@@ -407,6 +418,24 @@ public class WorkflowJsonService {
     return index;
   }
   
+  private static List<DataPoint> processExactLocalityData(List<TaskAttempt> taskAttempts, String taskType, String locality, MinMax io) {
+    List<DataPoint> data = new ArrayList<DataPoint>();
+    for (TaskAttempt taskAttempt : taskAttempts) {
+      if (taskType.equals(taskAttempt.getTaskType()) && locality.equals(taskAttempt.getLocality())) {
+        DataPoint point = new DataPoint();
+        point.setX(taskAttempt.getStartTime());
+        point.setY(taskAttempt.getFinishTime() - taskAttempt.getStartTime());
+        point.setIO(taskAttempt.getInputBytes() + taskAttempt.getOutputBytes());
+        point.setLabel(taskAttempt.getTaskAttemptId());
+        point.setStatus(taskAttempt.getStatus());
+        data.add(point);
+        io.max = Math.max(io.max, point.getIO());
+        io.min = Math.min(io.min, point.getIO());
+      }
+    }
+    return data;
+  }
+
   private static List<DataPoint> processLocalityData(List<TaskAttempt> taskAttempts, String taskType, String locality, Long[] xPoints, MinMax io) {
     List<DataPoint> data = new ArrayList<DataPoint>();
     int i = 0;

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java Mon Mar  4 04:50:49 2013
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import org.apache.ambari.server.Role;
 
 /**
  * Resource provider for host component resources.
@@ -47,6 +48,7 @@ class HostComponentResourceProvider exte
   protected static final String HOST_COMPONENT_DESIRED_STATE_PROPERTY_ID  = PropertyHelper.getPropertyId("HostRoles", "desired_state");
   protected static final String HOST_COMPONENT_CONFIGS_PROPERTY_ID          = PropertyHelper.getPropertyId("HostRoles", "configs");
   protected static final String HOST_COMPONENT_DESIRED_CONFIGS_PROPERTY_ID  = PropertyHelper.getPropertyId("HostRoles", "desired_configs");
+  protected static final String HOST_COMPONENT_HIGH_AVAILABILITY_PROPERTY_ID  = PropertyHelper.getPropertyId("HostRoles", "ha_status");
 
   private static Set<String> pkPropertyIds =
       new HashSet<String>(Arrays.asList(new String[]{
@@ -116,15 +118,21 @@ class HostComponentResourceProvider exte
         return getManagementController().getHostComponents(requests);
       }
     });
-
+    
     for (ServiceComponentHostResponse response : responses) {
       Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+      if((response.getComponentName()).equals(Role.HBASE_MASTER.toString())) {
+          requestedIds.add(HOST_COMPONENT_HIGH_AVAILABILITY_PROPERTY_ID);
+      }else{
+          requestedIds.remove(HOST_COMPONENT_HIGH_AVAILABILITY_PROPERTY_ID);
+      }
       setResourceProperty(resource, HOST_COMPONENT_CLUSTER_NAME_PROPERTY_ID, response.getClusterName(), requestedIds);
       setResourceProperty(resource, HOST_COMPONENT_SERVICE_NAME_PROPERTY_ID, response.getServiceName(), requestedIds);
       setResourceProperty(resource, HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID, response.getComponentName(), requestedIds);
       setResourceProperty(resource, HOST_COMPONENT_HOST_NAME_PROPERTY_ID, response.getHostname(), requestedIds);
       setResourceProperty(resource, HOST_COMPONENT_STATE_PROPERTY_ID, response.getLiveState(), requestedIds);
       setResourceProperty(resource, HOST_COMPONENT_DESIRED_STATE_PROPERTY_ID, response.getDesiredState(), requestedIds);
+      setResourceProperty(resource, HOST_COMPONENT_HIGH_AVAILABILITY_PROPERTY_ID, "NA", requestedIds);
       setResourceProperty(resource, HOST_COMPONENT_CONFIGS_PROPERTY_ID,
           response.getConfigs(), requestedIds);
       setResourceProperty(resource, HOST_COMPONENT_DESIRED_CONFIGS_PROPERTY_ID,

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java?rev=1452186&r1=1452185&r2=1452186&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java Mon Mar  4 04:50:49 2013
@@ -1,4 +1,4 @@
- /**
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Added: incubator/ambari/trunk/ambari-server/src/test/python/TestBootstrap.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/python/TestBootstrap.py?rev=1452186&view=auto
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/python/TestBootstrap.py (added)
+++ incubator/ambari/trunk/ambari-server/src/test/python/TestBootstrap.py Mon Mar  4 04:50:49 2013
@@ -0,0 +1,56 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import bootstrap
+import time
+
+from bootstrap import SCP
+from bootstrap import PSCP
+from bootstrap import SSH
+from bootstrap import PSSH
+from unittest import TestCase
+
+class TestBootstrap(TestCase):
+
+  #Timout is specified in bootstrap.HOST_BOOTSTRAP_TIMEOUT, default is 300 seconds
+  def test_return_failed_status_for_hanging_ssh_threads_after_timeout(self):
+    bootstrap.HOST_BOOTSTRAP_TIMEOUT = 1
+    forever_hanging_timeout = 5
+    SSH.run = lambda self: time.sleep(forever_hanging_timeout)
+    pssh = PSSH(["hostname"], "sshKeyFile", "command", "bootdir")
+    self.assertTrue(pssh.ret == {})
+    starttime = time.time()
+    pssh.run()
+    self.assertTrue(pssh.ret != {})
+    self.assertTrue(time.time() - starttime < forever_hanging_timeout)
+    self.assertTrue(pssh.ret["hostname"]["log"] == "FAILED")
+    self.assertTrue(pssh.ret["hostname"]["exitstatus"] == -1)
+
+  #Timout is specified in bootstrap.HOST_BOOTSTRAP_TIMEOUT, default is 300 seconds
+  def test_return_failed_status_for_hanging_scp_threads_after_timeout(self):
+    bootstrap.HOST_BOOTSTRAP_TIMEOUT = 1
+    forever_hanging_timeout = 5
+    SCP.run = lambda self: time.sleep(forever_hanging_timeout)
+    pscp = PSCP(["hostname"], "sshKeyFile", "inputfile", "remote", "bootdir")
+    self.assertTrue(pscp.ret == {})
+    starttime = time.time()
+    pscp.run()
+    self.assertTrue(pscp.ret != {})
+    self.assertTrue(time.time() - starttime < forever_hanging_timeout)
+    self.assertTrue(pscp.ret["hostname"]["log"] == "FAILED")
+    self.assertTrue(pscp.ret["hostname"]["exitstatus"] == -1)