You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dd...@apache.org on 2011/12/13 20:00:59 UTC
svn commit: r1213865 - in /incubator/ambari/trunk: ./
agent/src/main/python/ambari_agent/ agent/src/test/python/
client/src/main/java/org/apache/ambari/common/rest/agent/
controller/src/main/java/org/apache/ambari/controller/
controller/src/main/java/org/apache/ambari/resource/statemachine/
controller/src/test/java/org/apache/ambari/controller/
Author: ddas
Date: Tue Dec 13 19:00:58 2011
New Revision: 1213865
URL: http://svn.apache.org/viewvc?rev=1213865&view=rev
Log:
AMBARI-157. Enhances the agent to make it puppet aware.
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py
incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py
incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java
incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Tue Dec 13 19:00:58 2011
@@ -1,6 +1,8 @@
Ambari Change log
Release 0.1.0 - unreleased
+
+ AMBARI-157. Enhances the agent to make it puppet aware (ddas)
AMBARI-156. Clean up the puppet example stack. (omalley)
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/ActionQueue.py Tue Dec 13 19:00:58 2011
@@ -23,11 +23,12 @@ import logging.handlers
import Queue
import threading
from shell import shellRunner
-from FileUtil import writeFile, createStructure, deleteStructure
+from FileUtil import writeFile, createStructure, deleteStructure, getFilePath, appendToFile
from shell import shellRunner
import json
import os
import time
+import subprocess
logger = logging.getLogger()
installScriptHash = -1
@@ -56,6 +57,7 @@ class ActionQueue(threading.Thread):
def put(self, response):
if 'actions' in response:
actions = response['actions']
+ print(actions)
# for the servers, take a diff of what's running, and what the controller
# asked the agent to start. Kill all those servers that the controller
# didn't ask us to start
@@ -90,9 +92,12 @@ class ActionQueue(threading.Thread):
'CREATE_STRUCTURE_ACTION' : self.createStructureAction,
'DELETE_STRUCTURE_ACTION' : self.deleteStructureAction,
'WRITE_FILE_ACTION' : self.writeFileAction,
- 'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction
+ 'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction,
+ 'NO_OP_ACTION' : self.noOpAction
}
try:
+ print("ACTION KIND")
+ print(action['kind'])
result = switches.get(action['kind'], self.unknownAction)(action)
except Exception, err:
logger.info(err)
@@ -112,7 +117,13 @@ class ActionQueue(threading.Thread):
# Generate default action response
def genResult(self, action):
- result = {
+ result={}
+ if (action['kind'] == 'INSTALL_AND_CONFIG_ACTION' or action['kind'] == 'NO_OP_ACTION'):
+ result = {
+ 'id' : action['id'],
+ }
+ else:
+ result = {
'id' : action['id'],
'clusterId' : action['clusterId'],
'kind' : action['kind'],
@@ -135,24 +146,43 @@ class ActionQueue(threading.Thread):
action['user'], result)
# Write file action
- def writeFileAction(self, action):
+ def writeFileAction(self, action, fileName=""):
result = self.genResult(action)
- return writeFile(action, result)
+ return writeFile(action, result, fileName)
+
+ # get the install file
+ def getInstallFilename(self,id):
+ return "ambari-install-file-"+id
# Install and configure action
def installAndConfigAction(self, action):
- w = self.writeFileAction(action)
+ w = self.writeFileAction(action,self.getInstallFilename(action['id']))
commandResult = {}
if w['exitCode']!=0:
- commandResult['output'] = out
- commandResult['error'] = err
- commandResult['exitCode'] = exitCode
+ commandResult['error'] = w['error']
+ commandResult['exitCode'] = w['exitCode']
r['commandResult'] = commandResult
return r
+ logger.info("Command to run ")
+
+ if 'command' not in action:
+ # this is hardcoded to do puppet specific stuff for now
+ # append the content of the puppet file to the file written above
+ filepath = getFilePath(action,self.getInstallFilename(action['id']))
+ p = self.sh.run(['/bin/cat',AmbariConfig.config.get('puppet','driver')])
+ if p['exitCode']!=0:
+ commandResult['error'] = p['error']
+ commandResult['exitCode'] = p['exitCode']
+ r['commandResult'] = commandResult
+ return r
+ print("The contents of the static file " + p['output'])
+ appendToFile(p['output'],filepath)
+ action['command']=[AmbariConfig.config.get('puppet','commandpath'), filepath]
+ print ("PUPPET COMMAND : " + action['command'])
+ logger.info(action['command'])
r = self.sh.run(action['command'])
if r['exitCode'] != 0:
- commandResult['output'] = out
- commandResult['error'] = err
+ commandResult['error'] = r['error']
else:
installScriptHash = action['id']
commandResult['exitCode'] = r['exitCode']
@@ -182,6 +212,10 @@ class ActionQueue(threading.Thread):
result['exitCode'] = 0
return deleteStructure(action, result)
+ def noOpAction(self, action):
+ r = {'exitCode' : 0 }
+ return r
+
# Handle unknown action
def unknownAction(self, action):
logger.error('Unknown action: %s' % action['id'])
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/AmbariConfig.py Tue Dec 13 19:00:58 2011
@@ -32,6 +32,12 @@ password=controller
[agent]
prefix=/tmp/ambari
+
+[puppet]
+prefix=/homes/ddas/puppet
+commandpath='/usr/local/bin/puppet apply --modulepath /home/puppet/puppet-ambari/modules'
+driver=/home/puppet/puppet-ambari/manifests/site.pp
+
"""
s = StringIO.StringIO(content)
config.readfp(s)
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/Controller.py Tue Dec 13 19:00:58 2011
@@ -89,6 +89,7 @@ class Controller(threading.Thread):
logger.error(err.code)
else:
logger.error("Unable to connect to: "+self.url)
+ traceback.print_exc()
if self.actionQueue.isIdle():
time.sleep(30)
else:
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/FileUtil.py Tue Dec 13 19:00:58 2011
@@ -32,20 +32,52 @@ import AmbariConfig
logger = logging.getLogger()
-def writeFile(action, result):
+def getFilePath(action, fileName=""):
+ #Change the method signature to take the individual action fields
+ pathComp=""
+ if 'clusterId' in action:
+ pathComp = action['clusterId']
+ if 'role' in action:
+ pathComp = pathComp + "-" + action['role']
+ path = os.path.join(AmbariConfig.config.get('agent','prefix'),
+ "clusters",
+ pathComp)
+ fullPathName=""
+ if fileName != "":
+ fullPathName=os.path.join(path, fileName)
+ else:
+ fileInfo = action['file']
+ fullPathName=os.path.join(path, fileInfo['path'])
+ return fullPathName
+
+def appendToFile(data,absolutePath):
+ f = open(absolutePath, 'w')
+ f.write(data)
+ f.close()
+
+def writeFile(action, result, fileName=""):
fileInfo = action['file']
+ pathComp=""
+ if 'clusterId' in action:
+ pathComp = action['clusterId']
+ if 'role' in action:
+ pathComp = pathComp + "-" + action['role']
try:
path = os.path.join(AmbariConfig.config.get('agent','prefix'),
"clusters",
- action['clusterId']+"-"+action['role'])
- logger.info("path: %s" % path)
- user=fileInfo['owner']
- if user is None:
- user=getpass.getuser()
- group=fileInfo['group']
- if group is None:
- group=getpass.getgroup()
- filename=os.path.join(path, fileInfo['path'])
+ pathComp)
+ user=getpass.getuser()
+ if 'owner' in fileInfo:
+ user=fileInfo['owner']
+ group=os.getgid()
+ if 'group' in fileInfo:
+ group=fileInfo['group']
+ fullPathName=""
+ if fileName != "":
+ fullPathName=os.path.join(path, fileName)
+ else:
+ fullPathName=os.path.join(path, fileInfo['path'])
+ logger.info("path in writeFile: %s" % fullPathName)
content=fileInfo['data']
try:
if isinstance(user, int)!=True:
@@ -53,18 +85,20 @@ def writeFile(action, result):
if isinstance(group, int)!=True:
group=getgrnam(group)[2]
except Exception:
- logger.warn("can not find user uid/gid: (%s/%s) for writing %s" % (user, group, filename))
- if fileInfo['permission'] is not None:
- permission=fileInfo['permission']
+ logger.warn("can not find user uid/gid: (%s/%s) for writing %s" % (user, group, fullPathName))
+ if 'permission' in fileInfo:
+ if fileInfo['permission'] is not None:
+ permission=fileInfo['permission']
else:
permission=0750
oldMask = os.umask(0)
- if fileInfo['umask'] is not None:
- umask=int(fileInfo['umask'])
+ if 'umask' in fileInfo:
+ if fileInfo['umask'] is not None:
+ umask=int(fileInfo['umask'])
else:
umask=oldMask
os.umask(int(umask))
- prefix = os.path.dirname(filename)
+ prefix = os.path.dirname(fullPathName)
try:
os.makedirs(prefix)
except OSError as err:
@@ -72,12 +106,12 @@ def writeFile(action, result):
pass
else:
raise
- f = open(filename, 'w')
+ f = open(fullPathName, 'w')
f.write(content)
f.close()
if os.getuid()==0:
- os.chmod(filename, permission)
- os.chown(filename, user, group)
+ os.chmod(fullPathName, permission)
+ os.chown(fullPathName, user, group)
os.umask(oldMask)
result['exitCode'] = 0
except Exception, err:
Modified: incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py (original)
+++ incubator/ambari/trunk/agent/src/main/python/ambari_agent/Heartbeat.py Tue Dec 13 19:00:58 2011
@@ -48,6 +48,7 @@ class Heartbeat:
heartbeat['actionResults'] = queueResult
if len(installedRoleStates)!=0:
heartbeat['installedRoleStates'] = installedRoleStates
+ print json.dumps(heartbeat)
return heartbeat
def main(argv=None):
Modified: incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py (original)
+++ incubator/ambari/trunk/agent/src/test/python/TestAgentActions.py Tue Dec 13 19:00:58 2011
@@ -22,10 +22,13 @@ from unittest import TestCase
import os, errno
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent.FileUtil import getFilePath
class TestAgentActions(TestCase):
def test_installAndConfigAction(self):
- path = "/tmp/ambari_file_test/_file_write_test_1"
+ action={'id' : 'tttt'}
+ actionQueue = ActionQueue(AmbariConfig().getConfig())
+ path = actionQueue.getInstallFilename(action['id'])
configFile = {
"data" : "test",
"owner" : os.getuid(),
@@ -39,12 +42,11 @@ class TestAgentActions(TestCase):
#we just want to ensure that 'ls' can run on the data file (in the actual world
#this 'ls' would be a puppet or a chef command that would work on a data
#file
+ path=getFilePath(action,path)
+ print ("path : " + path)
action = {
'id' : 'tttt',
'kind' : 'INSTALL_AND_CONFIG_ACTION',
- 'clusterId' : 'abc',
- 'role' : 'namenode',
- 'component' : 'hdfs',
'workDirComponent' : 'abc-hdfs',
'file' : configFile,
'clusterDefinitionRevision' : 12,
@@ -53,5 +55,6 @@ class TestAgentActions(TestCase):
result = { }
actionQueue = ActionQueue(AmbariConfig().getConfig())
result = actionQueue.installAndConfigAction(action)
+ print(result)
self.assertEqual(result['exitCode'], 0, "installAndConfigAction test failed. Returned %d " % result['exitCode'])
self.assertEqual(result['output'], path + "\n", "installAndConfigAction test failed Returned %s " % result['output'])
Modified: incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java (original)
+++ incubator/ambari/trunk/client/src/main/java/org/apache/ambari/common/rest/agent/Action.java Tue Dec 13 19:00:58 2011
@@ -162,7 +162,7 @@ public class Action {
public static enum Kind {
RUN_ACTION, START_ACTION, STOP_ACTION, STATUS_ACTION,
CREATE_STRUCTURE_ACTION, DELETE_STRUCTURE_ACTION, WRITE_FILE_ACTION,
- INSTALL_AND_CONFIG_ACTION;
+ INSTALL_AND_CONFIG_ACTION,NO_OP_ACTION;
public static class KindAdaptor extends XmlAdapter<String, Kind> {
@Override
public String marshal(Kind obj) throws Exception {
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Tue Dec 13 19:00:58 2011
@@ -79,11 +79,11 @@ public class HeartbeatHandler {
Date heartbeatTime = new Date(System.currentTimeMillis());
nodes.checkAndUpdateNode(hostname, heartbeatTime);
- ControllerResponse response =
+ ControllerResponse prevResponse =
agentToHeartbeatResponseMap.get(heartbeat.getHostname());
- if (response != null) {
- if (response.getResponseId() == heartbeat.getResponseId()) {
- return response; //duplicate heartbeat
+ if (prevResponse != null) {
+ if (prevResponse.getResponseId() != heartbeat.getResponseId()) {
+ return prevResponse; //duplicate heartbeat or the agent restarted
}
}
@@ -152,7 +152,7 @@ public class HeartbeatHandler {
//check the expected state of the agent and whether the start
//was successful
if (wasStartRoleSuccessful(clusterIdAndRev,
- service.getServiceName(), role.getRoleName(), response,
+ service.getServiceName(), role.getRoleName(), prevResponse,
heartbeat)) {
//raise an event to the state machine for a successful
//role-start
@@ -168,7 +168,7 @@ public class HeartbeatHandler {
//raise an event to the state machine for a successful
//role-stop instance
if (wasStopRoleSuccessful(clusterIdAndRev,
- service.getServiceName(), role.getRoleName(), response,
+ service.getServiceName(), role.getRoleName(), prevResponse,
heartbeat)) {
stateMachineInvoker.getAMBARIEventHandler()
.handle(new RoleEvent(RoleEventType.STOP_SUCCESS, role));
@@ -190,6 +190,11 @@ public class HeartbeatHandler {
List<Action> allActions, HeartBeat heartbeat) {
ControllerResponse r = new ControllerResponse();
r.setResponseId(responseId);
+ if (allActions.size() > 0) {//TODO: REMOVE THIS
+ Action a = new Action();
+ a.setKind(Kind.NO_OP_ACTION);
+ allActions.add(a);
+ }
r.setActions(allActions);
agentToHeartbeatResponseMap.put(heartbeat.getHostname(), r);
return r;
@@ -229,11 +234,6 @@ public class HeartbeatHandler {
List<Action> allActions) {
ConfigFile file = new ConfigFile();
file.setData(script);
- //TODO: this should be written in Ambari's scratch space directory
- //this file is the complete install/config script. Note that the
- //script includes install/config snippets for all clusters the
- //node belongs to
- file.setPath("/tmp/ambari_install_script" + script.hashCode());
Action action = new Action();
action.setFile(file);
Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/FSMDriver.java Tue Dec 13 19:00:58 2011
@@ -19,7 +19,9 @@
package org.apache.ambari.resource.statemachine;
import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.ambari.controller.Cluster;
@@ -27,35 +29,45 @@ import com.google.inject.Singleton;
@Singleton
public class FSMDriver implements FSMDriverInterface {
- private ConcurrentMap<String, ClusterFSM> clusters;
-
+ private Map<String, ClusterFSM> clusters =
+ Collections.synchronizedMap(new HashMap<String,ClusterFSM>());
+ @Override
public ClusterFSM createCluster(Cluster cluster, int revision)
throws IOException {
ClusterFSM clusterFSM = new ClusterImpl(cluster, revision);
clusters.put(cluster.getName(), clusterFSM);
return clusterFSM;
}
-
+ @Override
public void startCluster(String clusterId) {
ClusterFSM clusterFSM = clusters.get(clusterId);
- clusterFSM.activate();
+ if (clusterFSM != null) {
+ clusterFSM.activate();
+ }
}
-
+ @Override
public void stopCluster(String clusterId) {
ClusterFSM clusterFSM = clusters.get(clusterId);
- clusterFSM.deactivate();
+ if (clusterFSM != null) {
+ clusterFSM.deactivate();
+ }
}
-
+ @Override
public void deleteCluster(String clusterId) {
- ClusterFSM clusterFSM = clusters.get(clusterId);
- clusterFSM.deactivate();
- clusterFSM.terminate();
- clusters.remove(clusterId);
+ ClusterFSM clusterFSM = clusters.remove(clusterId);
+ if (clusterFSM != null) {
+ clusterFSM.deactivate();
+ clusterFSM.terminate();
+ }
}
-
+ @Override
public String getClusterState(String clusterId,
long clusterDefinitionRev) {
- return clusters.get(clusterId).getClusterState();
+ ClusterFSM clusterFSM = clusters.get(clusterId);
+ if (clusterFSM != null) {
+ return clusterFSM.getClusterState();
+ }
+ return null;
}
@Override
Modified: incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java?rev=1213865&r1=1213864&r2=1213865&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java (original)
+++ incubator/ambari/trunk/controller/src/test/java/org/apache/ambari/controller/TestHeartbeat.java Tue Dec 13 19:00:58 2011
@@ -170,7 +170,7 @@ public class TestHeartbeat {
driver, invoker);
ControllerResponse response = handler.processHeartBeat(heartbeat);
List<Action> actions = response.getActions();
- assert(actions.size() == 1);
+ assert(actions.size() == 2);
assert(actions.get(0).getKind() == Action.Kind.INSTALL_AND_CONFIG_ACTION);
}