You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2013/02/04 03:24:02 UTC
svn commit: r1442010 [2/29] - in /incubator/ambari/branches/branch-1.2: ./
ambari-agent/ ambari-agent/conf/unix/ ambari-agent/src/examples/
ambari-agent/src/main/puppet/modules/hdp-ganglia/files/
ambari-agent/src/main/puppet/modules/hdp-ganglia/manifes...
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/files/check_name_dir_status.php
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/files/check_name_dir_status.php?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/files/check_name_dir_status.php (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/files/check_name_dir_status.php Mon Feb 4 02:23:55 2013
@@ -36,8 +36,8 @@
$json_array = json_decode($json_string, true);
$object = $json_array['beans'][0];
if ($object['NameDirStatuses'] == "") {
- echo "UNKNOWN: Namenode directory status not available via http://<nn_host>:port/jmx url" . "\n";
- exit(3);
+ echo "WARNING: Namenode directory status not available via http://".$host.":".$port."/jmx url" . "\n";
+ exit(1);
}
$NameDirStatuses = json_decode($object['NameDirStatuses'], true);
$failed_dir_count = count($NameDirStatuses['failed']);
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/manifests/server.pp
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/manifests/server.pp?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/manifests/server.pp (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/manifests/server.pp Mon Feb 4 02:23:55 2013
@@ -162,12 +162,14 @@ class hdp-nagios::server(
notify => Class['hdp-nagios::server::services']
}
+ class { 'hdp-nagios::server::enable_snmp': }
+
class { 'hdp-nagios::server::web_permisssions': }
class { 'hdp-nagios::server::services': ensure => $service_state}
- Class['hdp-nagios::server::packages'] -> Hdp::Directory[$nagios_config_dir] -> Hdp::Directory[$plugins_dir] -> Hdp::Directory_recursive_create[$nagios_pid_dir] ->
+ Class['hdp-nagios::server::packages'] -> Class['hdp-nagios::server::enable_snmp']-> Hdp::Directory[$nagios_config_dir] -> Hdp::Directory[$plugins_dir] -> Hdp::Directory_recursive_create[$nagios_pid_dir] ->
Hdp::Directory[$nagios_obj_dir] -> Hdp::Directory_Recursive_Create[$nagios_var_dir] ->
Hdp::Directory_Recursive_Create[$check_result_path] -> Hdp::Directory_Recursive_Create[$nagios_rw_dir] ->
Class['hdp-nagios::server::config'] -> Class['hdp-nagios::server::web_permisssions'] -> Class['hdp-nagios::server::services'] -> Class['hdp-monitor-webserver']
@@ -217,3 +219,12 @@ class hdp-nagios::server::services($ensu
anchor{'hdp-nagios::server::services::begin':} -> Service['nagios'] -> anchor{'hdp-nagios::server::services::end':}
}
}
+
+class hdp-nagios::server::enable_snmp() {
+
+ exec { "enable_snmp":
+ command => "service snmpd start; chkconfig snmpd on",
+ path => "/usr/local/bin/:/bin/:/sbin/",
+ }
+
+}
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/manifests/server/packages.pp
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/manifests/server/packages.pp?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/manifests/server/packages.pp (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-nagios/manifests/server/packages.pp Mon Feb 4 02:23:55 2013
@@ -28,9 +28,17 @@ class hdp-nagios::server::packages(
ensure => 'uninstalled'
}
} elsif ($service_state in ['running','stopped','installed_and_configured']) {
-
-
+ hdp::package { 'perl':
+ ensure => present,
+ java_needed => false
+ }
+
+ hdp::package { 'perl-Net-SNMP':
+ ensure => present,
+ java_needed => false
+ }
+
hdp::package { 'nagios-server':
ensure => present,
java_needed => false
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/service.pp Mon Feb 4 02:23:55 2013
@@ -29,6 +29,7 @@ class hdp-oozie::service(
$user = "$hdp-oozie::params::oozie_user"
$hadoop_home = $hdp-oozie::params::hadoop_prefix
$oozie_tmp = $hdp-oozie::params::oozie_tmp_dir
+ $oozie_hdfs_user_dir = $hdp::params::oozie_hdfs_user_dir
$cmd = "env HADOOP_HOME=${hadoop_home} /usr/sbin/oozie_server.sh"
$pid_file = "${hdp-oozie::params::oozie_pid_dir}/oozie.pid"
$jar_location = $hdp::params::hadoop_jar_location
@@ -42,11 +43,11 @@ class hdp-oozie::service(
$cmd1 = "cd /usr/lib/oozie && tar -xvf oozie-sharelib.tar.gz"
$cmd2 = "cd /usr/lib/oozie && mkdir -p ${oozie_tmp}"
- $cmd3 = "cd /usr/lib/oozie && chown ${user}:hadoop ${oozie_tmp}"
+ $cmd3 = "cd /usr/lib/oozie && chown ${user}:${hdp::params::user_group} ${oozie_tmp}"
$cmd4 = "cd ${oozie_tmp} && /usr/lib/oozie/bin/oozie-setup.sh -hadoop 0.20.200 $jar_location -extjs $ext_js_path $lzo_jar_suffix"
$cmd5 = "cd ${oozie_tmp} && /usr/lib/oozie/bin/ooziedb.sh create -sqlfile oozie.sql -run ; echo 0"
- $cmd6 = "hadoop dfs -put /usr/lib/oozie/share share ; hadoop dfs -chmod -R 755 /user/${user}/share"
- $cmd7 = "/usr/lib/oozie/bin/oozie-start.sh"
+ $cmd6 = "su - ${user} -c 'hadoop dfs -put /usr/lib/oozie/share ${oozie_hdfs_user_dir} ; hadoop dfs -chmod -R 755 ${oozie_hdfs_user_dir}/share'"
+ #$cmd7 = "/usr/lib/oozie/bin/oozie-start.sh"
if ($ensure == 'installed_and_configured') {
$sh_cmds = [$cmd1, $cmd2, $cmd3]
@@ -75,9 +76,16 @@ class hdp-oozie::service(
hdp-oozie::service::exec_user{$user_cmds:}
Hdp-oozie::Service::Directory<||> -> Hdp-oozie::Service::Exec_sh[$cmd1] -> Hdp-oozie::Service::Exec_sh[$cmd2] ->Hdp-oozie::Service::Exec_sh[$cmd3] -> Hdp-oozie::Service::Exec_user[$cmd4] ->Hdp-oozie::Service::Exec_user[$cmd5] -> Anchor['hdp-oozie::service::end']
} elsif ($ensure == 'running') {
- $user_cmds = [$cmd6, $cmd7]
- hdp-oozie::service::exec_user{$user_cmds:}
- Hdp-oozie::Service::Exec_user[$cmd6] -> Hdp-oozie::Service::Exec_user[$cmd7] -> Anchor['hdp-oozie::service::end']
+ hdp::exec { "exec $cmd6" :
+ command => $cmd6,
+ unless => "hadoop dfs -ls /user/oozie/share | awk 'BEGIN {count=0;} /share/ {count++} END {if (count > 0) {exit 0} else {exit 1}}'"
+ }
+ hdp::exec { "exec $start_cmd":
+ command => $start_cmd,
+ unless => $no_op_test,
+ initial_wait => $initial_wait,
+ require => Exec["exec $cmd6"]
+ }
} elsif ($ensure == 'stopped') {
hdp::exec { "exec $stop_cmd":
command => $stop_cmd,
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-zookeeper/manifests/service.pp
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-zookeeper/manifests/service.pp?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-zookeeper/manifests/service.pp (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp-zookeeper/manifests/service.pp Mon Feb 4 02:23:55 2013
@@ -85,11 +85,10 @@ class hdp-zookeeper::service(
class hdp-zookeeper::set_myid($myid)
{
- $create_file = "${hdp-zookeeper::params::zk_data_dir}/myid"
- $cmd = "echo '${myid}' > ${create_file}"
- hdp::exec{ $cmd:
- command => $cmd,
- creates => $create_file
+ file {"${hdp-zookeeper::params::zk_data_dir}/myid":
+ ensure => file,
+ content => $myid,
+ mode => 0644,
}
}
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp/manifests/init.pp
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp/manifests/init.pp?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp/manifests/init.pp (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp/manifests/init.pp Mon Feb 4 02:23:55 2013
@@ -98,28 +98,26 @@ class hdp::pre_install_pkgs
class hdp::create_smoke_user()
{
+
$smoke_group = $hdp::params::smoke_user_group
$smoke_user = $hdp::params::smokeuser
$security_enabled = $hdp::params::security_enabled
-
if ( $smoke_group != $proxyuser_group) {
group { $smoke_group :
ensure => present
}
}
-
- group { $proxyuser_group :
- ensure => present
+
+ if ($hdp::params::user_group != $proxyuser_group) {
+ group { $proxyuser_group :
+ ensure => present
+ }
}
-
- hdp::user { $smoke_user: gid => $proxyuser_group}
-
- $cmd = "usermod -g $smoke_group $smoke_user"
- $check_group_cmd = "id -gn $smoke_user | grep $smoke_group"
- hdp::exec{ $cmd:
- command => $cmd,
- unless => $check_group_cmd
+
+ hdp::user { $smoke_user:
+ gid => $hdp::params::user_group,
+ groups => ["$proxyuser_group"]
}
if ($security_enabled == true) {
@@ -133,11 +131,7 @@ class hdp::create_smoke_user()
}
}
- if ( $smoke_group != $proxyuser_group) {
- Group[$smoke_group] -> Group[$proxyuser_group] -> Hdp::User[$smoke_user] -> Hdp::Exec[$cmd]
- } else {
- Group[$smoke_group] -> Hdp::User[$smoke_user] -> Hdp::Exec[$cmd]
- }
+ Group<||> -> Hdp::User[$smoke_user]
}
@@ -153,7 +147,8 @@ class hdp::set_selinux()
define hdp::user(
$gid = $hdp::params::user_group,
- $just_validate = undef
+ $just_validate = undef,
+ $groups = undef
)
{
$user_info = $hdp::params::user_info[$name]
@@ -175,7 +170,8 @@ define hdp::user(
ensure => present,
managehome => true,
gid => $gid, #TODO either remove this to support LDAP env or fix it
- shell => '/bin/bash'
+ shell => '/bin/bash',
+ groups => $groups
}
}
}
@@ -187,7 +183,8 @@ define hdp::directory(
$mode = undef,
$ensure = directory,
$force = undef,
- $service_state = 'running'
+ $service_state = 'running',
+ $override_owner = false
)
{
if (($service_state == 'uninstalled') and ($wipeoff_data == true)) {
@@ -199,13 +196,21 @@ define hdp::directory(
force => $force
}
} elsif ($service_state != 'uninstalled') {
- file { $name :
- ensure => present,
- owner => $owner,
- group => $group,
- mode => $mode,
- force => $force
- }
+ if $override_owner == true {
+ file { $name :
+ ensure => present,
+ owner => $owner,
+ group => $group,
+ mode => $mode,
+ force => $force
+ }
+ } else {
+ file { $name :
+ ensure => present,
+ mode => $mode,
+ force => $force
+ }
+ }
}
}
#TODO: check on -R flag and use of recurse
@@ -216,7 +221,8 @@ define hdp::directory_recursive_create(
$context_tag = undef,
$ensure = directory,
$force = undef,
- $service_state = 'running'
+ $service_state = 'running',
+ $override_owner = true
)
{
@@ -231,7 +237,8 @@ define hdp::directory_recursive_create(
mode => $mode,
ensure => $ensure,
force => $force,
- service_state => $service_state
+ service_state => $service_state,
+ override_owner => $override_owner
}
Hdp::Exec["mkdir -p ${name}"] -> Hdp::Directory[$name]
}
@@ -247,9 +254,16 @@ define hdp::directory_recursive_create_i
)
{
hdp::exec {"mkdir -p ${name} ; exit 0" :
- command => "mkdir -p ${name} ; chown ${owner}:${group} ${name}; chmod ${mode} ${name} ; exit 0",
+ command => "mkdir -p ${name} ; exit 0",
creates => $name
}
+ hdp::exec {"chown ${owner}:${group} ${name}; exit 0" :
+ command => "chown ${owner}:${group} ${name}; exit 0"
+ }
+ hdp::exec {"chmod ${mode} ${name} ; exit 0" :
+ command => "chmod ${mode} ${name} ; exit 0"
+ }
+ Hdp::Exec["mkdir -p ${name} ; exit 0"] -> Hdp::Exec["chown ${owner}:${group} ${name}; exit 0"] -> Hdp::Exec["chmod ${mode} ${name} ; exit 0"]
}
### helper to do exec
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp/manifests/params.pp
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp/manifests/params.pp?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp/manifests/params.pp (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/puppet/modules/hdp/manifests/params.pp Mon Feb 4 02:23:55 2013
@@ -110,8 +110,6 @@ class hdp::params()
$public_webhcat_server_host = hdp_default("webhcat_server_host")
}
- ############ Hdfs directories
- $hbase_hdfs_root_dir = hdp_default("hadoop/hbase-site/hbase_hdfs_root_dir","/apps/hbase/data")
############ users
$user_info = hdp_default("user_info",{})
@@ -133,6 +131,13 @@ class hdp::params()
$smokeuser = hdp_default("smokeuser","ambari_qa")
$smoke_user_group = hdp_default("smoke_user_group","users")
+
+ ############ Hdfs directories
+ $hbase_hdfs_root_dir = hdp_default("hadoop/hbase-site/hbase_hdfs_root_dir","/apps/hbase/data")
+ $oozie_hdfs_user_dir = hdp_default("oozie_hdfs_user_dir", "/user/oozie")
+ $hcat_hdfs_user_dir = hdp_default("hcat_hdfs_user_dir", "/user/hcat")
+ $hive_hdfs_user_dir = hdp_default("hive_hdfs_user_dir", "/user/hive")
+ $smoke_hdfs_user_dir = hdp_default("smoke_hdfs_user_dir", "/user/${smokeuser}")
#because of Puppet user resource issue make sure that $hadoop_user is different from user_group
if ($security_enabled == true) {
@@ -496,6 +501,16 @@ class hdp::params()
64 => {'ALL' => 'hdp_mon_dashboard'}
},
+ perl =>
+ {
+ 64 => {'ALL' => 'perl'}
+ },
+
+ perl-Net-SNMP =>
+ {
+ 64 => {'ALL' => 'perl-Net-SNMP'}
+ },
+
nagios-server => {
64 => {'ALL' => 'nagios-3.2.3'}
},
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/ActionQueue.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/ActionQueue.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/ActionQueue.py Mon Feb 4 02:23:55 2013
@@ -109,8 +109,9 @@ class ActionQueue(threading.Thread):
cluster = command['clusterName']
service = command['serviceName']
component = command['componentName']
+ globalConfig = command['configurations']['global']
try:
- livestatus = LiveStatus(cluster, service, component)
+ livestatus = LiveStatus(cluster, service, component, globalConfig)
result = livestatus.build()
logger.info("Got live status for component " + component + " of service " + str(service) +\
" of cluster " + str(cluster) + "\n" + pprint.pformat(result))
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py Mon Feb 4 02:23:55 2013
@@ -35,8 +35,6 @@ secured_url_port=8441
prefix=/tmp/ambari-agent
[services]
-serviceToPidMapFile=servicesToPidNames.dict
-pidLookupPath=/var/run/
[stack]
installprefix=/tmp
@@ -54,19 +52,155 @@ sleepBetweenRetries=1
keysdir=/tmp/ambari-agent
server_crt=ca.crt
passphrase_env_var_name=AMBARI_PASSPHRASE
+
+[heartbeat]
+state_interval = 6
+dirs=/etc/hadoop,/etc/hadoop/conf,/var/run/hadoop,/var/log/hadoop
+rpms=hadoop,openssl,wget,net-snmp,ntpd,ruby,ganglia,nagios
"""
s = StringIO.StringIO(content)
config.readfp(s)
+imports = [
+ "hdp/manifests/*.pp",
+ "hdp-hadoop/manifests/*.pp",
+ "hdp-hbase/manifests/*.pp",
+ "hdp-zookeeper/manifests/*.pp",
+ "hdp-oozie/manifests/*.pp",
+ "hdp-pig/manifests/*.pp",
+ "hdp-sqoop/manifests/*.pp",
+ "hdp-templeton/manifests/*.pp",
+ "hdp-hive/manifests/*.pp",
+ "hdp-hcat/manifests/*.pp",
+ "hdp-mysql/manifests/*.pp",
+ "hdp-monitor-webserver/manifests/*.pp",
+ "hdp-repos/manifests/*.pp"
+]
+
+rolesToClass = {
+ 'NAMENODE': 'hdp-hadoop::namenode',
+ 'DATANODE': 'hdp-hadoop::datanode',
+ 'SECONDARY_NAMENODE': 'hdp-hadoop::snamenode',
+ 'JOBTRACKER': 'hdp-hadoop::jobtracker',
+ 'TASKTRACKER': 'hdp-hadoop::tasktracker',
+ 'HDFS_CLIENT': 'hdp-hadoop::client',
+ 'MAPREDUCE_CLIENT': 'hdp-hadoop::client',
+ 'ZOOKEEPER_SERVER': 'hdp-zookeeper',
+ 'ZOOKEEPER_CLIENT': 'hdp-zookeeper::client',
+ 'HBASE_MASTER': 'hdp-hbase::master',
+ 'HBASE_REGIONSERVER': 'hdp-hbase::regionserver',
+ 'HBASE_CLIENT': 'hdp-hbase::client',
+ 'PIG': 'hdp-pig',
+ 'SQOOP': 'hdp-sqoop',
+ 'OOZIE_SERVER': 'hdp-oozie::server',
+ 'OOZIE_CLIENT': 'hdp-oozie::client',
+ 'HIVE_CLIENT': 'hdp-hive::client',
+ 'HCAT': 'hdp-hcat',
+ 'HIVE_SERVER': 'hdp-hive::server',
+ 'HIVE_METASTORE': 'hdp-hive::metastore',
+ 'MYSQL_SERVER': 'hdp-mysql::server',
+ 'WEBHCAT_SERVER': 'hdp-templeton::server',
+ 'DASHBOARD': 'hdp-dashboard',
+ 'NAGIOS_SERVER': 'hdp-nagios::server',
+ 'GANGLIA_SERVER': 'hdp-ganglia::server',
+ 'GANGLIA_MONITOR': 'hdp-ganglia::monitor',
+ 'HTTPD': 'hdp-monitor-webserver',
+ 'HDFS_SERVICE_CHECK': 'hdp-hadoop::hdfs::service_check',
+ 'MAPREDUCE_SERVICE_CHECK': 'hdp-hadoop::mapred::service_check',
+ 'ZOOKEEPER_SERVICE_CHECK': 'hdp-zookeeper::zookeeper::service_check',
+ 'ZOOKEEPER_QUORUM_SERVICE_CHECK': 'hdp-zookeeper::quorum::service_check',
+ 'HBASE_SERVICE_CHECK': 'hdp-hbase::hbase::service_check',
+ 'HIVE_SERVICE_CHECK': 'hdp-hive::hive::service_check',
+ 'HCAT_SERVICE_CHECK': 'hdp-hcat::hcat::service_check',
+ 'OOZIE_SERVICE_CHECK': 'hdp-oozie::oozie::service_check',
+ 'PIG_SERVICE_CHECK': 'hdp-pig::pig::service_check',
+ 'SQOOP_SERVICE_CHECK': 'hdp-sqoop::sqoop::service_check',
+ 'WEBHCAT_SERVICE_CHECK': 'hdp-templeton::templeton::service_check',
+ 'DASHBOARD_SERVICE_CHECK': 'hdp-dashboard::dashboard::service_check',
+ 'DECOMMISSION_DATANODE': 'hdp-hadoop::hdfs::decommission'
+}
+
+serviceStates = {
+ 'START': 'running',
+ 'INSTALL': 'installed_and_configured',
+ 'STOP': 'stopped'
+}
+
+servicesToPidNames = {
+ 'NAMENODE': 'hadoop-[A-Za-z0-9_]+-namenode.pid$',
+ 'SECONDARY_NAMENODE': 'hadoop-[A-Za-z0-9_]+-secondarynamenode.pid$',
+ 'DATANODE': 'hadoop-[A-Za-z0-9_]+-datanode.pid$',
+ 'JOBTRACKER': 'hadoop-[A-Za-z0-9_]+-jobtracker.pid$',
+ 'TASKTRACKER': 'hadoop-[A-Za-z0-9_]+-tasktracker.pid$',
+ 'OOZIE_SERVER': 'oozie.pid',
+ 'ZOOKEEPER_SERVER': 'zookeeper_server.pid',
+ 'TEMPLETON_SERVER': 'templeton.pid',
+ 'NAGIOS_SERVER': 'nagios.pid',
+ 'GANGLIA_SERVER': 'gmetad.pid',
+ 'GANGLIA_MONITOR': 'gmond.pid',
+ 'HBASE_MASTER': 'hbase-[A-Za-z0-9_]+-master.pid',
+ 'HBASE_REGIONSERVER': 'hbase-[A-Za-z0-9_]+-regionserver.pid',
+ 'HCATALOG_SERVER': 'hcat.pid',
+ 'KERBEROS_SERVER': 'kadmind.pid',
+ 'HIVE_SERVER': 'hive-server.pid',
+ 'HIVE_METASTORE': 'hive.pid',
+ 'MYSQL_SERVER': 'mysqld.pid'
+}
+
+pidPathesVars = [
+ {'var' : 'hadoop_pid_dir_prefix',
+ 'defaultValue' : '/var/run/hadoop'},
+ {'var' : 'hadoop_pid_dir_prefix',
+ 'defaultValue' : '/var/run/hadoop'},
+ {'var' : 'ganglia_runtime_dir',
+ 'defaultValue' : '/var/run/ganglia/hdp'},
+ {'var' : 'hbase_pid_dir',
+ 'defaultValue' : '/var/run/hbase'},
+ {'var' : '',
+ 'defaultValue' : '/var/run/nagios'},
+ {'var' : 'zk_pid_dir',
+ 'defaultValue' : '/var/run/zookeeper'},
+ {'var' : 'oozie_pid_dir',
+ 'defaultValue' : '/var/run/oozie'},
+ {'var' : 'hcat_pid_dir',
+ 'defaultValue' : '/var/run/webhcat'},
+ {'var' : 'hive_pid_dir',
+ 'defaultValue' : '/var/run/hive'},
+ {'var' : 'mysqld_pid_dir',
+ 'defaultValue' : '/var/run/mysqld'}
+]
+
class AmbariConfig:
def getConfig(self):
global config
return config
+ def getImports(self):
+ global imports
+ return imports
+
+ def getRolesToClass(self):
+ global rolesToClass
+ return rolesToClass
+
+ def getServiceStates(self):
+ global serviceStates
+ return serviceStates
+
+ def getServicesToPidNames(self):
+ global servicesToPidNames
+ return servicesToPidNames
+
+ def getPidPathesVars(self):
+ global pidPathesVars
+ return pidPathesVars
+
+
def setConfig(customConfig):
global config
config = customConfig
+
def main():
print config
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Controller.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Controller.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Controller.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Controller.py Mon Feb 4 02:23:55 2013
@@ -22,7 +22,7 @@ import logging
import logging.handlers
import signal
import json
-import socket
+import hostname
import sys, traceback
import time
import threading
@@ -53,7 +53,7 @@ class Controller(threading.Thread):
self.safeMode = True
self.credential = None
self.config = config
- self.hostname = socket.gethostname()
+ self.hostname = hostname.hostname()
server_secured_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'secured_url_port')
self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
@@ -135,12 +135,15 @@ class Controller(threading.Thread):
retry = False
certVerifFailed = False
+ config = AmbariConfig.config
+ hb_interval = config.get('heartbeat', 'state_interval')
+
#TODO make sure the response id is monotonically increasing
id = 0
while not self.DEBUG_STOP_HEARTBITTING:
try:
if not retry:
- data = json.dumps(self.heartbeat.build(self.responseId))
+ data = json.dumps(self.heartbeat.build(self.responseId, int(hb_interval)))
pass
else:
self.DEBUG_HEARTBEAT_RETRIES += 1
@@ -226,8 +229,7 @@ class Controller(threading.Thread):
self.heartbeatWithServer()
def restartAgent(self):
- #stopping for now, restart will be added later
- ProcessHelper.stopAgent()
+ ProcessHelper.restartAgent()
pass
def sendRequest(self, url, data):
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Heartbeat.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Heartbeat.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Heartbeat.py Mon Feb 4 02:23:55 2013
@@ -23,11 +23,13 @@ import logging
from Hardware import Hardware
from ActionQueue import ActionQueue
from ServerStatus import ServerStatus
+import NetUtil
import AmbariConfig
-import socket
+import hostname
import time
import traceback
from pprint import pprint, pformat
+from HostInfo import HostInfo
logger = logging.getLogger()
@@ -38,7 +40,7 @@ class Heartbeat:
self.actionQueue = actionQueue
self.reports = []
- def build(self, id='-1'):
+ def build(self, id='-1', state_interval=-1):
global clusterId, clusterDefinitionRevision, firstContact
timestamp = int(time.time()*1000)
queueResult = self.actionQueue.result()
@@ -49,9 +51,16 @@ class Heartbeat:
heartbeat = { 'responseId' : int(id),
'timestamp' : timestamp,
- 'hostname' : socket.getfqdn(),
+ 'hostname' : hostname.hostname(),
'nodeStatus' : nodeStatus
}
+ if (int(id) >= 0) and state_interval > 0 and (int(id) % state_interval) == 0:
+ hostInfo = HostInfo()
+ nodeInfo = { }
+ # for now, just do the same work as registration
+ hostInfo.register(nodeInfo)
+ heartbeat['agentEnv'] = nodeInfo
+
if len(queueResult) != 0:
heartbeat['reports'] = queueResult['reports']
heartbeat['componentStatus'] = queueResult['componentStatus']
@@ -60,9 +69,9 @@ class Heartbeat:
return heartbeat
def main(argv=None):
- actionQueue = ActionQueue()
+ actionQueue = ActionQueue(AmbariConfig.config)
heartbeat = Heartbeat(actionQueue)
- print json.dumps(heartbeat.build())
+ print json.dumps(heartbeat.build('3',3))
if __name__ == '__main__':
main()
Added: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/HostInfo.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/HostInfo.py?rev=1442010&view=auto
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/HostInfo.py (added)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/HostInfo.py Mon Feb 4 02:23:55 2013
@@ -0,0 +1,168 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import glob
+import pwd
+import subprocess
+import AmbariConfig
+
+class HostInfo:
+
+ def dirType(self, path):
+ if not os.path.exists(path):
+ return 'not_exist'
+ elif os.path.islink(path):
+ return 'sym_link'
+ elif os.path.isdir(path):
+ return 'directory'
+ elif os.path.isfile(path):
+ return 'file'
+ return 'unknown'
+
+ def rpmInfo(self, rpmList):
+ config = AmbariConfig.config
+
+ try:
+ for rpmName in config.get('heartbeat', 'rpms').split(','):
+ rpmName = rpmName.strip()
+ rpm = { }
+ rpm['name'] = rpmName
+
+ try:
+ osStat = subprocess.Popen(["rpm", "-q", rpmName], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out, err = osStat.communicate()
+ if (0 != osStat.returncode or 0 == len(out.strip())):
+ rpm['installed'] = False
+ else:
+ rpm['installed'] = True
+ rpm['version'] = out.strip()
+ except:
+ rpm['available'] = False
+
+ rpmList.append(rpm)
+ except:
+ pass
+
+ def hadoopVarRunCount(self):
+ if not os.path.exists('/var/run/hadoop'):
+ return 0
+ pids = glob.glob('/var/run/hadoop/*/*.pid')
+ return len(pids)
+
+ def hadoopVarLogCount(self):
+ if not os.path.exists('/var/log/hadoop'):
+ return 0
+ logs = glob.glob('/var/log/hadoop/*/*.log')
+ return len(logs)
+
+ def etcAlternativesConf(self, etcList):
+ if not os.path.exists('/etc/alternatives'):
+ return []
+ confs = glob.glob('/etc/alternatives/*conf')
+
+ for conf in confs:
+ confinfo = { }
+ realconf = conf
+ if os.path.islink(conf):
+ realconf = os.path.realpath(conf)
+ confinfo['name'] = conf
+ confinfo['target'] = realconf
+ etcList.append(confinfo)
+
+ def repos(self):
+ # centos, redhat
+ try:
+ osStat = subprocess.Popen(["yum", "-C", "repolist"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out, err = osStat.communicate()
+ return out
+ except:
+ pass
+ # suse, only if above failed
+ try:
+ osStat = subprocess.Popen(["zypper", "repos"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out, err = osStat.communicate()
+ return out
+ except:
+ pass
+
+ # default, never return empty
+ return "could_not_determine"
+
+
+ def register(self, dict):
+ dict['varLogHadoopLogCount'] = self.hadoopVarLogCount()
+ dict['varRunHadoopPidCount'] = self.hadoopVarRunCount()
+
+ etcs = []
+ self.etcAlternativesConf(etcs)
+ dict['etcAlternativesConf'] = etcs
+
+ dirs = []
+ config = AmbariConfig.config
+ try:
+ for dirName in config.get('heartbeat', 'dirs').split(','):
+ obj = { }
+ obj['type'] = self.dirType(dirName.strip())
+ obj['name'] = dirName.strip()
+ dirs.append(obj)
+ except:
+ pass
+
+ dict['paths'] = dirs
+
+ java = []
+ self.javaProcs(java)
+ dict['javaProcs'] = java
+
+ rpms = []
+ self.rpmInfo(rpms)
+ dict['rpms'] = rpms
+
+ dict['repoInfo'] = self.repos()
+
+ def javaProcs(self, list):
+ try:
+ pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
+ for pid in pids:
+ cmd = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
+ cmd = cmd.replace('\0', ' ')
+ if 'java' in cmd:
+ dict = { }
+ dict['pid'] = int(pid)
+ dict['hadoop'] = True if 'hadoop' in cmd else False
+ dict['command'] = cmd.strip()
+ for line in open(os.path.join('/proc', pid, 'status')):
+ if line.startswith('Uid:'):
+ uid = int(line.split()[1])
+ dict['user'] = pwd.getpwuid(uid).pw_name
+ list.append(dict)
+ except:
+ pass
+ pass
+
+def main(argv=None):
+ h = HostInfo()
+ struct = { }
+ h.register(struct)
+ print struct
+
+if __name__ == '__main__':
+ main()
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/LiveStatus.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/LiveStatus.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/LiveStatus.py Mon Feb 4 02:23:55 2013
@@ -91,10 +91,11 @@ class LiveStatus:
LIVE_STATUS = "STARTED"
DEAD_STATUS = "INSTALLED"
- def __init__(self, cluster, service, component):
+ def __init__(self, cluster, service, component, globalConfig):
self.cluster = cluster
self.service = service
self.component = component
+ self.globalConfig = globalConfig
def belongsToService(self, component):
@@ -104,9 +105,7 @@ class LiveStatus:
# Live status was stripped from heartbeat after revision e1718dd
def build(self):
global SERVICES, COMPONENTS, LIVE_STATUS, DEAD_STATUS
- pidLookupPath = AmbariConfig.config.get('services','pidLookupPath')
- serviceToPidMapFile = AmbariConfig.config.get('services','serviceToPidMapFile')
- statusCheck = StatusCheck(pidLookupPath, serviceToPidMapFile)
+ statusCheck = StatusCheck(AmbariConfig.servicesToPidNames, AmbariConfig.pidPathesVars, self.globalConfig)
livestatus = None
for component in self.COMPONENTS:
if component["serviceName"] == self.service and component["componentName"] == self.component:
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/NetUtil.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/NetUtil.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/NetUtil.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/NetUtil.py Mon Feb 4 02:23:55 2013
@@ -29,6 +29,7 @@ class NetUtil:
CONNECT_SERVER_RETRY_INTERVAL_SEC = 10
HEARTBEAT_IDDLE_INTERVAL_SEC = 10
HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 5
+ HEARTBEAT_STATE_INTERVAL = 6 # default one per minute
# Url within server to request during status check. This url
# should return HTTP code 200
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py Mon Feb 4 02:23:55 2013
@@ -21,6 +21,7 @@ limitations under the License.
import os
import logging
import traceback
+import sys
from shell import getTempFiles
logger = logging.getLogger()
@@ -32,20 +33,42 @@ else:
pidfile = "/var/run/ambari-agent/ambari-agent.pid"
-def stopAgent():
+def _clean():
+
+ logger.info("Removing pid file")
try:
os.unlink(pidfile)
- except Exception:
- logger.warn("Unable to remove: "+pidfile)
+ except Exception as ex:
traceback.print_exc()
+ logger.warn("Unable to remove pid file: %s", ex)
- tempFiles = getTempFiles()
- for tempFile in tempFiles:
- if os.path.exists(tempFile):
+ logger.info("Removing temp files")
+ for f in getTempFiles():
+ if os.path.exists(f):
try:
- os.unlink(tempFile)
- except Exception:
- traceback.print_exc()
- logger.warn("Unable to remove: "+tempFile)
+ os.unlink(f)
+ except Exception as ex:
+ traceback.print_exc()
+ logger.warn("Unable to remove: %s, %s", f, ex)
+
+
+def stopAgent():
+
+ _clean()
os._exit(0)
- pass
\ No newline at end of file
+ pass
+
+
+def restartAgent():
+
+ _clean()
+
+ executable = sys.executable
+ args = sys.argv[:]
+ args.insert(0, executable)
+
+ logger.info("Restarting self: %s %s", executable, args)
+
+ os.execvp(executable, args)
+
+
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Register.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Register.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Register.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/Register.py Mon Feb 4 02:23:55 2013
@@ -23,10 +23,11 @@ import json
from Hardware import Hardware
from ActionQueue import ActionQueue
from ServerStatus import ServerStatus
-import socket
+import hostname
import time
import urllib2
import subprocess
+from HostInfo import HostInfo
firstContact = True
@@ -36,23 +37,20 @@ class Register:
def __init__(self):
self.hardware = Hardware()
- def pfqdn(self):
- try:
- handle = urllib2.urlopen('http://169.254.169.254/latest/meta-data/public-hostname', '', 3)
- str = handle.read()
- handle.close()
- return str
- except Exception, e:
- return socket.getfqdn()
-
def build(self, id='-1'):
global clusterId, clusterDefinitionRevision, firstContact
timestamp = int(time.time()*1000)
+
+ hostInfo = HostInfo()
+ agentEnv = { }
+ hostInfo.register(agentEnv)
+
register = { 'responseId' : int(id),
'timestamp' : timestamp,
- 'hostname' : socket.getfqdn(),
- 'publicHostname' : self.pfqdn(),
+ 'hostname' : hostname.hostname(),
+ 'publicHostname' : hostname.public_hostname(),
'hardwareProfile' : self.hardware.get(),
+ 'agentEnv' : agentEnv
}
return register
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/RepoInstaller.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/RepoInstaller.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/RepoInstaller.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/RepoInstaller.py Mon Feb 4 02:23:55 2013
@@ -25,6 +25,7 @@ from shell import shellRunner
from manifestGenerator import writeImports
from pprint import pprint, pformat
import ast
+import AmbariConfig
import urlparse, urllib
import re
@@ -60,10 +61,8 @@ class RepoInstaller:
for repo in self.repoInfoList:
repoFile = open(self.path + os.sep + repo['repoId'] + '-' +
str(self.taskId) + PUPPET_EXT, 'w+')
- importsfile = "imports.txt"
- if self.config.has_option('puppet','imports_file'):
- importsfile = self.config.get('puppet', 'imports_file')
- writeImports(repoFile, self.modulesdir, importsfile)
+
+ writeImports(repoFile, self.modulesdir, AmbariConfig.imports)
baseUrl = ''
mirrorList = ''
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/StatusCheck.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/StatusCheck.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/StatusCheck.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/StatusCheck.py Mon Feb 4 02:23:55 2013
@@ -28,15 +28,9 @@ import re
logger = logging.getLogger()
-
-
-
class StatusCheck:
-
- def get_pair(self, line):
- key, sep, value = line.strip().partition("=")
- return key, value
-
+
+
def listFiles(self, dir):
basedir = dir
logger.debug("Files in " + os.path.abspath(dir) + ": ")
@@ -45,9 +39,9 @@ class StatusCheck:
if os.path.isdir(dir):
for item in os.listdir(dir):
if os.path.isfile(item) and item.endswith('.pid'):
- self.pidFilesDict[item.split(os.sep).pop()] = item
+ self.pidFilesDict[item.split(os.sep).pop()] = os.getcwd() + os.sep + item
else:
- subdirlist.append(os.path.join(basedir, item))
+ subdirlist.append(os.path.join(basedir, item))
for subdir in subdirlist:
self.listFiles(subdir)
else:
@@ -55,24 +49,35 @@ class StatusCheck:
self.pidFilesDict[dir.split(os.sep).pop()] = dir
except OSError as e:
logger.info(e.strerror + ' to ' + e.filename)
-
- def __init__(self, path, mappingFilePath):
- if not os.path.isdir(path):
- raise ValueError("Path argument must be valid directory")
-
- if not os.path.exists(mappingFilePath):
- raise IOError("File with services to pid mapping doesn't exist")
- self.path = path
- self.mappingFilePath = mappingFilePath
+
+ def fillDirValues(self):
+ try:
+ for pidVar in self.pidPathesVars:
+ pidVarName = pidVar['var']
+ pidDefaultvalue = pidVar['defaultValue']
+ if self.globalConfig.has_key(pidVarName):
+ self.pidPathes.append(self.globalConfig[pidVarName])
+ else:
+ self.pidPathes.append(pidDefaultvalue)
+ except Exception as e:
+ logger.error("Error while filling directories values " + str(e))
+
+ def __init__(self, serviceToPidDict, pidPathesVars, globalConfig):
+
+ self.serToPidDict = serviceToPidDict
+ self.pidPathesVars = pidPathesVars
+ self.pidPathes = []
self.sh = shellRunner()
self.pidFilesDict = {}
- self.listFiles(self.path)
-
-
- with open(self.mappingFilePath) as fd:
- self.serToPidDict = dict(self.get_pair(line) for line in fd)
+ self.globalConfig = globalConfig
+
+ self.fillDirValues()
+
+ for pidPath in self.pidPathes:
+ self.listFiles(pidPath)
def getIsLive(self, pidPath):
+
if not pidPath:
return False
@@ -97,7 +102,7 @@ class StatusCheck:
try:
pidPath = None
pidPattern = self.serToPidDict[serviceCode]
- logger.info( 'pidPattern: ' + pidPattern)
+ logger.info('pidPattern: ' + pidPattern)
except KeyError as e:
logger.warn('There is no mapping for ' + serviceCode)
return None
Added: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/hostname.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/hostname.py?rev=1442010&view=auto
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/hostname.py (added)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/hostname.py Mon Feb 4 02:23:55 2013
@@ -0,0 +1,57 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import socket
+import subprocess
+import AmbariConfig
+import urllib2
+
+def hostname():
+ config = AmbariConfig.config
+ try:
+ scriptname = config.get('agent', 'hostname_script')
+ try:
+ osStat = subprocess.Popen([scriptname], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out, err = osStat.communicate()
+ if (0 == osStat.returncode and 0 != len(out.strip())):
+ return out.strip()
+ else:
+ return socket.getfqdn()
+ except:
+ return socket.getfqdn()
+ except:
+ return socket.getfqdn()
+
+def public_hostname():
+ # future - do an agent entry for this too
+ try:
+ handle = urllib2.urlopen('http://169.254.169.254/latest/meta-data/public-hostname', '', 2)
+ str = handle.read()
+ handle.close()
+ return str
+ except Exception, e:
+ return socket.getfqdn()
+
+def main(argv=None):
+ print hostname()
+ print public_hostname()
+
+if __name__ == '__main__':
+ main()
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/main.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/main.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/main.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/main.py Mon Feb 4 02:23:55 2013
@@ -111,10 +111,28 @@ def main():
os.kill(pid, signal.SIGKILL)
os._exit(1)
+ # Check for ambari configuration file.
+ try:
+ config = AmbariConfig.config
+ if os.path.exists('/etc/ambari-agent/conf/ambari-agent.ini'):
+ config.read('/etc/ambari-agent/conf/ambari-agent.ini')
+ AmbariConfig.setConfig(config)
+ else:
+ raise Exception("No config found, use default")
+ except Exception, err:
+ logger.warn(err)
+
# Check if there is another instance running
if os.path.isfile(ProcessHelper.pidfile):
print("%s already exists, exiting" % ProcessHelper.pidfile)
sys.exit(1)
+ # check if ambari prefix exists
+ elif not os.path.isdir(config.get("agent", "prefix")):
+ msg = "Ambari prefix dir %s does not exists, can't continue" \
+ % config.get("agent", "prefix")
+ logger.error(msg)
+ print(msg)
+ sys.exit(1)
else:
# Daemonize current instance of Ambari Agent
#retCode = createDaemon()
@@ -123,17 +141,6 @@ def main():
credential = None
- # Check for ambari configuration file.
- try:
- config = AmbariConfig.config
- if os.path.exists('/etc/ambari-agent/conf/ambari-agent.ini'):
- config.read('/etc/ambari-agent/conf/ambari-agent.ini')
- AmbariConfig.setConfig(config)
- else:
- raise Exception("No config found, use default")
- except Exception, err:
- logger.warn(err)
-
killstaleprocesses()
server_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'url_port')
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/manifestGenerator.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/manifestGenerator.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/manifestGenerator.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/manifestGenerator.py Mon Feb 4 02:23:55 2013
@@ -24,7 +24,7 @@ import logging
from uuid import getnode as get_mac
from shell import shellRunner
from datetime import datetime
-from AmbariConfig import AmbariConfig
+import AmbariConfig
logger = logging.getLogger()
@@ -37,16 +37,14 @@ non_global_configuration_types = ["hdfs-
"webhcat-site", "hdfs-exclude-file"]
#read static imports from file and write them to manifest
-def writeImports(outputFile, modulesdir, inputFileName='imports.txt'):
- inputFile = open(inputFileName, 'r')
+def writeImports(outputFile, modulesdir, importsList):
logger.info("Modules dir is " + modulesdir)
outputFile.write('#' + datetime.now().strftime('%d.%m.%Y %H:%M:%S') + os.linesep)
- for line in inputFile:
+ for line in importsList:
modulename = line.rstrip()
line = "import '" + modulesdir + os.sep + modulename + "'" + os.linesep
outputFile.write(line)
-
- inputFile.close()
+
def generateManifest(parsedJson, fileName, modulesdir, ambariconfig):
logger.info("JSON Received:")
@@ -77,13 +75,9 @@ def generateManifest(parsedJson, fileNam
#writing manifest
manifest = open(fileName, 'w')
#Check for Ambari Config and make sure you pick the right imports file
- importsfile = "imports.txt"
- if ambariconfig.has_option('puppet', 'imports_file') :
- importsfile = ambariconfig.get('puppet', 'imports_file')
- logger.info("Using imports file " + importsfile)
#writing imports from external static file
- writeImports(outputFile=manifest, modulesdir=modulesdir, inputFileName=importsfile)
+ writeImports(outputFile=manifest, modulesdir=modulesdir, importsList=AmbariConfig.imports)
#writing nodes
writeNodes(manifest, clusterHostInfo)
@@ -116,17 +110,6 @@ def generateManifest(parsedJson, fileNam
manifest.close()
-
- #read dictionary
-def readDict(file, separator='='):
- result = dict()
-
- for line in file :
- dictTuple = line.partition(separator)
- result[dictTuple[0].strip()] = dictTuple[2].strip()
-
- return result
-
#write nodes
def writeNodes(outputFile, clusterHostInfo):
@@ -209,21 +192,9 @@ def writeNonGlobalConfigurations(outputF
def writeTasks(outputFile, roles, ambariconfig, clusterHostInfo=None,
hostname="localhost"):
#reading dictionaries
- rolestoclass = "rolesToClass.dict"
- if ambariconfig.has_option('puppet','roles_to_class'):
- rolestoclass = ambariconfig.get('puppet', 'roles_to_class')
-
- rolesToClassFile = open(rolestoclass, 'r')
- rolesToClass = readDict(rolesToClassFile)
- rolesToClassFile.close()
-
- servicestates = "serviceStates.dict"
- if ambariconfig.has_option('puppet','service_states'):
- servicestates = ambariconfig.get('puppet', 'service_states')
-
- serviceStatesFile = open(servicestates, 'r')
- serviceStates = readDict(serviceStatesFile)
- serviceStatesFile.close()
+ rolesToClass = AmbariConfig.rolesToClass
+
+ serviceStates = AmbariConfig.serviceStates
outputFile.write('node /default/ {\n ')
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/puppetExecutor.py Mon Feb 4 02:23:55 2013
@@ -26,6 +26,7 @@ from RepoInstaller import RepoInstaller
import pprint, threading
from Grep import Grep
from threading import Thread
+import shell
import traceback
logger = logging.getLogger()
@@ -212,10 +213,12 @@ class puppetExecutor:
self.event.wait(self.PUPPET_TIMEOUT_SECONDS)
if puppet.returncode is None:
logger.error("Task timed out and will be killed")
- puppet.terminate()
+ self.runShellKillPgrp(puppet)
self.last_puppet_has_been_killed = True
pass
+ def runShellKillPgrp(self, puppet):
+ shell.killprocessgrp(puppet.pid)
def main():
logging.basicConfig(level=logging.DEBUG)
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/security.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/security.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/security.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/main/python/ambari_agent/security.py Mon Feb 4 02:23:55 2013
@@ -20,6 +20,7 @@ import httplib
import urllib2
from urllib2 import Request
import socket
+import hostname
import ssl
import os
import logging
@@ -52,9 +53,9 @@ class VerifiedHTTPSConnection(httplib.HT
self.sock = sock
self._tunnel()
agent_key = AmbariConfig.config.get('security', 'keysdir') + os.sep + \
- socket.gethostname() + ".key"
+ hostname.hostname() + ".key"
agent_crt = AmbariConfig.config.get('security', 'keysdir') + os.sep \
- + socket.gethostname() + ".crt"
+ + hostname.hostname() + ".crt"
server_crt = AmbariConfig.config.get('security', 'keysdir') + os.sep \
+ "ca.crt"
@@ -112,13 +113,13 @@ class CertificateManager():
def getAgentKeyName(self):
keysdir = self.config.get('security', 'keysdir')
- return keysdir + os.sep + socket.gethostname() + ".key"
+ return keysdir + os.sep + hostname.hostname() + ".key"
def getAgentCrtName(self):
keysdir = self.config.get('security', 'keysdir')
- return keysdir + os.sep + socket.gethostname() + ".crt"
+ return keysdir + os.sep + hostname.hostname() + ".crt"
def getAgentCrtReqName(self):
keysdir = self.config.get('security', 'keysdir')
- return keysdir + os.sep + socket.gethostname() + ".csr"
+ return keysdir + os.sep + hostname.hostname() + ".csr"
def getSrvrCrtName(self):
keysdir = self.config.get('security', 'keysdir')
return keysdir + os.sep + "ca.crt"
@@ -161,7 +162,7 @@ class CertificateManager():
srvr_crt_f.write(response)
def reqSignCrt(self):
- sign_crt_req_url = self.server_url + '/certs/' + socket.gethostname()
+ sign_crt_req_url = self.server_url + '/certs/' + hostname.hostname()
agent_crt_req_f = open(self.getAgentCrtReqName())
agent_crt_req_content = agent_crt_req_f.read()
passphrase_env_var = self.config.get('security', 'passphrase_env_var_name')
@@ -184,7 +185,7 @@ class CertificateManager():
logger.error("Certificate signing failed")
def genAgentCrtReq(self):
- generate_script = GEN_AGENT_KEY % {'hostname': socket.gethostname(),
+ generate_script = GEN_AGENT_KEY % {'hostname': hostname.hostname(),
'keysdir' : self.config.get('security', 'keysdir')}
logger.info(generate_script)
p = Popen([generate_script], shell=True, stdout=PIPE)
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestActionQueue.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestActionQueue.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestActionQueue.py Mon Feb 4 02:23:55 2013
@@ -63,7 +63,8 @@ class TestActionQueue(TestCase):
'clusterHostInfo': "clusterHostInfo",
'roleCommand': "roleCommand",
'configurations': "configurations",
- 'commandType': "EXECUTION_COMMAND"
+ 'commandType': "EXECUTION_COMMAND",
+ 'configurations':{'global' : {}}
}
actionQueue.put(command)
@@ -121,4 +122,4 @@ class FakeExecutor():
"exitcode": 0,
"stdout": "returned stdout",
"stderr": "returned stderr"
- }
\ No newline at end of file
+ }
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestController.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestController.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestController.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestController.py Mon Feb 4 02:23:55 2013
@@ -1,4 +1,5 @@
#!/usr/bin/env python2.6
+# -*- coding: utf-8 -*-
'''
Licensed to the Apache Software Foundation (ASF) under one
@@ -18,308 +19,316 @@ See the License for the specific languag
limitations under the License.
'''
-from unittest import TestCase
-from ambari_agent.Register import Register
-from ambari_agent.Controller import Controller
-from ambari_agent.Heartbeat import Heartbeat
-from ambari_agent.ActionQueue import ActionQueue
-from ambari_agent import AmbariConfig
-from ambari_agent.NetUtil import NetUtil
-import socket, ConfigParser, logging
-import os, pprint, json, sys
-from threading import Thread
-import time
-import Queue
-
-
-BAD_URL = 'http://localhost:54222/badurl/'
-
-logger = logging.getLogger()
-
-class TestController(TestCase):
-
- def setUp(self):
- logger.disabled = True
- self.defaulttimeout = -1.0
- if hasattr(socket, 'getdefaulttimeout'):
- # get the default timeout on sockets
- self.defaulttimeout = socket.getdefaulttimeout()
-
-
- def tearDown(self):
- if self.defaulttimeout is not None and self.defaulttimeout > 0 and hasattr(socket, 'setdefaulttimeout'):
- # Set the default timeout on sockets
- socket.setdefaulttimeout(self.defaulttimeout)
- logger.disabled = False
-
-
- def test_reregister_loop(self):
- class ControllerMock(Controller):
- def __init__(self, config, range=0):
- self.repeatRegistration = False
- self.range = range
-
- callCounter = 0
-
- def registerAndHeartbeat(self):
- if self.callCounter < 3:
- self.repeatRegistration = True;
- self.callCounter += 1
- else:
- self.repeatRegistration = False;
-
- config = ConfigParser.RawConfigParser()
- mock = ControllerMock(config)
- mock.run()
- self.assertEquals(mock.callCounter, 3)
- pass
-
-
- def test_nonincremental_ids1(self):
- '''
- test to make sure nothing we act appropriately on getting non incremental reponse ids
- '''
- #timings adjustment
- netutil = NetUtil()
- netutil.HEARTBEAT_IDDLE_INTERVAL_SEC=0.05
- netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC=0.05
- #building fake responces
- responces = Queue.Queue()
- responce1 = {
- 'responseId':8,
- 'executionCommands':[],
- 'statusCommands':[],
- 'restartAgent':'False',
- }
- responce1 = json.dumps(responce1)
-
- responce2 = {
- 'responseId':11,
- 'executionCommands':[],
- 'statusCommands':[],
- 'restartAgent':'False',
- }
- responce2 = json.dumps(responce2)
- responces.put(responce1)
- responces.put(responce2)
- #building heartbeat object
- testsPath = os.path.dirname(os.path.realpath(__file__))
- dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
- AmbariConfig.config.set('services','serviceToPidMapFile', dictPath)
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
- heartbeat = Heartbeat(actionQueue)
- # testing controller with our heartbeat
- controller = self.ControllerMock_fake_restartAgent(AmbariConfig.config, responces)
- controller.heartbeat = heartbeat
- controller.actionQueue = actionQueue
- controller.logger = logger
- controller.netutil = netutil
- controller.heartbeatWithServer()
- restarts = controller.restartCount
- self.assertEquals(restarts, 1, "Agent should restart on non incremental responce ids")
- pass
-
-
- def test_nonincremental_ids2(self):
- '''
- test to make sure nothing we act appropriately on getting incremental reponse ids
- '''
- #timings adjustment
- netutil = NetUtil()
- netutil.HEARTBEAT_IDDLE_INTERVAL_SEC=0.05
- netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC=0.05
- #building fake responces
- responces = Queue.Queue()
- responce1 = {
- 'responseId':8,
- 'executionCommands':[],
- 'statusCommands':[],
- 'restartAgent':'False',
- }
- responce1 = json.dumps(responce1)
-
- responce2 = {
- 'responseId':9,
- 'executionCommands':[],
- 'statusCommands':[],
- 'restartAgent':'False',
- }
- responce2 = json.dumps(responce2)
- responces.put(responce1)
- responces.put(responce2)
- #building heartbeat object
- testsPath = os.path.dirname(os.path.realpath(__file__))
- dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
- AmbariConfig.config.set('services','serviceToPidMapFile', dictPath)
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
- heartbeat = Heartbeat(actionQueue)
- # testing controller with our heartbeat
- controller = self.ControllerMock_fake_restartAgent(AmbariConfig.config, responces)
- controller.heartbeat = heartbeat
- controller.actionQueue = actionQueue
- controller.logger = logger
- controller.netutil = netutil
- controller.heartbeatWithServer()
- restarts = controller.restartCount
- self.assertEquals(restarts, 0, "Agent should not restart on incremental responce ids")
- pass
-
-
- def test_reregister(self):
- '''
- test to make sure if we can get a re register command, we register with the server
- '''
- #timings adjustment
- netutil = NetUtil()
- netutil.HEARTBEAT_IDDLE_INTERVAL_SEC=0.05
- netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC=0.05
- #building fake responces
- responces = Queue.Queue()
- responce1 = {
- 'responseId':8,
- 'executionCommands':[],
- 'statusCommands':[],
- 'restartAgent':'true',
- }
- responce1 = json.dumps(responce1)
- responces.put(responce1)
- #building heartbeat object
- testsPath = os.path.dirname(os.path.realpath(__file__))
- dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
- AmbariConfig.config.set('services','serviceToPidMapFile', dictPath)
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
- heartbeat = Heartbeat(actionQueue)
- # testing controller with our heartbeat
- controller = self.ControllerMock_fake_restartAgent(AmbariConfig.config, responces)
- controller.heartbeat = heartbeat
- controller.actionQueue = actionQueue
- controller.logger = logger
- controller.netutil = netutil
- controller.heartbeatWithServer()
- restarts = controller.restartCount
- self.assertEquals(restarts, 1, "Agent should restart if we get a re register command")
-
-
- def test_heartbeat_retries(self):
- netutil = NetUtil()
- netutil.HEARTBEAT_IDDLE_INTERVAL_SEC=0.05
- netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC=0.05
- #building heartbeat object
- testsPath = os.path.dirname(os.path.realpath(__file__))
- dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
- AmbariConfig.config.set('services','serviceToPidMapFile', dictPath)
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
- heartbeat = Heartbeat(actionQueue)
- # testing controller with our heartbeat and wrong url
- controller = self.ControllerMock_failure_sendRequest(AmbariConfig.config)
- controller.heartbeat = heartbeat
- controller.actionQueue = actionQueue
- controller.logger = logger
- controller.netutil = netutil
- thread = Thread(target = controller.heartbeatWithServer)
- thread.start()
- time.sleep(0.5)
-
- # I have to stop the thread anyway, so I'll check results later
- threadWasAlive = thread.isAlive()
- successfull_heartbits0 = controller.DEBUG_SUCCESSFULL_HEARTBEATS
- heartbeat_retries0 = controller.DEBUG_HEARTBEAT_RETRIES
- # Stopping thread
- controller.DEBUG_STOP_HEARTBITTING = True
- time.sleep(0.3)
- # Checking results before thread stop
- self.assertEquals(threadWasAlive, True, "Heartbeat should be alive now")
- self.assertEquals(successfull_heartbits0, 0, "Heartbeat should not have any success")
- self.assertEquals(heartbeat_retries0 > 1, True, "Heartbeat should retry connecting")
- # Checking results after thread stop
- self.assertEquals(thread.isAlive(), False, "Heartbeat should stop now")
- self.assertEquals(controller.DEBUG_SUCCESSFULL_HEARTBEATS, 0, "Heartbeat should not have any success")
-
-
- def test_status_command_on_registration(self):
- '''
- test to make sure if we get a status check command from the server, we are able to evaluate and register at the server
- '''
- #timings adjustment
- netutil = NetUtil()
- netutil.HEARTBEAT_IDDLE_INTERVAL_SEC=0.05
- netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC=0.05
- #building fake registration responce
- responces = Queue.Queue()
- responce1 = {
- 'response':'OK',
- 'responseId':8,
- 'statusCommands':[{
- 'clusterName' : "c1",
- 'commandType' : "STATUS_COMMAND",
- 'componentName' : "NAMENODE",
- 'serviceName' : "HDFS",
- }],
- }
- responce1 = json.dumps(responce1)
- responces.put(responce1)
- #building heartbeat object
- testsPath = os.path.dirname(os.path.realpath(__file__))
- dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
- AmbariConfig.config.set('services','serviceToPidMapFile', dictPath)
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
- heartbeat = Heartbeat(actionQueue)
- # testing controller with our heartbeat
- controller = self.ControllerMock_fake_restartAgent(AmbariConfig.config, responces)
- controller.heartbeat = heartbeat
- controller.actionQueue = actionQueue
- controller.logger = logger
- controller.netutil = netutil
- controller.registerWithServer()
- # If test does not hang, registration is successful
- # So, checking queue
- queue = controller.actionQueue.getCommandQueue()
- self.assertEquals(queue.qsize(), 1, "Status command should be queued once")
- # Checking parsed status command
- command = queue.get()
- self.assertEquals(command['clusterName'], 'c1')
- self.assertEquals(command['commandType'], 'STATUS_COMMAND')
- self.assertEquals(command['componentName'], 'NAMENODE')
- self.assertEquals(command['serviceName'], 'HDFS')
-
-
- class ControllerMock_fake_restartAgent(Controller):
- def __init__(self, config, responces, range=3):
- self.repeatRegistration = False
- self.responces = responces
- self.heartbeatUrl = "fakeurl"
- self.registerUrl = "fakeregisterurl"
- self.responseId = 7
- self.register = Register()
- self.range = range
-
- def restartAgent(self):
- self.restartCount += 1
- pass
-
- restartCount = 0
-
- def sendRequest(self, url, data):
- responce = self.responces.get(block=False)
- if self.responces.empty():
- self.DEBUG_STOP_HEARTBITTING = True # Because we have nothing to reply next time
- return responce
-
-
- class ControllerMock_failure_sendRequest(Controller):
- def __init__(self, config, range=0):
- self.repeatRegistration = False
- self.heartbeatUrl = "fakeurl"
- self.registerUrl = "fakeregisterurl"
- self.responseId = 7
- self.register = Register()
- self.range = range
-
- def restartAgent(self):
- self.restartCount += 1
- pass
+import StringIO
+import unittest
+from ambari_agent import Controller
+import sys
+from mock.mock import patch, MagicMock, call
+
+
+class TestController(unittest.TestCase):
+
+ @patch("threading.Thread")
+ @patch("threading.Lock")
+ @patch("socket.gethostname")
+ @patch.object(Controller, "NetUtil")
+ def setUp(self, NetUtil_mock, hostnameMock, lockMock, threadMock):
+
+ Controller.logger = MagicMock()
+ hostnameMock.return_value = "test_hostname"
+ lockMock.return_value = MagicMock()
+ NetUtil_mock.return_value = MagicMock()
+
+ config = MagicMock()
+ config.get.return_value = "something"
+
+ self.controller = Controller.Controller(config)
+
+
+ @patch.object(Controller, "Heartbeat")
+ @patch.object(Controller, "Register")
+ @patch.object(Controller, "ActionQueue")
+ def test_start(self, ActionQueue_mock, Register_mock, Heartbeat_mock):
+
+ aq = MagicMock()
+ ActionQueue_mock.return_value = aq
+
+ self.controller.start()
+ self.assertTrue(ActionQueue_mock.called)
+ self.assertTrue(aq.start.called)
+ self.assertTrue(Register_mock.called)
+ self.assertTrue(Heartbeat_mock.called)
+
+ @patch("json.dumps")
+ @patch("json.loads")
+ @patch("time.sleep")
+ @patch("pprint.pformat")
+ @patch.object(Controller, "randint")
+ def test_registerWithServer(self, randintMock, pformatMock, sleepMock,
+ loadsMock, dumpsMock):
+
+ out = StringIO.StringIO()
+ sys.stdout = out
+
+ register = MagicMock()
+ self.controller.register = register
+
+ sendRequest = MagicMock()
+ self.controller.sendRequest = sendRequest
+
+ dumpsMock.return_value = "request"
+ response = {"responseId":1,}
+ loadsMock.return_value = response
+
+ self.assertEqual(response, self.controller.registerWithServer())
+
+ response["statusCommands"] = "commands"
+ self.controller.addToQueue = MagicMock(name="addToQueue")
+
+ self.assertEqual(response, self.controller.registerWithServer())
+ self.controller.addToQueue.assert_called_with("commands")
+
+ calls = []
+
+ def side_effect(*args):
+ if len(calls) == 0:
+ calls.append(1)
+ raise Exception("test")
+ return "request"
+
+ del response["statusCommands"]
+
+ dumpsMock.side_effect = side_effect
+ self.assertEqual(response, self.controller.registerWithServer())
+ self.assertTrue(randintMock.called)
+ self.assertTrue(sleepMock.called)
+
+ sys.stdout = sys.__stdout__
+
+ self.controller.sendRequest = Controller.Controller.sendRequest
+ self.controller.addToQueue = Controller.Controller.addToQueue
+
+
+ @patch("pprint.pformat")
+ def test_addToQueue(self, pformatMock):
+
+ actionQueue = MagicMock()
+ self.controller.actionQueue = actionQueue
+ self.controller.addToQueue(None)
+ self.assertFalse(actionQueue.put.called)
+ self.controller.addToQueue("cmd")
+ self.assertTrue(actionQueue.put.called)
+
+
+ @patch("urllib2.build_opener")
+ @patch("urllib2.install_opener")
+ def test_run(self, installMock, buildMock):
+
+ buildMock.return_value = "opener"
+ registerAndHeartbeat = MagicMock("registerAndHeartbeat")
+ calls = []
+ def side_effect():
+ if len(calls) == 0:
+ self.controller.repeatRegistration = True
+ calls.append(1)
+ registerAndHeartbeat.side_effect = side_effect
+ self.controller.registerAndHeartbeat = registerAndHeartbeat
+
+ # repeat registration
+ self.controller.run()
+
+ self.assertTrue(buildMock.called)
+ installMock.called_once_with("opener")
+ self.assertEqual(2, registerAndHeartbeat.call_count)
+
+ # one call, +1
+ registerAndHeartbeat.side_effect = None
+ self.controller.run()
+ self.assertEqual(3, registerAndHeartbeat.call_count)
+
+
+ def test_heartbeatWithServer(self, installMock, buildMock):
+
+ registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
+
+ self.controller.registerAndHeartbeat = registerAndHeartbeat
+ self.controller.run()
+ self.assertTrue(installMock.called)
+ self.assertTrue(buildMock.called)
+ self.controller.registerAndHeartbeat.assert_called_once_with()
+
+ calls = []
+ def switchBool():
+ if len(calls) == 0:
+ self.controller.repeatRegistration = True
+ calls.append(1)
+ self.controller.repeatRegistration = False
+
+ registerAndHeartbeat.side_effect = switchBool
+ self.controller.run()
+ self.assertEqual(2, registerAndHeartbeat.call_count)
+
+ self.controller.registerAndHeartbeat = \
+ Controller.Controller.registerAndHeartbeat
+
+
+ @patch("time.sleep")
+ def test_registerAndHeartbeat(self, sleepMock):
+
+ registerWithServer = MagicMock(name="registerWithServer")
+ registerWithServer.return_value = {"response":"resp"}
+ self.controller.registerWithServer = registerWithServer
+ heartbeatWithServer = MagicMock(name="heartbeatWithServer")
+ self.controller.heartbeatWithServer = heartbeatWithServer
+
+ self.controller.registerAndHeartbeat()
+ registerWithServer.assert_called_once_with()
+ heartbeatWithServer.assert_called_once_with()
+
+ self.controller.registerWithServer = \
+ Controller.Controller.registerWithServer
+ self.controller.heartbeatWithServer = \
+ Controller.Controller.registerWithServer
+
+
+ @patch.object(Controller, "ProcessHelper")
+ def test_restartAgent(self, ProcessHelper_mock):
+
+ self.controller.restartAgent()
+ self.assertTrue(ProcessHelper_mock.restartAgent.called)
+
+
+ @patch("urllib2.Request")
+ @patch.object(Controller, "security")
+ def test_sendRequest(self, security_mock, requestMock):
+
+ conMock = MagicMock()
+ conMock.request.return_value = "response"
+ security_mock.CachedHTTPSConnection.return_value = conMock
+ url = "url"
+ data = "data"
+ requestMock.return_value = "request"
+
+ self.controller.cachedconnect = None
+
+ self.assertEqual("response", self.controller.sendRequest(url, data))
+ security_mock.CachedHTTPSConnection.assert_called_once_with(
+ self.controller.config)
+ requestMock.called_once_with(url, data,
+ {'Content-Type': 'application/json'})
+
+
+ @patch("time.sleep")
+ @patch("json.loads")
+ @patch("json.dumps")
+ def test_heartbeatWithServer(self, dumpsMock, loadsMock, sleepMock):
+
+ out = StringIO.StringIO()
+ sys.stdout = out
+
+ hearbeat = MagicMock()
+ self.controller.heartbeat = hearbeat
+
+ dumpsMock.return_value = "data"
+
+ sendRequest = MagicMock(name="sendRequest")
+ self.controller.sendRequest = sendRequest
+
+ self.controller.responseId = 1
+ response = {"responseId":"2", "restartAgent":"false"}
+ loadsMock.return_value = response
+
+ def one_heartbeat(*args, **kwargs):
+ self.controller.DEBUG_STOP_HEARTBITTING = True
+ return "data"
+
+ sendRequest.side_effect = one_heartbeat
+
+ actionQueue = MagicMock()
+ actionQueue.isIdle.return_value = True
+
+ # one successful request, after stop
+ self.controller.actionQueue = actionQueue
+ self.controller.heartbeatWithServer()
+ self.assertTrue(sendRequest.called)
+
+ calls = []
+ def retry(*args, **kwargs):
+ if len(calls) == 0:
+ calls.append(1)
+ response["responseId"] = "3"
+ raise Exception()
+ if len(calls) > 0:
+ self.controller.DEBUG_STOP_HEARTBITTING = True
+ return "data"
+
+ # exception, retry, successful and stop
+ sendRequest.side_effect = retry
+ self.controller.DEBUG_STOP_HEARTBITTING = False
+ self.controller.heartbeatWithServer()
+
+ self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)
+
+ # retry registration
+ response["registrationCommand"] = "true"
+ sendRequest.side_effect = one_heartbeat
+ self.controller.DEBUG_STOP_HEARTBITTING = False
+ self.controller.heartbeatWithServer()
+
+ self.assertTrue(self.controller.repeatRegistration)
+
+ # wrong responseId => restart
+ response = {"responseId":"2", "restartAgent":"false"}
+ loadsMock.return_value = response
+
+ restartAgent = MagicMock(name="restartAgent")
+ self.controller.restartAgent = restartAgent
+ self.controller.DEBUG_STOP_HEARTBITTING = False
+ self.controller.heartbeatWithServer()
+
+ restartAgent.assert_called_once_with()
+
+ # executionCommands, statusCommands
+ self.controller.responseId = 1
+ addToQueue = MagicMock(name="addToQueue")
+ self.controller.addToQueue = addToQueue
+ response["executionCommands"] = "executionCommands"
+ response["statusCommands"] = "statusCommands"
+ self.controller.DEBUG_STOP_HEARTBITTING = False
+ self.controller.heartbeatWithServer()
+
+ addToQueue.assert_has_calls([call("executionCommands"),
+ call("statusCommands")])
+
+ # restartAgent command
+ self.controller.responseId = 1
+ self.controller.DEBUG_STOP_HEARTBITTING = False
+ response["restartAgent"] = "true"
+ restartAgent = MagicMock(name="restartAgent")
+ self.controller.restartAgent = restartAgent
+ self.controller.heartbeatWithServer()
+
+ restartAgent.assert_called_once_with()
+
+ # actionQueue not idle
+ self.controller.responseId = 1
+ self.controller.DEBUG_STOP_HEARTBITTING = False
+ actionQueue.isIdle.return_value = False
+ response["restartAgent"] = "false"
+ self.controller.heartbeatWithServer()
+
+ sleepMock.assert_called_with(
+ self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC)
+
+ sys.stdout = sys.__stdout__
+ self.controller.sendRequest = Controller.Controller.sendRequest
+ self.controller.sendRequest = Controller.Controller.addToQueue
+
+
+if __name__ == "__main__":
+
+ unittest.main(verbosity=2)
+
+
- restartCount = 0
- def sendRequest(self, url, data):
- raise Exception("Fake exception")
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestGrep.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestGrep.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestGrep.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestGrep.py Mon Feb 4 02:23:55 2013
@@ -80,8 +80,8 @@ debug: Finishing transaction 70171639726
fragment = self.grep.tail(self.string_good, 3)
desired = """
debug: Finishing transaction 70060456663980
-debug: Received report to process from ambari-dmi.cybervisiontech.com.ua
-debug: Processing report from ambari-dmi.cybervisiontech.com.ua with processor Puppet::Reports::Store
+debug: Received report to process from ambari-dmi
+debug: Processing report from ambari-dmi with processor Puppet::Reports::Store
""".strip()
self.assertEquals(fragment, desired, "Grep tail function should return only last 3 lines of file")
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestHeartbeat.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestHeartbeat.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestHeartbeat.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestHeartbeat.py Mon Feb 4 02:23:55 2013
@@ -29,12 +29,7 @@ import time
class TestHeartbeat(TestCase):
- def setUp(self):
- testsPath = os.path.dirname(os.path.realpath(__file__))
- self.dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
-
def test_build(self):
- AmbariConfig.config.set('services','serviceToPidMapFile', self.dictPath)
actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
heartbeat = Heartbeat(actionQueue)
result = heartbeat.build(100)
@@ -47,19 +42,20 @@ class TestHeartbeat(TestCase):
self.assertEquals(len(result['nodeStatus']), 2)
self.assertEquals(result['nodeStatus']['cause'], "NONE")
self.assertEquals(result['nodeStatus']['status'], "HEALTHY")
- self.assertEquals(len(result), 6)
+ # result may or may NOT have an agentEnv structure in it
+ self.assertEquals((len(result) is 6) or (len(result) is 7), True)
self.assertEquals(not heartbeat.reports, True, "Heartbeat should not contain task in progress")
def test_heartbeat_with_status(self):
- AmbariConfig.config.set('services','serviceToPidMapFile', self.dictPath)
actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
heartbeat = Heartbeat(actionQueue)
statusCommand = {
"serviceName" : 'HDFS',
"commandType" : "STATUS_COMMAND",
"clusterName" : "",
- "componentName" : "DATANODE"
+ "componentName" : "DATANODE",
+ 'configurations':{'global' : {}}
}
actionQueue.put(statusCommand)
actionQueue.start()
@@ -70,7 +66,6 @@ class TestHeartbeat(TestCase):
self.assertEquals(len(result['componentStatus']) > 0, True, 'Heartbeat should contain status of HDFS components')
def test_heartbeat_with_status_multiple(self):
- AmbariConfig.config.set('services','serviceToPidMapFile', self.dictPath)
actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
actionQueue.IDLE_SLEEP_TIME = 0.01
heartbeat = Heartbeat(actionQueue)
@@ -81,7 +76,8 @@ class TestHeartbeat(TestCase):
"serviceName" : 'HDFS',
"commandType" : "STATUS_COMMAND",
"clusterName" : "",
- "componentName" : "DATANODE"
+ "componentName" : "DATANODE",
+ 'configurations':{'global' : {}}
}
actionQueue.put(statusCommand)
time.sleep(0.1)
@@ -97,7 +93,6 @@ class TestHeartbeat(TestCase):
self.assertEquals(max_number_of_status_entries == NUMBER_OF_COMPONENTS, True)
def test_heartbeat_with_task_in_progress(self):
- AmbariConfig.config.set('services','serviceToPidMapFile', self.dictPath)
actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
actionQueue.commandInProgress= {
'role' : "role",
@@ -108,7 +103,8 @@ class TestHeartbeat(TestCase):
'stderr' : 'none',
'exitCode' : 777,
'serviceName' : "serviceName",
- 'status' : 'IN_PROGRESS'
+ 'status' : 'IN_PROGRESS',
+ 'configurations':{'global' : {}}
}
heartbeat = Heartbeat(actionQueue)
result = heartbeat.build(100)
Added: incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestHostname.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestHostname.py?rev=1442010&view=auto
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestHostname.py (added)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestHostname.py Mon Feb 4 02:23:55 2013
@@ -0,0 +1,55 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import ambari_agent.hostname as hostname
+from ambari_agent.AmbariConfig import AmbariConfig
+import socket
+import tempfile
+import shutil
+import os, pprint, json,stat
+
+class TestHostname(TestCase):
+
+ def test_hostname(self):
+ self.assertEquals(hostname.hostname(), socket.gethostname(), "hostname should equal the socket-based hostname")
+ pass
+
+ def test_hostname_override(self):
+ tmpname = tempfile.mkstemp(text=True)[1]
+ os.chmod(tmpname, os.stat(tmpname).st_mode | stat.S_IXUSR)
+
+ tmpfile = file(tmpname, "w+")
+
+ try:
+ tmpfile.write("#!/bin/sh\n\necho 'test.example.com'")
+ tmpfile.close()
+
+ config = AmbariConfig().getConfig()
+ config.set('agent', 'hostname_script', tmpname)
+
+ self.assertEquals(hostname.hostname(), 'test.example.com', "expected hostname 'test.example.com'")
+ finally:
+ os.remove(tmpname)
+
+ pass
+
+
+
Modified: incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestLiveStatus.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestLiveStatus.py?rev=1442010&r1=1442009&r2=1442010&view=diff
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestLiveStatus.py (original)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestLiveStatus.py Mon Feb 4 02:23:55 2013
@@ -26,11 +26,8 @@ import os
class TestLiveStatus(TestCase):
def test_build(self):
- testsPath = os.path.dirname(os.path.realpath(__file__))
- dictPath = testsPath + os.sep + '..' + os.sep + '..' + os.sep + 'main' + os.sep + 'python' + os.sep + 'ambari_agent' + os.sep + 'servicesToPidNames.dict'
- AmbariConfig.config.set('services','serviceToPidMapFile', dictPath)
for component in LiveStatus.COMPONENTS:
- livestatus = LiveStatus('', component['serviceName'], component['componentName'])
+ livestatus = LiveStatus('', component['serviceName'], component['componentName'], {})
result = livestatus.build()
print "LiveStatus of {0}: {1}".format(component['serviceName'], str(result))
self.assertEquals(len(result) > 0, True, 'Livestatus should not be empty')
Added: incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestManifestGenerator.py
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestManifestGenerator.py?rev=1442010&view=auto
==============================================================================
--- incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestManifestGenerator.py (added)
+++ incubator/ambari/branches/branch-1.2/ambari-agent/src/test/python/TestManifestGenerator.py Mon Feb 4 02:23:55 2013
@@ -0,0 +1,75 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import os
+
+from unittest import TestCase
+from ambari_agent import manifestGenerator
+import ambari_agent.AmbariConfig
+import tempfile
+import json
+import shutil
+from ambari_agent.AmbariConfig import AmbariConfig
+from mock.mock import patch, MagicMock, call
+
+
+class TestManifestGenerator(TestCase):
+
+ def setUp(self):
+ self.dir = tempfile.mkdtemp()
+ self.config = AmbariConfig()
+ jsonCommand = file('../../main/python/ambari_agent/test.json').read()
+ self.parsedJson = json.loads(jsonCommand)
+
+ pass
+
+ def tearDown(self):
+ shutil.rmtree(self.dir)
+ pass
+
+
+ def testWriteImports(self):
+ tmpFileName = tempfile.mkstemp(dir=self.dir, text=True)[1]
+ print tmpFileName
+ tmpFile = file(tmpFileName, 'r+')
+
+ manifestGenerator.writeImports(tmpFile, '../../main/puppet/modules', self.config.getImports())
+ tmpFile.seek(0)
+ print tmpFile.read()
+ tmpFile.close()
+
+
+ pass
+
+ @patch.object(manifestGenerator, 'writeImports')
+ @patch.object(manifestGenerator, 'writeNodes')
+ @patch.object(manifestGenerator, 'writeParams')
+ @patch.object(manifestGenerator, 'writeTasks')
+ def testGenerateManifest(self, writeTasksMock, writeParamsMock, writeNodesMock, writeImportsMock):
+ tmpFileName = tempfile.mkstemp(dir=self.dir, text=True)[1]
+ manifestGenerator.generateManifest(self.parsedJson, tmpFileName, '../../main/puppet/modules', self.config.getConfig())
+
+ self.assertTrue(writeParamsMock.called)
+ self.assertTrue(writeNodesMock.called)
+ self.assertTrue(writeImportsMock.called)
+ self.assertTrue(writeTasksMock.called)
+
+ print file(tmpFileName).read()
+
+ pass
\ No newline at end of file