You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/13 20:00:59 UTC

svn commit: r1213865 - in /incubator/ambari/trunk: ./ agent/src/main/python/ambari_agent/ agent/src/test/python/ client/src/main/java/org/apache/ambari/common/rest/agent/ controller/src/main/java/org/apache/ambari/controller/ controller/src/main/java/org/apache/ambari/resource/statemachine/ controller/src/test/java/org/apache/ambari/controller/

Author: ddas
Date: Tue Dec 13 19:00:58 2011
New Revision: 1213865

URL: http://svn.apache.org/viewvc?rev=1213865&view=rev
Log:
AMBARI-157. Enhances the agent to make it puppet aware.

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py
    incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py
    incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
    incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java
    incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Tue Dec 13 19:00:58 2011
@@ -1,6 +1,8 @@
 Ambari Change log
 
 Release 0.1.0 - unreleased
+  
+  AMBARI-157. Enhances the agent to make it puppet aware (ddas)
 
   AMBARI-156. Clean up the puppet example stack. (omalley)
 

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Tue Dec 13 19:00:58 2011
@@ -23,11 +23,12 @@ import logging.handlers
 import Queue
 import threading
 from shell import shellRunner
-from FileUtil import writeFile, createStructure, deleteStructure
+from FileUtil import writeFile, createStructure, deleteStructure, getFilePath, appendToFile
 from shell import shellRunner
 import json
 import os
 import time
+import subprocess
 
 logger = logging.getLogger()
 installScriptHash = -1
@@ -56,6 +57,7 @@ class ActionQueue(threading.Thread):
   def put(self, response):
     if 'actions' in response:
       actions = response['actions']
+      print(actions)
       # for the servers, take a diff of what's running, and what the controller
       # asked the agent to start. Kill all those servers that the controller
       # didn't ask us to start
@@ -90,9 +92,12 @@ class ActionQueue(threading.Thread):
                      'CREATE_STRUCTURE_ACTION'   : self.createStructureAction,
                      'DELETE_STRUCTURE_ACTION'   : self.deleteStructureAction,
                      'WRITE_FILE_ACTION'         : self.writeFileAction,
-                     'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction
+                     'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction,
+                     'NO_OP_ACTION'              : self.noOpAction
                    }
         try:
+          print("ACTION KIND")
+          print(action['kind'])
           result = switches.get(action['kind'], self.unknownAction)(action)
         except Exception, err:
           logger.info(err)
@@ -112,7 +117,13 @@ class ActionQueue(threading.Thread):
 
   # Generate default action response
   def genResult(self, action):
-    result = { 
+    result={}
+    if (action['kind'] == 'INSTALL_AND_CONFIG_ACTION' or action['kind'] == 'NO_OP_ACTION'):
+      result = {
+               'id'                        : action['id'],
+             }
+    else:
+      result = { 
                'id'                        : action['id'],
                'clusterId'                 : action['clusterId'],
                'kind'                      : action['kind'],
@@ -135,24 +146,43 @@ class ActionQueue(threading.Thread):
       action['user'], result)
 
   # Write file action
-  def writeFileAction(self, action):
+  def writeFileAction(self, action, fileName=""):
     result = self.genResult(action)
-    return writeFile(action, result)
+    return writeFile(action, result, fileName)
+
+  # get the install file
+  def getInstallFilename(self,id):
+    return "ambari-install-file-"+id
 
   # Install and configure action
   def installAndConfigAction(self, action):
-    w = self.writeFileAction(action)
+    w = self.writeFileAction(action,self.getInstallFilename(action['id']))
     commandResult = {}
     if w['exitCode']!=0:
-      commandResult['output'] = out
-      commandResult['error'] = err
-      commandResult['exitCode'] = exitCode
+      commandResult['error'] = w['error'] 
+      commandResult['exitCode'] = w['exitCode']
       r['commandResult'] = commandResult
       return r
+    logger.info("Command to run ")
+     
+    if 'command' not in action:
+      # this is hardcoded to do puppet specific stuff for now
+      # append the content of the puppet file to the file written above
+      filepath = getFilePath(action,self.getInstallFilename(action['id'])) 
+      p = self.sh.run(['/bin/cat',AmbariConfig.config.get('puppet','driver')])
+      if p['exitCode']!=0:
+        commandResult['error'] = p['error']
+        commandResult['exitCode'] = p['exitCode']
+        r['commandResult'] = commandResult
+        return r
+      print("The contents of the static file " + p['output'])
+      appendToFile(p['output'],filepath) 
+      action['command']=[AmbariConfig.config.get('puppet','commandpath'), filepath]
+      print ("PUPPET COMMAND : " + action['command'])
+    logger.info(action['command'])
     r = self.sh.run(action['command'])
     if r['exitCode'] != 0:
-      commandResult['output'] = out
-      commandResult['error'] = err
+      commandResult['error'] = r['error']
     else:
       installScriptHash = action['id'] 
     commandResult['exitCode'] = r['exitCode']
@@ -182,6 +212,10 @@ class ActionQueue(threading.Thread):
     result['exitCode'] = 0
     return deleteStructure(action, result)
 
+  def noOpAction(self, action):
+    r = {'exitCode' : 0 }
+    return r
+
   # Handle unknown action
   def unknownAction(self, action):
     logger.error('Unknown action: %s' % action['id'])

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py Tue Dec 13 19:00:58 2011
@@ -32,6 +32,12 @@ password=controller
 
 [agent]
 prefix=/tmp/ambari
+
+[puppet]
+prefix=/homes/ddas/puppet
+commandpath='/usr/local/bin/puppet apply --modulepath /home/puppet/puppet-ambari/modules'
+driver=/home/puppet/puppet-ambari/manifests/site.pp
+
 """
 s = StringIO.StringIO(content)
 config.readfp(s)

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py Tue Dec 13 19:00:58 2011
@@ -89,6 +89,7 @@ class Controller(threading.Thread):
           logger.error(err.code)
         else:
           logger.error("Unable to connect to: "+self.url)
+          traceback.print_exc()
       if self.actionQueue.isIdle():
         time.sleep(30)
       else:

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py Tue Dec 13 19:00:58 2011
@@ -32,20 +32,52 @@ import AmbariConfig
 
 logger = logging.getLogger()
 
-def writeFile(action, result):
+def getFilePath(action, fileName=""):
+  #Change the method signature to take the individual action fields
+  pathComp=""
+  if 'clusterId' in action:
+    pathComp = action['clusterId']
+  if 'role' in action:
+    pathComp = pathComp + "-" + action['role'] 
+  path = os.path.join(AmbariConfig.config.get('agent','prefix'),
+                      "clusters", 
+                      pathComp)
+  fullPathName=""
+  if fileName != "":
+    fullPathName=os.path.join(path, fileName)
+  else:
+    fileInfo = action['file']
+    fullPathName=os.path.join(path, fileInfo['path'])
+  return fullPathName
+  
+def appendToFile(data,absolutePath):
+  f = open(absolutePath, 'w')
+  f.write(data)
+  f.close()
+
+def writeFile(action, result, fileName=""):
   fileInfo = action['file']
+  pathComp=""
+  if 'clusterId' in action:
+    pathComp = action['clusterId']
+  if 'role' in action:
+    pathComp = pathComp + "-" + action['role'] 
   try:
     path = os.path.join(AmbariConfig.config.get('agent','prefix'),
                         "clusters", 
-                        action['clusterId']+"-"+action['role'])
-    logger.info("path: %s" % path)
-    user=fileInfo['owner']
-    if user is None:
-      user=getpass.getuser()
-    group=fileInfo['group']
-    if group is None:
-      group=getpass.getgroup()
-    filename=os.path.join(path, fileInfo['path'])
+                        pathComp)
+    user=getpass.getuser()
+    if 'owner' in fileInfo:
+      user=fileInfo['owner']
+    group=os.getgid()
+    if 'group' in fileInfo:
+      group=fileInfo['group']
+    fullPathName=""
+    if fileName != "":
+      fullPathName=os.path.join(path, fileName)
+    else:
+      fullPathName=os.path.join(path, fileInfo['path'])
+    logger.info("path in writeFile: %s" % fullPathName)
     content=fileInfo['data']
     try:
       if isinstance(user, int)!=True:
@@ -53,18 +85,20 @@ def writeFile(action, result):
       if isinstance(group, int)!=True:
         group=getgrnam(group)[2]
     except Exception:
-      logger.warn("can not find user uid/gid: (%s/%s) for writing %s" % (user, group, filename))
-    if fileInfo['permission'] is not None:
-      permission=fileInfo['permission']
+      logger.warn("can not find user uid/gid: (%s/%s) for writing %s" % (user, group, fullPathName))
+    if 'permission' in fileInfo:
+      if fileInfo['permission'] is not None:
+        permission=fileInfo['permission']
     else:
       permission=0750
     oldMask = os.umask(0)
-    if fileInfo['umask'] is not None: 
-      umask=int(fileInfo['umask'])
+    if 'umask' in fileInfo:
+      if fileInfo['umask'] is not None: 
+        umask=int(fileInfo['umask'])
     else:
       umask=oldMask 
     os.umask(int(umask))
-    prefix = os.path.dirname(filename)
+    prefix = os.path.dirname(fullPathName)
     try:
       os.makedirs(prefix)
     except OSError as err:
@@ -72,12 +106,12 @@ def writeFile(action, result):
         pass
       else:
         raise
-    f = open(filename, 'w')
+    f = open(fullPathName, 'w')
     f.write(content)
     f.close()
     if os.getuid()==0:
-      os.chmod(filename, permission)
-      os.chown(filename, user, group)
+      os.chmod(fullPathName, permission)
+      os.chown(fullPathName, user, group)
     os.umask(oldMask)
     result['exitCode'] = 0
   except Exception, err:

Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py Tue Dec 13 19:00:58 2011
@@ -48,6 +48,7 @@ class Heartbeat:
       heartbeat['actionResults'] = queueResult
     if len(installedRoleStates)!=0:
       heartbeat['installedRoleStates'] = installedRoleStates
+    print json.dumps(heartbeat)
     return heartbeat
 
 def main(argv=None):

Modified: incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py (original)
+++ incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py Tue Dec 13 19:00:58 2011
@@ -22,10 +22,13 @@ from unittest import TestCase
 import os, errno
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent.FileUtil import getFilePath
 
 class TestAgentActions(TestCase):
   def test_installAndConfigAction(self):
-    path = "/tmp/ambari_file_test/_file_write_test_1"
+    action={'id' : 'tttt'}
+    actionQueue = ActionQueue(AmbariConfig().getConfig())
+    path = actionQueue.getInstallFilename(action['id'])
     configFile = {
       "data"       : "test",
       "owner"      : os.getuid(),
@@ -39,12 +42,11 @@ class TestAgentActions(TestCase):
     #we just want to ensure that 'ls' can run on the data file (in the actual world
     #this 'ls' would be a puppet or a chef command that would work on a data
     #file
+    path=getFilePath(action,path)
+    print ("path : " + path)
     action = { 
       'id' : 'tttt',
       'kind' : 'INSTALL_AND_CONFIG_ACTION',
-      'clusterId' : 'abc', 
-      'role' : 'namenode', 
-      'component' : 'hdfs', 
       'workDirComponent' : 'abc-hdfs',
       'file' : configFile,
       'clusterDefinitionRevision' : 12,
@@ -53,5 +55,6 @@ class TestAgentActions(TestCase):
     result = { }
     actionQueue = ActionQueue(AmbariConfig().getConfig())
     result = actionQueue.installAndConfigAction(action)
+    print(result)
     self.assertEqual(result['exitCode'], 0, "installAndConfigAction test failed. Returned %d " % result['exitCode'])
     self.assertEqual(result['output'], path + "\n", "installAndConfigAction test failed Returned %s " % result['output'])

Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java Tue Dec 13 19:00:58 2011
@@ -162,7 +162,7 @@ public class Action {
   public static enum Kind {
     RUN_ACTION, START_ACTION, STOP_ACTION, STATUS_ACTION, 
     CREATE_STRUCTURE_ACTION, DELETE_STRUCTURE_ACTION, WRITE_FILE_ACTION,
-    INSTALL_AND_CONFIG_ACTION;
+    INSTALL_AND_CONFIG_ACTION,NO_OP_ACTION;
     public static class KindAdaptor extends XmlAdapter<String, Kind> {
       @Override
       public String marshal(Kind obj) throws Exception {

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Tue Dec 13 19:00:58 2011
@@ -79,11 +79,11 @@ public class HeartbeatHandler {
     Date heartbeatTime = new Date(System.currentTimeMillis());
     nodes.checkAndUpdateNode(hostname, heartbeatTime);
     
-    ControllerResponse response = 
+    ControllerResponse prevResponse = 
         agentToHeartbeatResponseMap.get(heartbeat.getHostname());
-    if (response != null) {
-      if (response.getResponseId() == heartbeat.getResponseId()) {
-        return response; //duplicate heartbeat
+    if (prevResponse != null) {
+      if (prevResponse.getResponseId() != heartbeat.getResponseId()) {
+        return prevResponse; //duplicate heartbeat or the agent restarted
       }
     }
 
@@ -152,7 +152,7 @@ public class HeartbeatHandler {
                 //check the expected state of the agent and whether the start
                 //was successful
                 if (wasStartRoleSuccessful(clusterIdAndRev, 
-                    service.getServiceName(), role.getRoleName(), response, 
+                    service.getServiceName(), role.getRoleName(), prevResponse, 
                     heartbeat)) {
                   //raise an event to the state machine for a successful 
                   //role-start
@@ -168,7 +168,7 @@ public class HeartbeatHandler {
                 //raise an event to the state machine for a successful 
                 //role-stop instance
                 if (wasStopRoleSuccessful(clusterIdAndRev, 
-                    service.getServiceName(), role.getRoleName(), response, 
+                    service.getServiceName(), role.getRoleName(), prevResponse, 
                     heartbeat)) {
                   stateMachineInvoker.getAMBARIEventHandler()
                   .handle(new RoleEvent(RoleEventType.STOP_SUCCESS, role));
@@ -190,6 +190,11 @@ public class HeartbeatHandler {
       List<Action> allActions, HeartBeat heartbeat) {
     ControllerResponse r = new ControllerResponse();
     r.setResponseId(responseId);
+    if (allActions.size() > 0) {//TODO: REMOVE THIS
+      Action a = new Action();
+      a.setKind(Kind.NO_OP_ACTION);
+      allActions.add(a);
+    }
     r.setActions(allActions);
     agentToHeartbeatResponseMap.put(heartbeat.getHostname(), r);
     return r;
@@ -229,11 +234,6 @@ public class HeartbeatHandler {
       List<Action> allActions) {
     ConfigFile file = new ConfigFile();
     file.setData(script);
-    //TODO: this should be written in Ambari's scratch space directory
-    //this file is the complete install/config script. Note that the
-    //script includes install/config snippets for all clusters the 
-    //node belongs to
-    file.setPath("/tmp/ambari_install_script" + script.hashCode());
     
     Action action = new Action();
     action.setFile(file);

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java Tue Dec 13 19:00:58 2011
@@ -19,7 +19,9 @@
 package org.apache.ambari.resource.statemachine;
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.ambari.controller.Cluster;
 
@@ -27,35 +29,45 @@ import com.google.inject.Singleton;
 
 @Singleton
 public class FSMDriver implements FSMDriverInterface {
-  private ConcurrentMap<String, ClusterFSM> clusters;
-  
+  private Map<String, ClusterFSM> clusters = 
+      Collections.synchronizedMap(new HashMap<String,ClusterFSM>());
+  @Override
   public ClusterFSM createCluster(Cluster cluster, int revision) 
       throws IOException {
     ClusterFSM clusterFSM = new ClusterImpl(cluster, revision);
     clusters.put(cluster.getName(), clusterFSM);
     return clusterFSM;
   }
-  
+  @Override
   public void startCluster(String clusterId) {
     ClusterFSM clusterFSM = clusters.get(clusterId);
-    clusterFSM.activate();
+    if (clusterFSM != null) {
+      clusterFSM.activate();
+    }
   }
-  
+  @Override
   public void stopCluster(String clusterId) {
     ClusterFSM clusterFSM = clusters.get(clusterId);
-    clusterFSM.deactivate();
+    if (clusterFSM != null) {
+      clusterFSM.deactivate();
+    }
   }
-  
+  @Override
   public void deleteCluster(String clusterId) {
-    ClusterFSM clusterFSM = clusters.get(clusterId);
-    clusterFSM.deactivate();
-    clusterFSM.terminate();
-    clusters.remove(clusterId);
+    ClusterFSM clusterFSM = clusters.remove(clusterId);
+    if (clusterFSM != null) { 
+      clusterFSM.deactivate();
+      clusterFSM.terminate();
+    }
   }
-  
+  @Override
   public String getClusterState(String clusterId,
       long clusterDefinitionRev) {
-    return clusters.get(clusterId).getClusterState();
+    ClusterFSM clusterFSM = clusters.get(clusterId);
+    if (clusterFSM != null) {
+      return clusterFSM.getClusterState();
+    }
+    return null;
   }
 
   @Override

Modified: incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java (original)
+++ incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java Tue Dec 13 19:00:58 2011
@@ -170,7 +170,7 @@ public class TestHeartbeat {
         driver, invoker);
     ControllerResponse response = handler.processHeartBeat(heartbeat);
     List<Action> actions = response.getActions();
-    assert(actions.size() == 1);
+    assert(actions.size() == 2);
     assert(actions.get(0).getKind() == Action.Kind.INSTALL_AND_CONFIG_ACTION);
   }