You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2014/08/21 19:15:46 UTC
git commit: AMBARI-6978. Uncatched exception at ambari agent - it may
die on connection error (dlysnichenko)
Repository: ambari
Updated Branches:
refs/heads/trunk 0822a54b2 -> 8f02714e5
AMBARI-6978. Uncatched exception at ambari agent - it may die on connection error (dlysnichenko)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8f02714e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8f02714e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8f02714e
Branch: refs/heads/trunk
Commit: 8f02714e5bff2c943caa309f020d7060e509735e
Parents: 0822a54
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Thu Aug 21 16:04:55 2014 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Thu Aug 21 20:14:31 2014 +0300
----------------------------------------------------------------------
.../src/main/python/ambari_agent/Controller.py | 55 +++++++-------
.../test/python/ambari_agent/TestController.py | 76 ++++++++++++++++----
2 files changed, 90 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/8f02714e/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 87af939..7859a2d 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -78,17 +78,20 @@ class Controller(threading.Thread):
def __del__(self):
logger.info("Server connection disconnected.")
pass
-
+
def registerWithServer(self):
+ """
+ :return: returning from current method without setting self.isRegistered
+ to True will lead to agent termination.
+ """
LiveStatus.SERVICES = []
LiveStatus.CLIENT_COMPONENTS = []
LiveStatus.COMPONENTS = []
- id = -1
ret = {}
while not self.isRegistered:
try:
- data = json.dumps(self.register.build(id))
+ data = json.dumps(self.register.build())
prettyData = pprint.pformat(data)
try:
@@ -111,8 +114,7 @@ class Controller(threading.Thread):
# log - message, which will be printed to agents log
if 'log' in ret.keys():
log = ret['log']
-
- logger.error(log)
+ logger.error(log)
self.isRegistered = False
self.repeatRegistration = False
return ret
@@ -122,23 +124,22 @@ class Controller(threading.Thread):
self.responseId = int(ret['responseId'])
self.isRegistered = True
if 'statusCommands' in ret.keys():
- logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) )
+ logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']))
self.addToStatusQueue(ret['statusCommands'])
pass
else:
self.hasMappedComponents = False
pass
except ssl.SSLError:
- self.repeatRegistration=False
+ self.repeatRegistration = False
self.isRegistered = False
return
except Exception:
# try a reconnect only after a certain amount of random time
delay = randint(0, self.range)
- logger.error("Unable to connect to: " + self.registerUrl, exc_info = True)
+ logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
""" Sleeping for {0} seconds and then retrying again """.format(delay)
time.sleep(delay)
- pass
pass
return ret
@@ -147,7 +148,7 @@ class Controller(threading.Thread):
if commands:
self.actionQueue.cancel(commands)
pass
-
+
def addToQueue(self, commands):
"""Add to the queue for running the commands """
""" Put the required actions into the Queue """
@@ -178,11 +179,8 @@ class Controller(threading.Thread):
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
retry = False
certVerifFailed = False
-
hb_interval = self.config.get('heartbeat', 'state_interval')
- #TODO make sure the response id is monotonically increasing
- id = 0
while not self.DEBUG_STOP_HEARTBEATING:
try:
if not retry:
@@ -212,7 +210,7 @@ class Controller(threading.Thread):
logger.info('Heartbeat response received (id = %s)', serverId)
if 'hasMappedComponents' in response.keys():
- self.hasMappedComponents = response['hasMappedComponents'] != False
+ self.hasMappedComponents = response['hasMappedComponents'] is not False
if 'registrationCommand' in response.keys():
# check if the registration command is None. If none skip
@@ -226,7 +224,7 @@ class Controller(threading.Thread):
logger.error("Error in responseId sequence - restarting")
self.restartAgent()
else:
- self.responseId=serverId
+ self.responseId = serverId
if 'cancelCommands' in response.keys():
self.cancelCommandInQueue(response['cancelCommands'])
@@ -250,7 +248,7 @@ class Controller(threading.Thread):
if retry:
logger.info("Reconnected to %s", self.heartbeatUrl)
- retry=False
+ retry = False
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
@@ -260,10 +258,6 @@ class Controller(threading.Thread):
self.isRegistered = False
return
except Exception, err:
- #randomize the heartbeat
- delay = randint(0, self.range)
- time.sleep(delay)
-
if "code" in err:
logger.error(err.code)
else:
@@ -283,13 +277,17 @@ class Controller(threading.Thread):
logger.warn("Server certificate verify failed. Did you regenerate server certificate?")
certVerifFailed = True
- self.cachedconnect = None # Previous connection is broken now
- retry=True
+ self.cachedconnect = None # Previous connection is broken now
+ retry = True
+
+ #randomize the heartbeat
+ delay = randint(0, self.range)
+ time.sleep(delay)
# Sleep for some time
timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
- self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
- self.heartbeat_wait_event.wait(timeout = timeout)
+ self.heartbeat_wait_event.wait(timeout=timeout)
# Sleep a bit more to allow STATUS_COMMAND results to be collected
# and sent in one heartbeat. Also avoid server overload with heartbeats
time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
@@ -345,17 +343,16 @@ class Controller(threading.Thread):
return json.loads(response)
except Exception, exception:
if response is None:
- err_msg = 'Request to {0} failed due to {1}'.format(url, str(exception))
- return {'exitstatus': 1, 'log': err_msg}
+ raise IOError('Request to {0} failed due to {1}'.format(url, str(exception)))
else:
- err_msg = ('Response parsing failed! Request data: ' + str(data)
- + '; Response: ' + str(response))
- logger.warn(err_msg)
- return {'exitstatus': 1, 'log': err_msg}
+ raise IOError('Response parsing failed! Request data: ' + str(data)
+ + '; Response: ' + str(response))
+
def updateComponents(self, cluster_name):
logger.info("Updating components map of cluster " + cluster_name)
+ # May throw IOError on server connection error
response = self.sendRequest(self.componentsUrl + cluster_name, None)
logger.debug("Response from %s was %s", self.serverHostname, str(response))
http://git-wip-us.apache.org/repos/asf/ambari/blob/8f02714e/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 9ec23db..ad8303f 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -30,7 +30,7 @@ from threading import Event
import json
with patch("platform.linux_distribution", return_value = ('Suse','11','Final')):
- from ambari_agent import Controller, ActionQueue
+ from ambari_agent import Controller, ActionQueue, Register
from ambari_agent import hostname
from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
from ambari_commons import OSCheck
@@ -247,9 +247,9 @@ class TestController(unittest.TestCase):
heartbeatWithServer.assert_called_once_with()
self.controller.registerWithServer =\
- Controller.Controller.registerWithServer
+ Controller.Controller.registerWithServer
self.controller.heartbeatWithServer =\
- Controller.Controller.registerWithServer
+ Controller.Controller.registerWithServer
@patch("time.sleep")
def test_registerAndHeartbeat(self, sleepMock):
@@ -300,6 +300,33 @@ class TestController(unittest.TestCase):
Controller.Controller.registerWithServer
+ @patch("time.sleep")
+ @patch.object(Controller.Controller, "sendRequest")
+ def test_registerWithIOErrors(self, sendRequestMock, sleepMock):
+ # Check that server continues to heartbeat after connection errors
+ registerMock = MagicMock(name="Register")
+ registerMock.build.return_value = {}
+ actionQueue = MagicMock()
+ actionQueue.isIdle.return_value = True
+ self.controller.actionQueue = actionQueue
+ self.controller.register = registerMock
+ self.controller.responseId = 1
+ self.controller.TEST_IOERROR_COUNTER = 1
+ self.controller.isRegistered = False
+ def util_throw_IOErrors(*args, **kwargs):
+ """
+ Throws IOErrors 10 times and then stops heartbeats/registrations
+ """
+ if self.controller.TEST_IOERROR_COUNTER == 10:
+ self.controller.isRegistered = True
+ self.controller.TEST_IOERROR_COUNTER += 1
+ raise IOError("Sample error")
+ actionQueue.isIdle.return_value = False
+ sendRequestMock.side_effect = util_throw_IOErrors
+ self.controller.registerWithServer()
+ self.assertTrue(sendRequestMock.call_count > 5)
+
+
@patch("os._exit")
def test_restartAgent(self, os_exit_mock):
@@ -331,18 +358,22 @@ class TestController(unittest.TestCase):
{'Content-Type': 'application/json'})
conMock.request.return_value = '{invalid_object}'
- actual = self.controller.sendRequest(url, data)
- expected = {'exitstatus': 1, 'log': ('Response parsing failed! Request data: ' + data
- + '; Response: {invalid_object}')}
- self.assertEqual(actual, expected)
+
+ try:
+ self.controller.sendRequest(url, data)
+ self.fail("Should throw exception!")
+ except IOError, e: # Expected
+ self.assertEquals('Response parsing failed! Request data: ' + data +
+ '; Response: {invalid_object}', e.message)
exceptionMessage = "Connection Refused"
conMock.request.side_effect = Exception(exceptionMessage)
- actual = self.controller.sendRequest(url, data)
- expected = {'exitstatus': 1, 'log': 'Request to ' + url + ' failed due to ' + exceptionMessage}
-
- self.assertEqual(actual, expected)
-
+ try:
+ self.controller.sendRequest(url, data)
+ self.fail("Should throw exception!")
+ except IOError, e: # Expected
+ self.assertEquals('Request to ' + url + ' failed due to ' +
+ exceptionMessage, e.message)
@patch.object(threading._Event, "wait")
@@ -483,6 +514,27 @@ class TestController(unittest.TestCase):
sleepMock.assert_called_with(
self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
+ # Check that server continues to heartbeat after connection errors
+ self.controller.responseId = 1
+ self.controller.TEST_IOERROR_COUNTER = 1
+ sendRequest.reset()
+ def util_throw_IOErrors(*args, **kwargs):
+ """
+ Throws IOErrors 100 times and then stops heartbeats/registrations
+ """
+ if self.controller.TEST_IOERROR_COUNTER == 10:
+ self.controller.DEBUG_STOP_HEARTBEATING = True
+ self.controller.TEST_IOERROR_COUNTER += 1
+ raise IOError("Sample error")
+ self.controller.DEBUG_STOP_HEARTBEATING = False
+ actionQueue.isIdle.return_value = False
+ sendRequest.side_effect = util_throw_IOErrors
+ self.controller.heartbeatWithServer()
+ self.assertTrue(sendRequest.call_count > 5)
+
+ sleepMock.assert_called_with(
+ self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
+
sys.stdout = sys.__stdout__
self.controller.sendRequest = Controller.Controller.sendRequest
self.controller.sendRequest = Controller.Controller.addToQueue