You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2012/11/07 09:13:24 UTC
svn commit: r1406489 [2/19] - in /incubator/ambari/branches/AMBARI-666: ./
ambari-agent/ ambari-agent/conf/ ambari-agent/conf/unix/
ambari-agent/src/main/puppet/manifestloader/
ambari-agent/src/main/puppet/modules/configgenerator/manifests/
ambari-agen...
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/main.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/main.py?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/main.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/main.py Wed Nov 7 08:13:12 2012
@@ -32,6 +32,7 @@ from shell import getTempFiles
from shell import killstaleprocesses
import AmbariConfig
from security import CertificateManager
+from NetUtil import NetUtil
logger = logging.getLogger()
agentPid = os.getpid()
@@ -79,6 +80,9 @@ def debug(sig, frame):
message += ''.join(traceback.format_stack(frame))
logger.info(message)
+
+
+
def main():
global config
default_cfg = { 'agent' : { 'prefix' : '/home/ambari' } }
@@ -111,8 +115,7 @@ def main():
#retCode = createDaemon()
pid = str(os.getpid())
file(pidfile, 'w').write(pid)
-
-
+
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s")
rotateLog = logging.handlers.RotatingFileHandler(logfile, "a", 10000000, 10)
@@ -123,7 +126,7 @@ def main():
# Check for ambari configuration file.
try:
config = AmbariConfig.config
- if(os.path.exists('/etc/ambari/ambari.ini')):
+ if os.path.exists('/etc/ambari/ambari.ini'):
config.read('/etc/ambari/ambari.ini')
AmbariConfig.setConfig(config)
else:
@@ -132,8 +135,13 @@ def main():
logger.warn(err)
killstaleprocesses()
- logger.info("Connecting to Server at: "+config.get('server', 'url'))
+ server_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'url_port')
+ logger.info('Connecting to Server at: ' + server_url)
+
+ # Wait until server is reachable
+ netutil = NetUtil()
+ netutil.try_to_connect(server_url, -1, logger)
#Initiate security
""" Check if security is enable if not then disable it"""
@@ -142,8 +150,9 @@ def main():
certMan.initSecurity()
# Launch Controller communication
- controller = Controller(config)
+ controller = Controller(config)
controller.start()
+ # TODO: is run() call necessary?
controller.run()
logger.info("finished")
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/manifestGenerator.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/manifestGenerator.py?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/manifestGenerator.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/manifestGenerator.py Wed Nov 7 08:13:12 2012
@@ -1,236 +1,267 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import json
-import os.path
-import logging
-
-logger = logging.getLogger()
-
-xml_configurations_keys= ["hdfs_site", "hdfs_site", "core_site",
- "mapred_queue_acls",
- "hadoop_policy", "mapred_site",
- "capacity_scheduler", "hbase_site",
- "hbase_policy", "hive_site", "oozie_site",
- "templeton_site"]
-
-#read static imports from file and write them to manifest
-def writeImports(outputFile, modulesdir, inputFileName='imports.txt'):
- inputFile = open(inputFileName, 'r')
- logger.info("Modules dir is " + modulesdir)
- for line in inputFile:
- modulename = line.rstrip()
- line = "import '" + modulesdir + os.sep + modulename + "'" + os.linesep
- outputFile.write(line)
-
- inputFile.close()
-
-def generateManifest(parsedJson, fileName, modulesdir):
-#reading json
- hostname = parsedJson['hostname']
- clusterHostInfo = parsedJson['clusterHostInfo']
- params = parsedJson['hostLevelParams']
- configurations = parsedJson['configurations']
- xmlConfigurationsKeys = xml_configurations_keys
- #hostAttributes = parsedJson['hostAttributes']
- roles = [{'role' : parsedJson['role'],
- 'cmd' : parsedJson['roleCommand'],
- 'roleParams' : parsedJson['roleParams']}]
- #writing manifest
- manifest = open(fileName, 'w')
-
- #writing imports from external static file
- writeImports(outputFile=manifest, modulesdir=modulesdir)
-
- #writing nodes
- writeNodes(manifest, clusterHostInfo)
-
- #writing params from map
- writeParams(manifest, params)
-
-
- xmlConfigurations = {}
- flatConfigurations = {}
-
- for configKey in configurations.iterkeys():
- if configKey in xmlConfigurationsKeys:
- xmlConfigurations[configKey] = configurations[configKey]
- else:
- flatConfigurations[configKey] = configurations[configKey]
-
- #writing config maps
- writeXmlConfigurations(manifest, xmlConfigurations)
- writeFlatConfigurations(manifest, flatConfigurations)
-
- #writing host attributes
- #writeHostAttributes(manifest, hostAttributes)
-
- #writing task definitions
- writeTasks(manifest, roles)
-
- manifest.close()
-
-
- #read dictionary
-def readDict(file, separator='='):
- result = dict()
-
- for line in file :
- dictTuple = line.partition(separator)
- result[dictTuple[0].strip()] = dictTuple[2].strip()
-
- return result
-
-
- #write nodes
-def writeNodes(outputFile, clusterHostInfo):
- for node in clusterHostInfo.iterkeys():
- outputFile.write('$' + node + '= [')
- coma = ''
-
- for value in clusterHostInfo[node]:
- outputFile.write(coma + '\'' + value + '\'')
- coma = ', '
-
- outputFile.write(']\n')
-
-#write params
-def writeParams(outputFile, params):
-
- for paramName in params.iterkeys():
-
- param = params[paramName]
- if type(param) is dict:
-
- outputFile.write('$' + paramName + '= {\n')
-
- coma = ''
-
- for subParam in param.iterkeys():
- outputFile.write(coma + '"' + subParam + '" => "' + param[subParam] + '"')
- coma = ',\n'
-
- outputFile.write('\n}\n')
- else:
- outputFile.write('$' + paramName + '="' + param + '"\n')
-
-
-#write host attributes
-def writeHostAttributes(outputFile, hostAttributes):
- outputFile.write('$hostAttributes={\n')
-
- coma = ''
- for attribute in hostAttributes.iterkeys():
- outputFile.write(coma + '"' + attribute + '" => "{' + hostAttributes[attribute] + '"}')
- coma = ',\n'
-
- outputFile.write('}\n')
-
-#write flat configurations
-def writeFlatConfigurations(outputFile, flatConfigs):
- for flatConfigName in flatConfigs.iterkeys():
- for flatConfig in flatConfigs[flatConfigName].iterkeys():
- outputFile.write('$' + flatConfig + ' = "' + flatConfigs[flatConfigName][flatConfig] + '"' + os.linesep)
-
-#write xml configurations
-def writeXmlConfigurations(outputFile, xmlConfigs):
- outputFile.write('$configuration = {\n')
-
- for configName in xmlConfigs.iterkeys():
-
- config = xmlConfigs[configName]
-
- outputFile.write(configName + '=> {\n')
- coma = ''
- for configParam in config.iterkeys():
- outputFile.write(coma + '"' + configParam + '" => "' + config[configParam] + '"')
- coma = ',\n'
-
- outputFile.write('\n},\n')
-
- outputFile.write('\n}\n')
-
-#write node tasks
-def writeTasks(outputFile, roles):
- #reading dictionaries
- rolesToClassFile = open('rolesToClass.dict', 'r')
- rolesToClass = readDict(rolesToClassFile)
- rolesToClassFile.close()
-
- serviceStatesFile = open('serviceStates.dict', 'r')
- serviceStates = readDict(serviceStatesFile)
- serviceStatesFile.close()
-
- outputFile.write('node /default/ {\n ')
- writeStages(outputFile, len(roles))
- stageNum = 1
-
- for role in roles :
- rolename = role['role']
- command = role['cmd']
- taskParams = role['roleParams']
- taskParamsNormalized = normalizeTaskParams(taskParams)
- taskParamsPostfix = ''
-
- if len(taskParamsNormalized) > 0 :
- taskParamsPostfix = ', ' + taskParamsNormalized
-
- className = rolesToClass[rolename]
- serviceState = serviceStates[command]
-
- outputFile.write('class {\'' + className + '\':' + ' stage => ' + str(stageNum) +
- ', service_state => ' + serviceState + taskParamsPostfix + '}\n')
- stageNum = stageNum + 1
- outputFile.write('}\n')
-def normalizeTaskParams(taskParams):
- result = ''
- coma = ''
-
- for paramName in taskParams.iterkeys():
- result = coma + result + paramName + ' => ' + taskParams[paramName]
- coma = ','
-
- return result
-
-def writeStages(outputFile, numStages):
- arrow = ''
-
- for i in range(numStages):
- outputFile.write(arrow + 'stage{' + str(i + 1) + ' :}')
- arrow = ' -> '
-
- outputFile.write('\n')
-
-
-def main():
- logging.basicConfig(level=logging.DEBUG)
- #test code
- jsonFile = open('test.json', 'r')
- jsonStr = jsonFile.read()
- modulesdir = os.path.abspath(os.getcwd() + ".." + os.sep + ".." +
- os.sep + ".." + os.sep + "puppet" +
- os.sep + "modules" + os.sep)
- inputJsonStr = jsonStr
- parsedJson = json.loads(inputJsonStr)
- generateManifest(parsedJson, 'site.pp', modulesdir)
-
-if __name__ == '__main__':
- main()
-
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import json
+import os.path
+import logging
+from uuid import getnode as get_mac
+
+logger = logging.getLogger()
+
+xml_configurations_keys= ["hdfs-site", "core-site",
+ "mapred-queue-acls",
+ "hadoop-policy", "mapred-site",
+ "capacity-scheduler", "hbase-site",
+ "hbase-policy", "hive-site", "oozie-site",
+ "templeton-site"]
+
+#read static imports from file and write them to manifest
+def writeImports(outputFile, modulesdir, inputFileName='imports.txt'):
+ inputFile = open(inputFileName, 'r')
+ logger.info("Modules dir is " + modulesdir)
+ for line in inputFile:
+ modulename = line.rstrip()
+ line = "import '" + modulesdir + os.sep + modulename + "'" + os.linesep
+ outputFile.write(line)
+
+ inputFile.close()
+
+def generateManifest(parsedJson, fileName, modulesdir):
+ logger.info("JSON Received:")
+ logger.info(json.dumps(parsedJson, sort_keys=True, indent=4))
+#reading json
+ hostname = parsedJson['hostname']
+ clusterHostInfo = {}
+ if 'clusterHostInfo' in parsedJson:
+ if parsedJson['clusterHostInfo']:
+ clusterHostInfo = parsedJson['clusterHostInfo']
+ params = {}
+ if 'hostLevelParams' in parsedJson:
+ if parsedJson['hostLevelParams']:
+ params = parsedJson['hostLevelParams']
+ configurations = {}
+ if 'configurations' in parsedJson:
+ if parsedJson['configurations']:
+ configurations = parsedJson['configurations']
+ xmlConfigurationsKeys = xml_configurations_keys
+ #hostAttributes = parsedJson['hostAttributes']
+ roleParams = {}
+ if 'roleParams' in parsedJson:
+ if parsedJson['roleParams']:
+ roleParams = parsedJson['roleParams']
+ roles = [{'role' : parsedJson['role'],
+ 'cmd' : parsedJson['roleCommand'],
+ 'roleParams' : roleParams}]
+ #writing manifest
+ manifest = open(fileName, 'w')
+
+ #writing imports from external static file
+ writeImports(outputFile=manifest, modulesdir=modulesdir)
+
+ #writing nodes
+ writeNodes(manifest, clusterHostInfo)
+
+ #writing params from map
+ writeParams(manifest, params)
+
+
+ xmlConfigurations = {}
+ flatConfigurations = {}
+
+ if configurations:
+ for configKey in configurations.iterkeys():
+ if configKey in xmlConfigurationsKeys:
+ xmlConfigurations[configKey] = configurations[configKey]
+ else:
+ flatConfigurations[configKey] = configurations[configKey]
+
+ #writing config maps
+ if (xmlConfigurations):
+ writeXmlConfigurations(manifest, xmlConfigurations)
+ if (flatConfigurations):
+ writeFlatConfigurations(manifest, flatConfigurations)
+
+ #writing host attributes
+ #writeHostAttributes(manifest, hostAttributes)
+
+ #writing task definitions
+ writeTasks(manifest, roles)
+
+ manifest.close()
+
+
+ #read dictionary
+def readDict(file, separator='='):
+ result = dict()
+
+ for line in file :
+ dictTuple = line.partition(separator)
+ result[dictTuple[0].strip()] = dictTuple[2].strip()
+
+ return result
+
+
+ #write nodes
+def writeNodes(outputFile, clusterHostInfo):
+ for node in clusterHostInfo.iterkeys():
+ outputFile.write('$' + node + '= [')
+ coma = ''
+
+ for value in clusterHostInfo[node]:
+ outputFile.write(coma + '\'' + value + '\'')
+ coma = ', '
+
+ outputFile.write(']\n')
+
+#write params
+def writeParams(outputFile, params):
+
+ for paramName in params.iterkeys():
+ # todo handle repo information properly
+ if paramName == 'repo_info':
+ continue
+
+ param = params[paramName]
+ if type(param) is dict:
+
+ outputFile.write('$' + paramName + '= {\n')
+
+ coma = ''
+
+ for subParam in param.iterkeys():
+ outputFile.write(coma + '"' + subParam + '" => "' + param[subParam] + '"')
+ coma = ',\n'
+
+ outputFile.write('\n}\n')
+ else:
+ outputFile.write('$' + paramName + '="' + param + '"\n')
+
+
+#write host attributes
+def writeHostAttributes(outputFile, hostAttributes):
+ outputFile.write('$hostAttributes={\n')
+
+ coma = ''
+ for attribute in hostAttributes.iterkeys():
+ outputFile.write(coma + '"' + attribute + '" => "{' + hostAttributes[attribute] + '"}')
+ coma = ',\n'
+
+ outputFile.write('}\n')
+
+#write flat configurations
+def writeFlatConfigurations(outputFile, flatConfigs):
+ for flatConfigName in flatConfigs.iterkeys():
+ for flatConfig in flatConfigs[flatConfigName].iterkeys():
+ outputFile.write('$' + flatConfig + ' = "' + flatConfigs[flatConfigName][flatConfig] + '"' + os.linesep)
+
+#write xml configurations
+def writeXmlConfigurations(outputFile, xmlConfigs):
+ outputFile.write('$configuration = {\n')
+
+ for configName in xmlConfigs.iterkeys():
+
+ config = xmlConfigs[configName]
+
+ outputFile.write(configName + '=> {\n')
+ coma = ''
+ for configParam in config.iterkeys():
+ outputFile.write(coma + '"' + configParam + '" => "' + config[configParam] + '"')
+ coma = ',\n'
+
+ outputFile.write('\n},\n')
+
+ outputFile.write('\n}\n')
+
+#write node tasks
+def writeTasks(outputFile, roles):
+ #reading dictionaries
+ rolesToClassFile = open('rolesToClass.dict', 'r')
+ rolesToClass = readDict(rolesToClassFile)
+ rolesToClassFile.close()
+
+ serviceStatesFile = open('serviceStates.dict', 'r')
+ serviceStates = readDict(serviceStatesFile)
+ serviceStatesFile.close()
+
+ outputFile.write('node /default/ {\n ')
+ writeStages(outputFile, len(roles) + 1)
+ stageNum = 1
+ outputFile.write('class {\'hdp\': stage => ' + str(stageNum) + '}\n')
+ stageNum = stageNum + 1
+
+ for role in roles :
+ rolename = role['role']
+ command = role['cmd']
+ taskParams = role['roleParams']
+ if (rolename == 'ZOOKEEPER_SERVER'):
+ taskParams['myid'] = str(get_mac())
+ taskParamsNormalized = normalizeTaskParams(taskParams)
+ taskParamsPostfix = ''
+
+ if len(taskParamsNormalized) > 0 :
+ taskParamsPostfix = ', ' + taskParamsNormalized
+
+ className = rolesToClass[rolename]
+
+ if command in serviceStates:
+ serviceState = serviceStates[command]
+ outputFile.write('class {\'' + className + '\':' + ' stage => ' + str(stageNum) +
+ ', service_state => ' + serviceState + taskParamsPostfix + '}\n')
+ else:
+ outputFile.write('class {\'' + className + '\':' + ' stage => ' + str(stageNum) +
+ taskParamsPostfix + '}\n')
+
+ stageNum = stageNum + 1
+ outputFile.write('}\n')
+def normalizeTaskParams(taskParams):
+ result = ''
+ coma = ''
+
+ for paramName in taskParams.iterkeys():
+ result = coma + result + paramName + ' => ' + taskParams[paramName]
+ coma = ','
+
+ return result
+
+def writeStages(outputFile, numStages):
+ arrow = ''
+
+ for i in range(numStages):
+ outputFile.write(arrow + 'stage{' + str(i + 1) + ' :}')
+ arrow = ' -> '
+
+ outputFile.write('\n')
+
+
+def main():
+ logging.basicConfig(level=logging.DEBUG)
+ #test code
+ jsonFile = open('test.json', 'r')
+ jsonStr = jsonFile.read()
+ modulesdir = os.path.abspath(os.getcwd() + ".." + os.sep + ".." +
+ os.sep + ".." + os.sep + "puppet" +
+ os.sep + "modules" + os.sep)
+ inputJsonStr = jsonStr
+ parsedJson = json.loads(inputJsonStr)
+ generateManifest(parsedJson, 'site.pp', modulesdir)
+
+if __name__ == '__main__':
+ main()
+
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py Wed Nov 7 08:13:12 2012
@@ -82,15 +82,19 @@ class puppetExecutor:
env=puppetEnv)
stderr_out = puppet.communicate()
error = "none"
- if puppet.returncode != 0:
+ returncode = 0
+ if (puppet.returncode != 0 and puppet.returncode != 2) :
+ returncode = puppet.returncode
error = stderr_out[1]
- result["stderr"] = error
- logging.error("Error running puppet: " + stderr_out[1])
+ logging.error("Error running puppet: \n" + stderr_out[1])
pass
+ result["stderr"] = error
puppetOutput = stderr_out[0]
- result["exitcode"] = puppet.returncode
- result["stdout"] = puppetOutput
- logger.info("ExitCode : \n" + str(result["exitcode"]))
+ logger.info("Output from puppet :\n" + puppetOutput)
+ result["exitcode"] = returncode
+
+ result["stdout"] = "Output"
+ logger.info("ExitCode : " + str(result["exitcode"]))
return result
def main():
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/rolesToClass.dict
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/rolesToClass.dict?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/rolesToClass.dict (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/rolesToClass.dict Wed Nov 7 08:13:12 2012
@@ -1,6 +1,6 @@
NAMENODE = hdp-hadoop::namenode
DATANODE = hdp-hadoop::datanode
-SNAMENODE = hdp-hadoop::snamenode
+SECONDARY_NAMENODE = hdp-hadoop::snamenode
JOBTRACKER = hdp-hadoop::jobtracker
TASKTRACKER = hdp-hadoop::tasktracker
HDFS_CLIENT = hdp-hadoop::client
@@ -10,8 +10,8 @@ ZOOKEEPER_CLIENT = hdp-zookeeper::client
HBASE_MASTER = hdp-hbase::master
HBASE_REGIONSERVER = hdp-hbase::regionserver
HBASE_CLIENT = hdp-hbase::client
-PIG_CLIENT = hdp-pig
-SQOOP_CLIENT = hdp-sqoop
+PIG = hdp-pig
+SQOOP = hdp-sqoop
OOZIE_SERVER = hdp-oozie::server
OOZIE_CLIENT = hdp-oozie::client
HIVE_CLIENT = hdp-hive::client
@@ -23,6 +23,18 @@ TEMPLETON_SERVER = hdp-templeton::server
TEMPLETON_CLIENT = hdp-templeton::client
DASHBOARD = hdp-dashboard
NAGIOS_SERVER = hdp-nagios::server
-GANGLIA_MONITOR_SERVER = hdp-ganglia::server
+GANGLIA_SERVER = hdp-ganglia::server
GANGLIA_MONITOR = hdp-ganglia::monitor
-HTTPD = hdp-monitor-webserver
\ No newline at end of file
+HTTPD = hdp-monitor-webserver
+HDFS_SERVICE_CHECK = hdp-hadoop::hdfs::service_check
+MAPREDUCE_SERVICE_CHECK = hdp-hadoop::mapred::service_check
+ZOOKEEPER_SERVICE_CHECK = hdp-zookeeper::zookeeper::service_check
+ZOOKEEPER_QUORUM_SERVICE_CHECK = hdp-zookeeper::quorum::service_check
+HBASE_SERVICE_CHECK = hdp-hbase::hbase::service_check
+HIVE_SERVICE_CHECK = hdp-hive::hive::service_check
+HCAT_SERVICE_CHECK = hdp-hcat::hcat::service_check
+OOZIE_SERVICE_CHECK = hdp-oozie::oozie::service_check
+PIG_SERVICE_CHECK = hdp-pig::pig::service_check
+SQOOP_SERVICE_CHECK = hdp-sqoop::sqoop::service_check
+TEMPLETON_SERVICE_CHECK = hdp-templeton::templeton::service_check
+DASHBOARD_SERVICE_CHECK = hdp-dashboard::dashboard::service_check
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/security.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/security.py?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/security.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/security.py Wed Nov 7 08:13:12 2012
@@ -55,6 +55,7 @@ class CertificateManager():
self.config = config
self.keysdir = self.config.get('security', 'keysdir')
self.server_crt=self.config.get('security', 'server_crt')
+ self.server_url = 'https://' + self.config.get('server', 'hostname') + ':' + self.config.get('server', 'url_port')
def getAgentKeyName(self):
keysdir = self.config.get('security', 'keysdir')
@@ -98,7 +99,7 @@ class CertificateManager():
logger.info("Agent certificate exists, ok")
def loadSrvrCrt(self):
- get_ca_url = self.config.get('server', 'url') + '/cert/ca/'
+ get_ca_url = self.server_url + '/cert/ca/'
stream = urllib2.urlopen(get_ca_url)
response = stream.read()
stream.close()
@@ -106,7 +107,7 @@ class CertificateManager():
srvr_crt_f.write(response)
def reqSignCrt(self):
- sign_crt_req_url = self.config.get('server', 'url') + '/certs/' + socket.gethostname()
+ sign_crt_req_url = self.server_url + '/certs/' + socket.gethostname()
agent_crt_req_f = open(self.getAgentCrtReqName())
agent_crt_req_content = agent_crt_req_f.read()
passphrase_env_var = self.config.get('security', 'passphrase_env_var_name')
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/serviceStates.dict
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/serviceStates.dict?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/serviceStates.dict (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/serviceStates.dict Wed Nov 7 08:13:12 2012
@@ -1,2 +1,3 @@
START = running
-INSTALL = installed_and_configured
\ No newline at end of file
+INSTALL = installed_and_configured
+STOP = stopped
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/test.json
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/test.json?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/test.json (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/main/python/ambari_agent/test.json Wed Nov 7 08:13:12 2012
@@ -19,26 +19,26 @@
"configurations" : {
-"hdfs_site" : { "dfs.block.size" : "256000000", "dfs.replication" : "1" } ,
-"core_site" : { "fs.default.name" : "hrt8n36.cc1.ygridcore.net" } ,
-"mapred_queue_acls" : {"mapred.queue.default.acl-submit-job" : "*",
+"hdfs-site" : { "dfs.block.size" : "256000000", "dfs.replication" : "1" } ,
+"core-site" : { "fs.default.name" : "hrt8n36.cc1.ygridcore.net" } ,
+"mapred-queue-acls" : {"mapred.queue.default.acl-submit-job" : "*",
"mapred.queue.default.acl-administer-jobs" : "*"},
-"hadoop_policy" : {"security.client.protocol.acl" : "*",
+"hadoop-policy" : {"security.client.protocol.acl" : "*",
"security.client.datanode.protocol.acl" : "*"},
-"mapred_site" : {"mapred.jobtracker.taskScheduler" : "org.apache.hadoop.mapred.CapacityTaskScheduler",
+"mapred-site" : {"mapred.jobtracker.taskScheduler" : "org.apache.hadoop.mapred.CapacityTaskScheduler",
"mapred.queue.names" : "hive,pig,default"},
-"capacity_scheduler" : {"mapred.capacity-scheduler.queue.default.capacity" : "100",
+"capacity-scheduler" : {"mapred.capacity-scheduler.queue.default.capacity" : "100",
"mapred.capacity-scheduler.queue.default.supports-priorit" : "false"},
"health_check" : {"security_enabled" : "true",
"task_bin_exe" : "ls"},
"hadoop_env" : {"hadoop_piddirprefix" : "/tmp"},
-"hbase_site" : {"hbase.cluster.distributed" : "true"},
-"hbase_policy" : {"security.client.protocol.acl" : "*"},
+"hbase-site" : {"hbase.cluster.distributed" : "true"},
+"hbase-policy" : {"security.client.protocol.acl" : "*"},
"hadoop_metrics" : {"ganglia_server_host" : "localhost"},
-"hive_site" : {"hive.exec.scratchdir" : "/tmp"},
-"oozie_site" : {"oozie.service.ActionService.executor.ext.classes" : "org.apache.oozie.action.hadoop.HiveActionExecutor, org.apache.oozie.action.hadoop.SqoopActionExecutor,org.apache.oozie.action.email.EmailActionExecutor,"},
-"templeton_site" : {"templeton.override.enabled" : "true"}
+"hive-site" : {"hive.exec.scratchdir" : "/tmp"},
+"oozie-site" : {"oozie.service.ActionService.executor.ext.classes" : "org.apache.oozie.action.hadoop.HiveActionExecutor, org.apache.oozie.action.hadoop.SqoopActionExecutor,org.apache.oozie.action.email.EmailActionExecutor,"},
+"templeton-site" : {"templeton.override.enabled" : "true"}
},
"role": "NAMENODE",
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestCertGeneration.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestCertGeneration.py?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestCertGeneration.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestCertGeneration.py Wed Nov 7 08:13:12 2012
@@ -30,6 +30,9 @@ class TestCertGeneration(TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
config = ConfigParser.RawConfigParser()
+ config.add_section('server')
+ config.set('server', 'hostname', 'example.com')
+ config.set('server', 'url_port', '777')
config.add_section('security')
config.set('security', 'keysdir', self.tmpdir)
config.set('security', 'server_crt', 'ca.crt')
Added: incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestConnectionRetries.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestConnectionRetries.py?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestConnectionRetries.py (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestConnectionRetries.py Wed Nov 7 08:13:12 2012
@@ -0,0 +1,115 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+from ambari_agent.ServerStatus import ServerStatus
+from ambari_agent.NetUtil import NetUtil
+import ambari_agent.main
+from threading import Thread
+import time
+from ambari_agent.Heartbeat import Heartbeat
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent import AmbariConfig
+import socket
+import os
+import logging
+from ambari_agent.Controller import Controller
+import logging
+
+NON_EXISTING_DOMAIN = 'non-existing-domain43342432.com'
+BAD_URL = 'http://www.iana.org/domains/ex222ample/'
+
+class TestConnectionRetries(TestCase):
+
+ logger = logging.getLogger()
+
+ def setUp(self):
+ self.logger.disabled = True
+
+
+ def test_url_checks(self):
+ netutil = NetUtil()
+ self.assertEquals(netutil.checkURL('http://www.iana.org/domains/example/'), True, "Good url - HTTP code 200")
+ self.assertEquals(netutil.checkURL('https://www.iana.org/domains/example/'), True, "Good HTTPS url - HTTP code 200")
+ self.assertEquals(netutil.checkURL('http://' + NON_EXISTING_DOMAIN), False, "Not existing domain")
+ self.assertEquals(netutil.checkURL(BAD_URL), False, "Bad url")
+ self.assertEquals(netutil.checkURL('http://192.168.253.177'), False, "Not reachable IP")
+
+ def test_registration_retries(self):
+ netutil = NetUtil()
+ netutil.CONNECT_SERVER_RETRY_INTERVAL_SEC=0.1
+ retries = netutil.try_to_connect(BAD_URL, 3)
+ self.assertEquals(retries, 3)
+
+ def test_infinit_registration_retries(self):
+ netutil = NetUtil()
+ netutil.CONNECT_SERVER_RETRY_INTERVAL_SEC=0.1
+ thread = Thread(target = netutil.try_to_connect, args = (BAD_URL, -1))
+ thread.start()
+ time.sleep(0.5)
+ # I have to stop the thread anyway, so I'll check results later
+ threadWasAlive = thread.isAlive()
+ netutil.DEBUG_STOP_RETRIES_FLAG = True
+ time.sleep(1)
+ # Checking results before thread stop
+ self.assertEquals(threadWasAlive, True, "Thread should still be retrying to connect")
+ # Checking results after thread stop
+ self.assertEquals(thread.isAlive(), False, "Thread should stop now")
+
+ def test_heartbeat_retries(self):
+ netutil = NetUtil()
+ netutil.HEARTBEAT_IDDLE_INTERVAL_SEC=0.1
+ netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC=0.1
+ #building heartbeat object
+ testsPath = os.path.dirname(os.path.realpath(__file__))
+ dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
+ AmbariConfig.config.set('services','serviceToPidMapFile', dictPath)
+ actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
+ heartbeat = Heartbeat(actionQueue)
+ # testing controller with our heartbeat and wrong url
+ controller = Controller(AmbariConfig.config)
+ controller.heartbeat = heartbeat
+ controller.heartbeatUrl = BAD_URL
+ controller.actionQueue = actionQueue
+ controller.logger = self.logger
+ controller.netutil = netutil
+ thread = Thread(target = controller.heartbeatWithServer)
+ thread.start()
+ time.sleep(1)
+
+ # I have to stop the thread anyway, so I'll check results later
+ threadWasAlive = thread.isAlive()
+ successfull_heartbits0 = controller.DEBUG_SUCCESSFULL_HEARTBEATS
+ heartbeat_retries0 = controller.DEBUG_HEARTBEAT_RETRIES
+ # Stopping thread
+ controller.DEBUG_STOP_HEARTBITTING = True
+ time.sleep(1)
+ # Checking results before thread stop
+ self.assertEquals(threadWasAlive, True, "Heartbeat should be alive now")
+ self.assertEquals(successfull_heartbits0, 0, "Heartbeat should not have any success")
+ self.assertGreater(heartbeat_retries0, 1, "Heartbeat should retry connecting")
+ # Checking results after thread stop
+ self.assertEquals(thread.isAlive(), False, "Heartbeat should stop now")
+ self.assertEquals(controller.DEBUG_SUCCESSFULL_HEARTBEATS, 0, "Heartbeat should not have any success")
+
+ def tearDown(self):
+ self.logger.disabled = False
+
+
Modified: incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestHeartbeat.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestHeartbeat.py?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestHeartbeat.py (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestHeartbeat.py Wed Nov 7 08:13:12 2012
@@ -21,12 +21,16 @@ limitations under the License.
from unittest import TestCase
from ambari_agent.Heartbeat import Heartbeat
from ambari_agent.ActionQueue import ActionQueue
-from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent import AmbariConfig
import socket
+import os
class TestHeartbeat(TestCase):
def test_build(self):
- actionQueue = ActionQueue(AmbariConfig().getConfig())
+ testsPath = os.path.dirname(os.path.realpath(__file__))
+ dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
+ AmbariConfig.config.set('services','serviceToPidMapFile', dictPath)
+ actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
heartbeat = Heartbeat(actionQueue)
result = heartbeat.build(100)
-
\ No newline at end of file
+
Added: incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestStatusCheck.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestStatusCheck.py?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestStatusCheck.py (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-agent/src/test/python/TestStatusCheck.py Wed Nov 7 08:13:12 2012
@@ -0,0 +1,95 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import tempfile
+import shutil
+import os
+from unittest import TestCase
+from ambari_agent.StatusCheck import StatusCheck
+import subprocess
+import signal
+from shell import shellRunner
+
+
+MAPPING_FILE_NAME='map.dict'
+
+COMPONENT_LIVE = 'LIVE_COMPONENT'
+COMPONENT_LIVE_PID = 'live_comp.pid'
+COMPONENT_LIVE_CMD='''
+while [ 1==1 ]
+do
+ echo ok
+done
+'''
+
+COMPONENT_DEAD = 'DEAD_COMPONENT'
+COMPONENT_DEAD_PID = 'dead_comp.pid'
+DEAD_PID=0
+
+
+class TestStatusCheck(TestCase):
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ self.tmpdict = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+ self.tmpdict = open(self.tmpdir + os.sep + MAPPING_FILE_NAME, 'w')
+
+ self.sh = shellRunner()
+
+ #Launch eternal process
+ p = subprocess.Popen([COMPONENT_LIVE_CMD], stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, shell=True, close_fds=True)
+
+ #Write mapping for pid files for both live and dead process
+ self.tmpdict.write(COMPONENT_LIVE + '=' + COMPONENT_LIVE_PID + os.linesep)
+ self.tmpdict.write(COMPONENT_DEAD + '=' + COMPONENT_DEAD_PID + os.linesep)
+ self.tmpdict.close()
+
+ #Write pid of live process to file
+ live_pid_file = open(self.tmpdir + os.sep + COMPONENT_LIVE_PID, 'w')
+ self.live_pid = p.pid
+ live_pid_file.write(str(self.live_pid))
+ live_pid_file.close()
+
+ #Write pid of dead process to file
+ dead_pid_file = open(self.tmpdir + os.sep + COMPONENT_DEAD_PID, 'w')
+ dead_pid_file.write(str(DEAD_PID))
+ dead_pid_file.close()
+
+ #Init status checker
+ self.statusCheck = StatusCheck(self.tmpdir, self.tmpdict.name)
+
+ # Ensure that status checker throws exceptions on invalid params
+ def test_exceptions(self):
+ self.assertRaises(ValueError,StatusCheck,"tmp","tmp")
+ self.assertRaises(IOError, StatusCheck,self.tmpdir,"tmp")
+
+ # Ensure that status checker return True for running process
+ def test_live(self):
+ status = self.statusCheck.getStatus(COMPONENT_LIVE)
+ self.assertEqual(status, True)
+
+ # Ensure that status checker return False for dead process
+ def test_dead(self):
+ status = self.statusCheck.getStatus(COMPONENT_DEAD)
+ self.assertEqual(status, False)
+
+ def tearDown(self):
+ os.kill(self.live_pid, signal.SIGKILL)
+ shutil.rmtree(self.tmpdir)
Added: incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/ambari-server
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/ambari-server?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/ambari-server (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/ambari-server Wed Nov 7 08:13:12 2012
@@ -0,0 +1,37 @@
+# description: ambari-server daemon
+# processname: ambari-server
+
+# /etc/init.d/ambari-server
+
+export PATH=/usr/lib/ambari-server/*:$PATH
+export AMBARI_CONF_DIR=/etc/ambari-server/conf:$PATH
+
+case "$1" in
+ start)
+ echo -e "Starting ambari-server"
+ export AMBARI_PASSPHRASE=pass_phrase
+ python /usr/sbin/ambari-server.py start
+ ;;
+ stop)
+ echo -e "Stopping ambari-server"
+ python /usr/sbin/ambari-server.py stop
+ ;;
+ restart)
+ echo -e "Restarting ambari-server"
+ $0 stop
+ $0 start
+ ;;
+ setup)
+ echo -e "Run postgresql initdb"
+ service postgresql initdb
+ echo -e "Run postgresql start"
+ service postgresql start
+ echo -e "Setup ambari-server"
+ python /usr/sbin/ambari-server.py setup -d postgres -f /var/lib/ambari-server/resources/Ambari-DDL.sql
+ ;;
+ *)
+ echo "Usage: /usr/sbin/ambari-server {start|stop|restart|setup}"
+ exit 1
+esac
+
+exit 0
Added: incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/ambari.properties
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/ambari.properties?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/ambari.properties (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/ambari.properties Wed Nov 7 08:13:12 2012
@@ -0,0 +1,3 @@
+security.server.keys_dir = /var/lib/ambari-server/keys
+resources.dir = /var/lib/ambari-server/resources
+jdk.url=http://public-repo-1.hortonworks.com/ARTIFACTS/jdk-6u31-linux-x64.bin
Added: incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/log4j.properties?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/log4j.properties (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/conf/unix/log4j.properties Wed Nov 7 08:13:12 2012
@@ -0,0 +1,36 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=DEBUG, stdout, file
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari/ambari-server.log
+log4j.appender.file.MaxFileSize=1MB
+log4j.appender.file.MaxBackupIndex=1
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
+
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/pom.xml?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/pom.xml (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/pom.xml Wed Nov 7 08:13:12 2012
@@ -52,10 +52,143 @@
</includes>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>rpm-maven-plugin</artifactId>
+ <version>2.0.1</version>
+ <executions>
+ <execution>
+ <!-- unbinds rpm creation from maven lifecycle -->
+ <phase>none</phase>
+ <goals>
+ <goal>rpm</goal>
+ </goals>
+ </execution>
+
+ </executions>
+ <configuration>
+ <!-- places rpm to specified folder -->
+ <!--
+ <workarea>
+ rpm-target
+ </workarea>
+ -->
+ <copyright>2012, Apache Software Foundation</copyright>
+ <group>Development</group>
+ <description>Maven Recipe: RPM Package.</description>
+ <mappings>
+
+ <mapping>
+ <directory>/usr/lib/ambari-server</directory>
+ <dependency>
+ </dependency>
+ </mapping>
+
+ <mapping>
+ <directory>/usr/lib/ambari-server</directory>
+ <sources>
+ <source>
+ <location>${project.build.directory}/${project.artifactId}-${project.version}.jar</location>
+ </source>
+ </sources>
+ </mapping>
+
+
+ <mapping>
+ <directory>/usr/sbin</directory>
+ <sources>
+ <source>
+ <location>src/main/python/ambari-server.py</location>
+ </source>
+ </sources>
+ </mapping>
+
+ <mapping>
+ <directory>/usr/sbin</directory>
+ <filemode>744</filemode>
+ <sources>
+ <source>
+ <location>conf/unix/ambari-server</location>
+ </source>
+ </sources>
+ </mapping>
+
+ <mapping>
+ <directory>/etc/ambari-server/conf</directory>
+ <configuration>true</configuration>
+ <sources>
+ <source>
+ <location>conf/unix/ambari.properties</location>
+ </source>
+ <source>
+ <location>conf/unix/log4j.properties</location>
+ </source>
+ </sources>
+ </mapping>
+
+ <mapping>
+ <directory>/var/lib/ambari-server/keys</directory>
+ <sources>
+ <source>
+ <location>src/main/resources/ca.config</location>
+ </source>
+ </sources>
+ </mapping>
+
+ <mapping>
+ <directory>/var/lib/ambari-server/keys/db</directory>
+ <sources>
+ <source>
+ <location>src/main/resources/db</location>
+ </source>
+ </sources>
+ </mapping>
+
+ <mapping>
+ <directory>/var/log/ambari</directory>
+ </mapping>
+
+ <mapping>
+ <directory>/var/lib/ambari-server/resources</directory>
+ <sources>
+ <source>
+ <location>src/main/resources/Ambari-DDL.sql</location>
+ </source>
+ </sources>
+ </mapping>
+
+ <mapping>
+ <directory>/var/run/ambari-server</directory>
+ </mapping>
+
+ </mappings>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.5.2</version>
+ <configuration>
+ <failOnError>false</failOnError>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
<profiles>
</profiles>
+
+
<dependencies>
<dependency>
<groupId>commons-io</groupId>
@@ -78,10 +211,6 @@
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
</dependency>
@@ -260,5 +389,25 @@
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>8.3-603.jdbc4</version>
+ </dependency>
</dependencies>
+
+ <!--<reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.5.2</version>
+ </plugin>
+ </plugins>
+ </reporting>-->
</project>
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/assemblies/server.xml
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/assemblies/server.xml?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/assemblies/server.xml (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/assemblies/server.xml Wed Nov 7 08:13:12 2012
@@ -75,11 +75,11 @@
<outputDirectory>/ambari-server-${project.version}/etc/ambari-server/conf</outputDirectory>
</fileSet>
<fileSet>
- <directory>src/main/assemblies</directory>
- <outputDirectory>/ambari-server-${project.version}/res</outputDirectory>
- <excludes>
- <exclude>*</exclude>
- </excludes>
+ <directory>src/main/resources</directory>
+ <outputDirectory>/ambari-server-${project.version}/var/lib/ambari-server/resources/</outputDirectory>
+ <includes>
+ <include>stacks/**</include>
+ </includes>
</fileSet>
</fileSets>
<dependencySets>
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/HostNotFoundException.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/HostNotFoundException.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/HostNotFoundException.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/HostNotFoundException.java Wed Nov 7 08:13:12 2012
@@ -21,7 +21,7 @@ package org.apache.ambari.server;
@SuppressWarnings("serial")
public class HostNotFoundException extends AmbariException {
- public HostNotFoundException(String hostName) {
- super("Host not found, hostName=" + hostName);
+ public HostNotFoundException(String hostname) {
+ super("Host not found, hostname=" + hostname);
}
}
Added: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/StackNotFoundException.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/StackNotFoundException.java?rev=1406489&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/StackNotFoundException.java (added)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/StackNotFoundException.java Wed Nov 7 08:13:12 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server;
+
+@SuppressWarnings("serial")
+public class StackNotFoundException extends AmbariException {
+
+ public StackNotFoundException (String stackName,
+ String stackVersion) {
+ super("Stack Information not found"
+ + ", stackName=" + stackName
+ + ", stackVersion=" + stackVersion);
+ }
+
+}
\ No newline at end of file
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java Wed Nov 7 08:13:12 2012
@@ -17,7 +17,9 @@
*/
package org.apache.ambari.server.actionmanager;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.agent.CommandReport;
@@ -62,4 +64,13 @@ public interface ActionDBAccessor {
*/
public void hostRoleScheduled(Stage s, String hostname, String roleStr);
+ public List<HostRoleCommand> getRequestTasks(long requestId);
+
+ public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
+
+ public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses);
+
+ public List<Long> getRequests();
+
+ public HostRoleCommand getTask(long taskId);
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java Wed Nov 7 08:13:12 2012
@@ -21,22 +21,33 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.persist.Transactional;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.agent.CommandReport;
-
-import com.google.inject.Singleton;
-import org.apache.ambari.server.orm.dao.*;
-import org.apache.ambari.server.orm.entities.*;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RoleSuccessCriteriaDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import com.google.inject.persist.Transactional;
+
@Singleton
public class ActionDBAccessorImpl implements ActionDBAccessor {
private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
@@ -56,6 +67,8 @@ public class ActionDBAccessorImpl implem
@Inject
private StageFactory stageFactory;
@Inject
+ private HostRoleCommandFactory hostRoleCommandFactory;
+ @Inject
private Clusters clusters;
private final long requestId;
@@ -192,7 +205,11 @@ public class ActionDBAccessorImpl implem
@Transactional
public void updateHostRoleState(String hostname, long requestId,
long stageId, String role, CommandReport report) {
- List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(hostname, requestId, stageId, Role.valueOf(role));
+ LOG.info("Update HostRoleState: "
+ + "HostName " + hostname + " requestId " + requestId + " stageId "
+ + stageId + " role " + role + " report " + report);
+ List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(
+ hostname, requestId, stageId, Role.valueOf(role));
for (HostRoleCommandEntity command : commands) {
command.setStatus(HostRoleStatus.valueOf(report.getStatus()));
command.setStdOut(report.getStdOut());
@@ -233,4 +250,44 @@ public class ActionDBAccessorImpl implem
}
}
+
+ @Override
+ public List<HostRoleCommand> getRequestTasks(long requestId) {
+ List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
+ for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequest(requestId)) {
+ tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
+ }
+ return tasks;
+ }
+
+ @Override
+ public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds) {
+ List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>();
+ for (HostRoleCommandEntity commandEntity : hostRoleCommandDAO.findByPKs(taskIds)) {
+ commands.add(hostRoleCommandFactory.createExisting(commandEntity));
+ }
+ return commands;
+ }
+
+ @Override
+ public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
+ List<Stage> stages = new ArrayList<Stage>();
+ for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {
+ stages.add(stageFactory.createExisting(stageEntity));
+ }
+ return stages;
+ }
+
+ @Override
+ public List<Long> getRequests() {
+ return hostRoleCommandDAO.getRequests();
+ }
+
+ public HostRoleCommand getTask(long taskId) {
+ HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK((int)taskId);
+ if (commandEntity == null) {
+ return null;
+ }
+ return hostRoleCommandFactory.createExisting(commandEntity);
+ }
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java Wed Nov 7 08:13:12 2012
@@ -18,7 +18,10 @@
package org.apache.ambari.server.actionmanager;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.agent.CommandReport;
@@ -117,7 +120,7 @@ public class ActionDBInMemoryImpl implem
}
}
}
-
+
@Override
public void abortHostRole(String host, long requestId, long stageId, Role role) {
CommandReport report = new CommandReport();
@@ -132,9 +135,53 @@ public class ActionDBInMemoryImpl implem
public synchronized long getLastPersistedRequestIdWhenInitialized() {
return lastRequestId;
}
-
+
@Override
public void hostRoleScheduled(Stage s, String hostname, String roleStr) {
//Nothing needed for in-memory implementation
}
+
+ @Override
+ public List<HostRoleCommand> getRequestTasks(long requestId) {
+ return null;
+ }
+
+ @Override
+ public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds) {
+ return null;
+ }
+
+ @Override
+ public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
+ List<Stage> l = new ArrayList<Stage>();
+ for (Stage s: stageList) {
+ if (s.doesStageHaveHostRoleStatus(statuses)) {
+ l.add(s);
+ }
+ }
+ return l;
+ }
+ @Override
+ public synchronized List<Long> getRequests() {
+ Set<Long> requestIds = new HashSet<Long>();
+ for (Stage s: stageList) {
+ requestIds.add(s.getRequestId());
+ }
+ List<Long> ids = new ArrayList<Long>();
+ ids.addAll(requestIds);
+ return ids;
+ }
+
+ public HostRoleCommand getTask(long taskId) {
+ for (Stage s : stageList) {
+ for (String host : s.getHosts()) {
+ for (ExecutionCommand cmd : s.getExecutionCommands(host)) {
+ if (cmd.getTaskId() == taskId) {
+ return s.getHostRoleCommand(host, cmd.getRole().toString());
+ }
+ }
+ }
+ }
+ return null;
+ }
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java Wed Nov 7 08:13:12 2012
@@ -17,7 +17,9 @@
*/
package org.apache.ambari.server.actionmanager;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ambari.server.agent.ActionQueue;
@@ -84,12 +86,34 @@ public class ActionManager {
}
//persist the action response into the db.
for (CommandReport report : reports) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing command report : " + report.toString());
+ }
String actionId = report.getActionId();
long [] requestStageIds = StageUtils.getRequestStage(actionId);
long requestId = requestStageIds[0];
long stageId = requestStageIds[1];
+ HostRoleCommand command = db.getTask(report.getTaskId());
+ if (command == null) {
+ LOG.warn("The task " + report.getTaskId()
+ + " is invalid");
+ continue;
+ }
+ if (!command.getStatus().equals(HostRoleStatus.IN_PROGRESS)
+ && !command.getStatus().equals(HostRoleStatus.QUEUED)) {
+ LOG.warn("The task " + command.getTaskId()
+ + " is not in progress, ignoring update");
+ continue;
+ }
db.updateHostRoleState(hostname, requestId, stageId, report.getRole(),
report);
+ List<HostRoleCommand> commands = db.getRequestTasks(requestId);
+ LOG.debug("List of commands " + (commands == null ? 0: commands.size()));
+ if (commands != null) {
+ for (HostRoleCommand cmd : commands) {
+ LOG.info("******COMMAND DUMP*****" + cmd);
+ }
+ }
}
}
@@ -104,4 +128,31 @@ public class ActionManager {
return requestCounter.incrementAndGet();
}
+ public List<HostRoleCommand> getRequestTasks(long requestId) {
+ List<HostRoleCommand> commands = db.getRequestTasks(requestId);
+ LOG.debug("GETTING List of commands for request Id " + requestId + " : " +
+ (commands == null ? 0: commands.size()));
+ if (commands != null) {
+ for (HostRoleCommand command : commands) {
+ LOG.info("******GETTING COMMAND DUMP*****" + command);
+ }
+ }
+ return commands;
+ }
+
+ public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds) {
+ return db.getTasks(taskIds);
+ }
+
+ public List<Stage> getRequestsByHostRoleStatus(Set<HostRoleStatus> statuses) {
+ return db.getStagesByHostRoleStatus(statuses);
+ }
+
+ /**
+ * Returns last 20 requests
+ * @return
+ */
+ public List<Long> getRequests() {
+ return db.getRequests();
+ }
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java Wed Nov 7 08:13:12 2012
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.state.Cluster;
@@ -89,10 +90,14 @@ class ActionScheduler implements Runnabl
private void doWork() throws AmbariException {
List<Stage> stages = db.getStagesInProgress();
- LOG.info("Scheduler wakes up");
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Scheduler wakes up");
+ }
if (stages == null || stages.isEmpty()) {
//Nothing to do
- LOG.info("No stage in progress..nothing to do");
+ if (LOG.isDebugEnabled()) {
+ LOG.info("No stage in progress..nothing to do");
+ }
return;
}
@@ -186,6 +191,8 @@ class ActionScheduler implements Runnabl
ServiceComponentHost svcCompHost =
svcComp.getServiceComponentHost(host);
svcCompHost.handleEvent(timeoutEvent);
+ } catch (ServiceComponentNotFoundException scnex) {
+ LOG.info("Not a service component, assuming its an action", scnex);
} catch (InvalidStateTransitionException e) {
LOG.info("Transition failed for host: "+host+", role: "+roleStr, e);
} catch (AmbariException ex) {
@@ -235,8 +242,8 @@ class ActionScheduler implements Runnabl
ServiceComponentHost svcCompHost =
svcComp.getServiceComponentHost(hostname);
svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr));
- s.setStartTime(hostname,roleStr, now);
- s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
+ } catch (ServiceComponentNotFoundException scnex) {
+ LOG.info("Not a service component, assuming its an action", scnex);
} catch (InvalidStateTransitionException e) {
LOG.info(
"Transition failed for host: " + hostname + ", role: "
@@ -247,6 +254,8 @@ class ActionScheduler implements Runnabl
e);
throw e;
}
+ s.setStartTime(hostname,roleStr, now);
+ s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
}
s.setLastAttemptTime(hostname, roleStr, now);
s.incrementAttemptCount(hostname, roleStr);
@@ -279,6 +288,9 @@ class ActionScheduler implements Runnabl
break;
case ABORTED:
rs.numAborted++;
+ break;
+ default:
+ LOG.error("Unknown status " + status.name());
}
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java Wed Nov 7 08:13:12 2012
@@ -20,7 +20,6 @@ package org.apache.ambari.server.actionm
import java.util.ArrayList;
import java.util.List;
-import org.apache.ambari.server.agent.AgentCommand;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.utils.StageUtils;
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java Wed Nov 7 08:13:12 2012
@@ -42,7 +42,8 @@ import java.io.IOException;
public class HostRoleCommand {
private static final Logger log = LoggerFactory.getLogger(HostRoleCommand.class);
- private int taskId = -1;
+ private long taskId = -1;
+ private long stageId = -1;
private String hostName;
private final Role role;
private HostRoleStatus status = HostRoleStatus.PENDING;
@@ -66,6 +67,7 @@ public class HostRoleCommand {
@AssistedInject
public HostRoleCommand(@Assisted HostRoleCommandEntity hostRoleCommandEntity, Injector injector) {
taskId = hostRoleCommandEntity.getTaskId();
+ stageId = hostRoleCommandEntity.getStage().getStageId();
this.hostName = hostRoleCommandEntity.getHostName();
role = hostRoleCommandEntity.getRole();
status = hostRoleCommandEntity.getStatus();
@@ -126,11 +128,11 @@ public class HostRoleCommand {
}
- public int getTaskId() {
+ public long getTaskId() {
return taskId;
}
- public void setTaskId(int taskId) {
+ public void setTaskId(long taskId) {
this.taskId = taskId;
executionCommand.setTaskId(taskId);
}
@@ -211,6 +213,14 @@ public class HostRoleCommand {
this.executionCommand = executionCommand;
}
+ public long getStageId() {
+ return stageId;
+ }
+
+ public void setStageId(long stageId) {
+ this.stageId = stageId;
+ }
+
@Override
public int hashCode() {
return role.hashCode();
@@ -229,6 +239,7 @@ public class HostRoleCommand {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("HostRoleCommand State:\n");
+ builder.append(" TaskId: " + taskId + "\n");
builder.append(" Role: " + role + "\n");
builder.append(" Status: " + status + "\n");
builder.append(" Event: " + event + "\n");
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java Wed Nov 7 08:13:12 2012
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import com.google.inject.Injector;
@@ -54,10 +55,11 @@ public class Stage {
//Map of host to host-roles
Map<String, Map<String, HostRoleCommand>> hostRoleCommands =
new TreeMap<String, Map<String, HostRoleCommand>>();
- private Map<String, List<ExecutionCommand>> commandsToSend =
+ private Map<String, List<ExecutionCommand>> commandsToSend =
new TreeMap<String, List<ExecutionCommand>>();
- public Stage(long requestId, String logDir, String clusterName) {
+ @AssistedInject
+ public Stage(@Assisted long requestId, @Assisted("logDir") String logDir, @Assisted("clusterName") String clusterName) {
this.requestId = requestId;
this.logDir = logDir;
this.clusterName = clusterName;
@@ -106,7 +108,7 @@ public class Stage {
public StageEntity constructNewPersistenceEntity() {
StageEntity stageEntity = new StageEntity();
stageEntity.setRequestId(requestId);
- stageEntity.setStageId(stageId);
+ stageEntity.setStageId(getStageId());
stageEntity.setLogInfo(logDir);
stageEntity.setHostRoleCommands(new ArrayList<HostRoleCommandEntity>());
stageEntity.setRoleSuccessCriterias(new ArrayList<RoleSuccessCriteriaEntity>());
@@ -153,7 +155,7 @@ public class Stage {
}
public String getActionId() {
- return StageUtils.getActionId(requestId, stageId);
+ return StageUtils.getActionId(requestId, getStageId());
}
/**
@@ -162,7 +164,7 @@ public class Stage {
* adds them to the Stage. This should be called only once for a host-role
* for a given stage.
*/
- public synchronized void addHostRoleExecutionCommand(String host, Role role, RoleCommand command,
+ public synchronized void addHostRoleExecutionCommand(String host, Role role, RoleCommand command,
ServiceComponentHostEvent event, String clusterName, String serviceName) {
Log.info("Adding host role command for role: "+role+", command: "+command
+", event: "+event+", clusterName: "+clusterName+", serviceName: "+serviceName);
@@ -198,12 +200,12 @@ public class Stage {
}
execCmdList.add(cmd);
}
-
+
/**
- *
+ *
* @return list of hosts
*/
- public synchronized List<String> getHosts() {
+ public synchronized List<String> getHosts() { // TODO: Check whether method should be synchronized
List<String> hlist = new ArrayList<String>();
for (String h : this.hostRoleCommands.keySet()) {
hlist.add(h);
@@ -219,11 +221,11 @@ public class Stage {
return f;
}
}
-
+
public synchronized void setSuccessFactors(Map<Role, Float> suc) {
successFactors = suc;
}
-
+
public synchronized Map<Role, Float> getSuccessFactors() {
return successFactors;
}
@@ -260,7 +262,7 @@ public class Stage {
}
return null;
}
-
+
public List<ExecutionCommand> getExecutionCommands(String hostname) {
return this.commandsToSend.get(hostname);
}
@@ -268,29 +270,29 @@ public class Stage {
public long getStartTime(String hostname, String role) {
return this.hostRoleCommands.get(hostname).get(role).getStartTime();
}
-
+
public void setStartTime(String hostname, String role, long startTime) {
this.hostRoleCommands.get(hostname).get(role).setStartTime(startTime);
}
-
+
public HostRoleStatus getHostRoleStatus(String hostname, String role) {
return this.hostRoleCommands.get(hostname).get(role).getStatus();
}
-
+
public void setHostRoleStatus(String host, String role,
HostRoleStatus status) {
this.hostRoleCommands.get(host).get(role).setStatus(status);
}
-
+
public ServiceComponentHostEvent getFsmEvent(String hostname, String roleStr) {
return this.hostRoleCommands.get(hostname).get(roleStr).getEvent();
}
-
+
public void setExitCode(String hostname, String role, int exitCode) {
this.hostRoleCommands.get(hostname).get(role).setExitCode(exitCode);
}
-
+
public int getExitCode(String hostname, String role) {
return this.hostRoleCommands.get(hostname).get(role).getExitCode();
}
@@ -302,7 +304,7 @@ public class Stage {
public void setStdout(String hostname, String role, String stdOut) {
this.hostRoleCommands.get(hostname).get(role).setStdout(stdOut);
}
-
+
public synchronized boolean isStageInProgress() {
for(String host: hostRoleCommands.keySet()) {
for (String role : hostRoleCommands.get(host).keySet()) {
@@ -311,7 +313,7 @@ public class Stage {
return false;
}
if (hrc.getStatus().equals(HostRoleStatus.PENDING) ||
- hrc.getStatus().equals(HostRoleStatus.QUEUED) ||
+ hrc.getStatus().equals(HostRoleStatus.QUEUED) ||
hrc.getStatus().equals(HostRoleStatus.IN_PROGRESS)) {
return true;
}
@@ -320,6 +322,23 @@ public class Stage {
return false;
}
+ public synchronized boolean doesStageHaveHostRoleStatus(
+ Set<HostRoleStatus> statuses) {
+ for(String host: hostRoleCommands.keySet()) {
+ for (String role : hostRoleCommands.get(host).keySet()) {
+ HostRoleCommand hrc = hostRoleCommands.get(host).get(role);
+ if (hrc == null) {
+ return false;
+ }
+ for (HostRoleStatus status : statuses)
+ if (hrc.getStatus().equals(status)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
public Map<String, List<ExecutionCommand>> getExecutionCommands() {
return this.commandsToSend;
}
@@ -331,7 +350,7 @@ public class Stage {
/**
* This method should be used only in stage planner. To add
* a new execution command use
- * {@link #addHostRoleExecutionCommand(String, Role, RoleCommand,
+ * {@link #addHostRoleExecutionCommand(String, Role, RoleCommand,
* ServiceComponentHostEvent, String, String)}
*/
public synchronized void addExecutionCommand(Stage origStage,
@@ -353,7 +372,7 @@ public class Stage {
HostRoleCommand getHostRoleCommand(String hostname, String role) {
return hostRoleCommands.get(hostname).get(role);
}
-
+
@Override //Object
public String toString() {
StringBuilder builder = new StringBuilder();
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java Wed Nov 7 08:13:12 2012
@@ -18,10 +18,13 @@
package org.apache.ambari.server.actionmanager;
+import com.google.inject.assistedinject.Assisted;
import org.apache.ambari.server.orm.entities.StageEntity;
public interface StageFactory {
+ Stage createNew(long requestId, @Assisted("logDir") String logDir, @Assisted("clusterName") String clusterName);
+
Stage createExisting(String actionId);
Stage createExisting(StageEntity stageEntity);
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java Wed Nov 7 08:13:12 2012
@@ -124,6 +124,6 @@ public class CommandReport {
@Override
public String toString() {
- return actionId + "-" + role;
+ return taskId + " " + role + " " + status + " " + exitCode;
}
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java Wed Nov 7 08:13:12 2012
@@ -41,7 +41,7 @@ public class ExecutionCommand extends Ag
super(AgentCommandType.EXECUTION_COMMAND);
}
private String clusterName;
- private int taskId;
+ private long taskId;
private String commandId;
private String hostname;
private Role role;
@@ -91,12 +91,12 @@ public class ExecutionCommand extends Ag
}
@JsonProperty("taskId")
- public int getTaskId() {
+ public long getTaskId() {
return taskId;
}
@JsonProperty("taskId")
- public void setTaskId(int taskId) {
+ public void setTaskId(long taskId) {
this.taskId = taskId;
}
@@ -189,4 +189,5 @@ public class ExecutionCommand extends Ag
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
+
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java?rev=1406489&r1=1406488&r2=1406489&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java Wed Nov 7 08:13:12 2012
@@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.HostNotFoundException;
+import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.state.AgentVersion;
@@ -76,16 +77,13 @@ public class HeartBeatHandler {
public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
throws AmbariException {
HeartBeatResponse response = new HeartBeatResponse();
- response.setResponseId(0L);
+ response.setResponseId(0L);
String hostname = heartbeat.getHostname();
LOG.info("Action queue reference = "+actionQueue);
LOG.info("Heartbeat received from host " + heartbeat.getHostname()
+ " responseId=" + heartbeat.getResponseId());
Host hostObject = clusterFsm.getHost(hostname);
- // FIXME need to remove this hack
- hostObject.refresh();
long now = System.currentTimeMillis();
- hostObject.refresh();
try {
if (heartbeat.getNodeStatus().getStatus()
@@ -114,12 +112,10 @@ public class HeartBeatHandler {
if (service == null || "".equals(service)) {
throw new AmbariException("Invalid command report, service: "+service);
}
- Service svc = cl.getService(service);
- ServiceComponent svcComp = svc.getServiceComponent(
- report.getRole());
- ServiceComponentHost scHost = svcComp.getServiceComponentHost(
- hostname);
try {
+ Service svc = cl.getService(service);
+ ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
+ ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
if (report.getStatus().equals("COMPLETED")) {
scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(scHost
.getServiceComponentName(), hostname, now));
@@ -128,6 +124,8 @@ public class HeartBeatHandler {
scHost.handleEvent(new ServiceComponentHostOpFailedEvent(scHost
.getServiceComponentName(), hostname, now));
}
+ } catch (ServiceComponentNotFoundException scnex) {
+ LOG.info("Not a service component, assuming its an action", scnex);
} catch (InvalidStateTransitionException ex) {
LOG.warn("State machine exception", ex);
}