You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2014/04/25 19:36:36 UTC

git commit: AMBARI-5574 Need some logging when JSON parsing throws an error (dsen)

Repository: ambari
Updated Branches:
  refs/heads/trunk f0c31d29a -> bc5d82cad


AMBARI-5574 Need some logging when JSON parsing throws an error (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/bc5d82ca
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/bc5d82ca
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/bc5d82ca

Branch: refs/heads/trunk
Commit: bc5d82cad01f484195ddb30bdef60aa684856da4
Parents: f0c31d2
Author: Dmitry Sen <ds...@hortonworks.com>
Authored: Fri Apr 25 20:16:03 2014 +0300
Committer: Dmitry Sen <ds...@hortonworks.com>
Committed: Fri Apr 25 20:36:20 2014 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/Controller.py  | 29 ++++++----
 .../src/main/python/ambari_agent/security.py    |  8 ++-
 .../test/python/ambari_agent/TestController.py  | 58 +++++++++++++-------
 .../test/python/ambari_agent/TestSecurity.py    | 39 ++++++++++++-
 4 files changed, 100 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/bc5d82ca/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index f1099ae..9839313 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -88,8 +88,7 @@ class Controller(threading.Thread):
       try:
         data = json.dumps(self.register.build(id))
         logger.info("Registering with the server " + pprint.pformat(data))
-        response = self.sendRequest(self.registerUrl, data)
-        ret = json.loads(response)
+        ret = self.sendRequest(self.registerUrl, data)
         exitstatus = 0
         # exitstatus is an error code which was raised on the server side.
         # exitstatus = 0 (OK - Default)
@@ -177,7 +176,6 @@ class Controller(threading.Thread):
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1
         response = self.sendRequest(self.heartbeatUrl, data)
-        response = json.loads(response)
 
         logger.debug('Got server response: ' + pprint.pformat(response))
         
@@ -283,17 +281,28 @@ class Controller(threading.Thread):
     pass
 
   def sendRequest(self, url, data):
-    if self.cachedconnect is None: # Lazy initialization
-      self.cachedconnect = security.CachedHTTPSConnection(self.config)
-    req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
-    response = self.cachedconnect.request(req)
-    return response
+    try:
+      if self.cachedconnect is None: # Lazy initialization
+        self.cachedconnect = security.CachedHTTPSConnection(self.config)
+      req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
+      response = None
+      response = self.cachedconnect.request(req)
+      return json.loads(response)
+    except Exception:
+      if response is None:
+        err_msg = 'Request failed! Data: ' + str(data)
+        logger.warn(err_msg)
+        return {'exitstatus': 1, 'log': err_msg}
+      else:
+        err_msg = ('Response parsing failed! Request data: ' + str(data)
+            + '; Response: ' + str(response))
+        logger.warn(err_msg)
+        return {'exitstatus': 1, 'log': err_msg}
 
   def updateComponents(self, cluster_name):
     logger.info("Updating components map of cluster " + cluster_name)
     response = self.sendRequest(self.componentsUrl + cluster_name, None)
-    logger.debug("Response from server = " + response)
-    response = json.loads(response)
+    logger.debug("Response from server = " + str(response))
     for service, components in response['components'].items():
       LiveStatus.SERVICES.append(service)
       for component, category in components.items():

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc5d82ca/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index 3052245..9801dec 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -208,8 +208,12 @@ class CertificateManager():
     f = urllib2.urlopen(req)
     response = f.read()
     f.close()
-    data = json.loads(response)
-    logger.debug("Sign response from Server: \n" + pprint.pformat(data))
+    try:
+      data = json.loads(response)
+      logger.debug("Sign response from Server: \n" + pprint.pformat(data))
+    except Exception:
+      logger.warn("Malformed response! data: " + str(data))
+      data = {'result': 'ERROR'}
     result=data['result']
     if result == 'OK':
       agentCrtContent=data['signedCa']

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc5d82ca/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 000cb3f..daa7b82 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -80,18 +80,18 @@ class TestController(unittest.TestCase):
 
     self.controller.sendRequest = MagicMock()
 
-    dumpsMock.return_value = "request"
-    self.controller.sendRequest.return_value = '{"log":"Error text", "exitstatus":"1"}'
+    dumpsMock.return_value = '{"valid_object": true}'
+    self.controller.sendRequest.return_value = {"log":"Error text", "exitstatus":"1"}
 
     self.assertEqual({u'exitstatus': u'1', u'log': u'Error text'}, self.controller.registerWithServer())
     self.assertEqual(LiveStatus_mock.SERVICES, [])
     self.assertEqual(LiveStatus_mock.CLIENT_COMPONENTS, [])
     self.assertEqual(LiveStatus_mock.COMPONENTS, [])
 
-    self.controller.sendRequest.return_value = '{"responseId":1}'
+    self.controller.sendRequest.return_value = {"responseId":1}
     self.assertEqual({"responseId":1}, self.controller.registerWithServer())
 
-    self.controller.sendRequest.return_value = '{"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}'
+    self.controller.sendRequest.return_value = {"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}
     self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue")
     self.controller.isRegistered = False
     self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer())
@@ -105,7 +105,7 @@ class TestController(unittest.TestCase):
         raise Exception("test")
       return "request"
 
-    self.controller.sendRequest.return_value = '{"responseId":1}'
+    self.controller.sendRequest.return_value = {"responseId":1}
 
     dumpsMock.side_effect = side_effect
     self.controller.isRegistered = False
@@ -306,7 +306,6 @@ class TestController(unittest.TestCase):
   def test_sendRequest(self, security_mock, requestMock):
 
     conMock = MagicMock()
-    conMock.request.return_value = "response"
     security_mock.CachedHTTPSConnection.return_value = conMock
     url = "url"
     data = "data"
@@ -314,19 +313,34 @@ class TestController(unittest.TestCase):
 
     self.controller.cachedconnect = None
 
-    self.assertEqual("response", self.controller.sendRequest(url, data))
+    conMock.request.return_value = '{"valid_object": true}'
+    actual = self.controller.sendRequest(url, data)
+    expected = json.loads('{"valid_object": true}')
+    self.assertEqual(actual, expected)
+    
     security_mock.CachedHTTPSConnection.assert_called_once_with(
       self.controller.config)
     requestMock.called_once_with(url, data,
       {'Content-Type': 'application/json'})
 
+    conMock.request.return_value = '{invalid_object}'
+    actual = self.controller.sendRequest(url, data)
+    expected = {'exitstatus': 1, 'log': ('Response parsing failed! Request data: ' + data
+                                         + '; Response: {invalid_object}')}
+    self.assertEqual(actual, expected)
+
+    conMock.request.side_effect = Exception()
+    actual = self.controller.sendRequest(url, data)
+    expected = {'exitstatus': 1, 'log': 'Request failed! Data: ' + data}
+
+    self.assertEqual(actual, expected)
+
+
 
   @patch.object(threading._Event, "wait")
   @patch("time.sleep")
-  @patch("json.loads")
   @patch("json.dumps")
-  def test_heartbeatWithServer(self, dumpsMock, loadsMock, sleepMock, event_mock):
-
+  def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock):
     out = StringIO.StringIO()
     sys.stdout = out
 
@@ -340,11 +354,11 @@ class TestController(unittest.TestCase):
 
     self.controller.responseId = 1
     response = {"responseId":"2", "restartAgent":"false"}
-    loadsMock.return_value = response
+    sendRequest.return_value = response
 
     def one_heartbeat(*args, **kwargs):
       self.controller.DEBUG_STOP_HEARTBEATING = True
-      return "data"
+      return response
 
     sendRequest.side_effect = one_heartbeat
 
@@ -364,7 +378,7 @@ class TestController(unittest.TestCase):
         raise Exception()
       if len(calls) > 0:
         self.controller.DEBUG_STOP_HEARTBEATING = True
-      return "data"
+      return response
 
     # exception, retry, successful and stop
     sendRequest.side_effect = retry
@@ -374,6 +388,7 @@ class TestController(unittest.TestCase):
     self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)
 
     # retry registration
+    self.controller.responseId = 2
     response["registrationCommand"] = "true"
     sendRequest.side_effect = one_heartbeat
     self.controller.DEBUG_STOP_HEARTBEATING = False
@@ -382,6 +397,7 @@ class TestController(unittest.TestCase):
     self.assertTrue(self.controller.repeatRegistration)
 
     # components are not mapped
+    self.controller.responseId = 2
     response["registrationCommand"] = "false"
     response["hasMappedComponents"] = False
     sendRequest.side_effect = one_heartbeat
@@ -391,6 +407,7 @@ class TestController(unittest.TestCase):
     self.assertFalse(self.controller.hasMappedComponents)
 
     # components are mapped
+    self.controller.responseId = 2
     response["hasMappedComponents"] = True
     sendRequest.side_effect = one_heartbeat
     self.controller.DEBUG_STOP_HEARTBEATING = False
@@ -399,6 +416,7 @@ class TestController(unittest.TestCase):
     self.assertTrue(self.controller.hasMappedComponents)
 
     # components are mapped
+    self.controller.responseId = 2
     del response["hasMappedComponents"]
     sendRequest.side_effect = one_heartbeat
     self.controller.DEBUG_STOP_HEARTBEATING = False
@@ -407,8 +425,8 @@ class TestController(unittest.TestCase):
     self.assertTrue(self.controller.hasMappedComponents)
 
     # wrong responseId => restart
+    self.controller.responseId = 2
     response = {"responseId":"2", "restartAgent":"false"}
-    loadsMock.return_value = response
 
     restartAgent = MagicMock(name="restartAgent")
     self.controller.restartAgent = restartAgent
@@ -491,12 +509,12 @@ class TestController(unittest.TestCase):
     self.controller.componentsUrl = "foo_url/"
     sendRequest = Mock()
     self.controller.sendRequest = sendRequest
-    self.controller.sendRequest.return_value = ('{"clusterName":"dummy_cluster_name",'
-                                                '"stackName":"dummy_stack_name",'
-                                                '"stackVersion":"dummy_stack_version",'
-                                                '"components":{"PIG":{"PIG":"CLIENT"},'
-                                                '"MAPREDUCE":{"MAPREDUCE_CLIENT":"CLIENT",'
-                                                '"JOBTRACKER":"MASTER","TASKTRACKER":"SLAVE"}}}')
+    self.controller.sendRequest.return_value = {"clusterName":"dummy_cluster_name",
+                                                "stackName":"dummy_stack_name",
+                                                "stackVersion":"dummy_stack_version",
+                                                "components":{"PIG":{"PIG":"CLIENT"},
+                                                "MAPREDUCE":{"MAPREDUCE_CLIENT":"CLIENT",
+                                                "JOBTRACKER":"MASTER","TASKTRACKER":"SLAVE"}}}
     self.controller.updateComponents("dummy_cluster_name")
     sendRequest.assert_called_with('foo_url/dummy_cluster_name', None)
     services_expected = [u'MAPREDUCE', u'PIG']

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc5d82ca/ambari-agent/src/test/python/ambari_agent/TestSecurity.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestSecurity.py b/ambari-agent/src/test/python/ambari_agent/TestSecurity.py
index db4b25e..d8955cf 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestSecurity.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestSecurity.py
@@ -325,8 +325,15 @@ class TestSecurity(unittest.TestCase):
       # expected
       pass
 
-
-
+    # Test malformed JSON response
+    open_mock.return_value.write.reset_mock()
+    loads_mock.side_effect = Exception()
+    try:
+      man.reqSignCrt()
+      self.fail("Expected exception here")
+    except ssl.SSLError:
+      pass
+    self.assertFalse(open_mock.return_value.write.called)
 
   @patch("subprocess.Popen")
   @patch("subprocess.Popen.communicate")
@@ -339,6 +346,34 @@ class TestSecurity(unittest.TestCase):
     self.assertTrue(popen_mock.called)
     self.assertTrue(communicate_mock.called)
 
+  @patch("ambari_agent.hostname.hostname")
+  @patch('__builtin__.open', create=True, autospec=True)
+  @patch('urllib2.urlopen')
+  @patch.dict('os.environ', {'DUMMY_PASSPHRASE': 'dummy-passphrase'})
+  def test_reqSignCrt_malformedJson(self, urlopen_mock, open_mock, hostname_mock):
+    hostname_mock.return_value = "dummy-hostname"
+    open_mock.return_value.read.return_value = "dummy_request"
+    self.config.set('security', 'keysdir', '/dummy-keysdir')
+    self.config.set('security', 'passphrase_env_var_name', 'DUMMY_PASSPHRASE')
+    man = CertificateManager(self.config)
+
+    # test valid JSON response
+    urlopen_mock.return_value.read.return_value = '{"result": "OK", "signedCa":"dummy"}'
+    try:
+      man.reqSignCrt()
+    except ssl.SSLError:
+      self.fail("Unexpected exception!")
+    open_mock.return_value.write.assert_called_with(u'dummy')
+
+    # test malformed JSON response
+    open_mock.return_value.write.reset_mock()
+    urlopen_mock.return_value.read.return_value = '{malformed_object}'
+    try:
+      man.reqSignCrt()
+      self.fail("Expected exception!")
+    except ssl.SSLError:
+      pass
+    self.assertFalse(open_mock.return_value.write.called)
 
   @patch.object(security.CertificateManager, "checkCertExists")
   def test_initSecurity(self, checkCertExists_method):