You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by yu...@apache.org on 2014/11/14 03:20:10 UTC
[28/29] ambari git commit: AMBARI-8269. Merge branch-windows-dev
changes to trunk. (Jayush Luniya via yusaku)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 6c79b6b..4bae50b 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -18,204 +18,31 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
+import platform
+
import ConfigParser
import StringIO
import json
from NetUtil import NetUtil
-content = """
-
-[server]
-hostname=localhost
-url_port=8440
-secured_url_port=8441
-
-[agent]
-prefix=/tmp/ambari-agent
-tmp_dir=/tmp/ambari-agent/tmp
-data_cleanup_interval=86400
-data_cleanup_max_age=2592000
-data_cleanup_max_size_MB = 100
-ping_port=8670
-cache_dir=/var/lib/ambari-agent/cache
-
-[services]
-
-[python]
-custom_actions_dir = /var/lib/ambari-agent/resources/custom_actions
-
-[command]
-maxretries=2
-sleepBetweenRetries=1
-
-[security]
-keysdir=/tmp/ambari-agent
-server_crt=ca.crt
-passphrase_env_var_name=AMBARI_PASSPHRASE
-
-[heartbeat]
-state_interval = 6
-dirs=/etc/hadoop,/etc/hadoop/conf,/var/run/hadoop,/var/log/hadoop
-log_lines_count=300
-
-"""
-
-imports = [
- "hdp/manifests/*.pp",
- "hdp-hadoop/manifests/*.pp",
- "hdp-hbase/manifests/*.pp",
- "hdp-zookeeper/manifests/*.pp",
- "hdp-oozie/manifests/*.pp",
- "hdp-pig/manifests/*.pp",
- "hdp-sqoop/manifests/*.pp",
- "hdp-templeton/manifests/*.pp",
- "hdp-hive/manifests/*.pp",
- "hdp-hcat/manifests/*.pp",
- "hdp-mysql/manifests/*.pp",
- "hdp-monitor-webserver/manifests/*.pp",
- "hdp-repos/manifests/*.pp"
-]
-
-rolesToClass = {
- 'GLUSTERFS': 'hdp-hadoop::glusterfs',
- 'GLUSTERFS_CLIENT': 'hdp-hadoop::glusterfs_client',
- 'GLUSTERFS_SERVICE_CHECK': 'hdp-hadoop::glusterfs_service_check',
- 'NAMENODE': 'hdp-hadoop::namenode',
- 'DATANODE': 'hdp-hadoop::datanode',
- 'SECONDARY_NAMENODE': 'hdp-hadoop::snamenode',
- 'JOBTRACKER': 'hdp-hadoop::jobtracker',
- 'TASKTRACKER': 'hdp-hadoop::tasktracker',
- 'RESOURCEMANAGER': 'hdp-yarn::resourcemanager',
- 'NODEMANAGER': 'hdp-yarn::nodemanager',
- 'HISTORYSERVER': 'hdp-yarn::historyserver',
- 'YARN_CLIENT': 'hdp-yarn::yarn_client',
- 'HDFS_CLIENT': 'hdp-hadoop::client',
- 'MAPREDUCE_CLIENT': 'hdp-hadoop::client',
- 'MAPREDUCE2_CLIENT': 'hdp-yarn::mapreducev2_client',
- 'ZOOKEEPER_SERVER': 'hdp-zookeeper',
- 'ZOOKEEPER_CLIENT': 'hdp-zookeeper::client',
- 'HBASE_MASTER': 'hdp-hbase::master',
- 'HBASE_REGIONSERVER': 'hdp-hbase::regionserver',
- 'HBASE_CLIENT': 'hdp-hbase::client',
- 'PIG': 'hdp-pig',
- 'SQOOP': 'hdp-sqoop',
- 'OOZIE_SERVER': 'hdp-oozie::server',
- 'OOZIE_CLIENT': 'hdp-oozie::client',
- 'HIVE_CLIENT': 'hdp-hive::client',
- 'HCAT': 'hdp-hcat',
- 'HIVE_SERVER': 'hdp-hive::server',
- 'HIVE_METASTORE': 'hdp-hive::metastore',
- 'MYSQL_SERVER': 'hdp-mysql::server',
- 'WEBHCAT_SERVER': 'hdp-templeton::server',
- 'DASHBOARD': 'hdp-dashboard',
- 'GANGLIA_SERVER': 'hdp-ganglia::server',
- 'GANGLIA_MONITOR': 'hdp-ganglia::monitor',
- 'HTTPD': 'hdp-monitor-webserver',
- 'HUE_SERVER': 'hdp-hue::server',
- 'HDFS_SERVICE_CHECK': 'hdp-hadoop::hdfs::service_check',
- 'MAPREDUCE_SERVICE_CHECK': 'hdp-hadoop::mapred::service_check',
- 'MAPREDUCE2_SERVICE_CHECK': 'hdp-yarn::mapred2::service_check',
- 'ZOOKEEPER_SERVICE_CHECK': 'hdp-zookeeper::zookeeper::service_check',
- 'ZOOKEEPER_QUORUM_SERVICE_CHECK': 'hdp-zookeeper::quorum::service_check',
- 'HBASE_SERVICE_CHECK': 'hdp-hbase::hbase::service_check',
- 'HIVE_SERVICE_CHECK': 'hdp-hive::hive::service_check',
- 'HCAT_SERVICE_CHECK': 'hdp-hcat::hcat::service_check',
- 'OOZIE_SERVICE_CHECK': 'hdp-oozie::oozie::service_check',
- 'PIG_SERVICE_CHECK': 'hdp-pig::pig::service_check',
- 'SQOOP_SERVICE_CHECK': 'hdp-sqoop::sqoop::service_check',
- 'WEBHCAT_SERVICE_CHECK': 'hdp-templeton::templeton::service_check',
- 'DASHBOARD_SERVICE_CHECK': 'hdp-dashboard::dashboard::service_check',
- 'DECOMMISSION_DATANODE': 'hdp-hadoop::hdfs::decommission',
- 'HUE_SERVICE_CHECK': 'hdp-hue::service_check',
- 'RESOURCEMANAGER_SERVICE_CHECK': 'hdp-yarn::resourcemanager::service_check',
- 'HISTORYSERVER_SERVICE_CHECK': 'hdp-yarn::historyserver::service_check',
- 'TEZ_CLIENT': 'hdp-tez::tez_client',
- 'YARN_SERVICE_CHECK': 'hdp-yarn::yarn::service_check',
- 'FLUME_SERVER': 'hdp-flume',
- 'JOURNALNODE': 'hdp-hadoop::journalnode',
- 'ZKFC': 'hdp-hadoop::zkfc'
-}
-
-serviceStates = {
- 'START': 'running',
- 'INSTALL': 'installed_and_configured',
- 'STOP': 'stopped'
-}
-
-servicesToPidNames = {
- 'GLUSTERFS' : 'glusterd.pid$',
- 'NAMENODE': 'hadoop-{USER}-namenode.pid$',
- 'SECONDARY_NAMENODE': 'hadoop-{USER}-secondarynamenode.pid$',
- 'DATANODE': 'hadoop-{USER}-datanode.pid$',
- 'JOBTRACKER': 'hadoop-{USER}-jobtracker.pid$',
- 'TASKTRACKER': 'hadoop-{USER}-tasktracker.pid$',
- 'RESOURCEMANAGER': 'yarn-{USER}-resourcemanager.pid$',
- 'NODEMANAGER': 'yarn-{USER}-nodemanager.pid$',
- 'HISTORYSERVER': 'mapred-{USER}-historyserver.pid$',
- 'JOURNALNODE': 'hadoop-{USER}-journalnode.pid$',
- 'ZKFC': 'hadoop-{USER}-zkfc.pid$',
- 'OOZIE_SERVER': 'oozie.pid',
- 'ZOOKEEPER_SERVER': 'zookeeper_server.pid',
- 'FLUME_SERVER': 'flume-node.pid',
- 'TEMPLETON_SERVER': 'templeton.pid',
- 'GANGLIA_SERVER': 'gmetad.pid',
- 'GANGLIA_MONITOR': 'gmond.pid',
- 'HBASE_MASTER': 'hbase-{USER}-master.pid',
- 'HBASE_REGIONSERVER': 'hbase-{USER}-regionserver.pid',
- 'HCATALOG_SERVER': 'webhcat.pid',
- 'KERBEROS_SERVER': 'kadmind.pid',
- 'HIVE_SERVER': 'hive-server.pid',
- 'HIVE_METASTORE': 'hive.pid',
- 'MYSQL_SERVER': 'mysqld.pid',
- 'HUE_SERVER': '/var/run/hue/supervisor.pid',
- 'WEBHCAT_SERVER': 'webhcat.pid',
-}
-
-#Each service, which's pid depends on user should provide user mapping
-servicesToLinuxUser = {
- 'NAMENODE': 'hdfs_user',
- 'SECONDARY_NAMENODE': 'hdfs_user',
- 'DATANODE': 'hdfs_user',
- 'JOURNALNODE': 'hdfs_user',
- 'ZKFC': 'hdfs_user',
- 'JOBTRACKER': 'mapred_user',
- 'TASKTRACKER': 'mapred_user',
- 'RESOURCEMANAGER': 'yarn_user',
- 'NODEMANAGER': 'yarn_user',
- 'HISTORYSERVER': 'mapred_user',
- 'HBASE_MASTER': 'hbase_user',
- 'HBASE_REGIONSERVER': 'hbase_user',
-}
-
-pidPathesVars = [
- {'var' : 'glusterfs_pid_dir_prefix',
- 'defaultValue' : '/var/run'},
- {'var' : 'hadoop_pid_dir_prefix',
- 'defaultValue' : '/var/run/hadoop'},
- {'var' : 'hadoop_pid_dir_prefix',
- 'defaultValue' : '/var/run/hadoop'},
- {'var' : 'ganglia_runtime_dir',
- 'defaultValue' : '/var/run/ganglia/hdp'},
- {'var' : 'hbase_pid_dir',
- 'defaultValue' : '/var/run/hbase'},
- {'var' : 'zk_pid_dir',
- 'defaultValue' : '/var/run/zookeeper'},
- {'var' : 'oozie_pid_dir',
- 'defaultValue' : '/var/run/oozie'},
- {'var' : 'hcat_pid_dir',
- 'defaultValue' : '/var/run/webhcat'},
- {'var' : 'hive_pid_dir',
- 'defaultValue' : '/var/run/hive'},
- {'var' : 'mysqld_pid_dir',
- 'defaultValue' : '/var/run/mysqld'},
- {'var' : 'hcat_pid_dir',
- 'defaultValue' : '/var/run/webhcat'},
- {'var' : 'yarn_pid_dir_prefix',
- 'defaultValue' : '/var/run/hadoop-yarn'},
- {'var' : 'mapred_pid_dir_prefix',
- 'defaultValue' : '/var/run/hadoop-mapreduce'},
-]
+SETUP_ACTION = "setup"
+START_ACTION = "start"
+STOP_ACTION = "stop"
+RESET_ACTION = "reset"
+STATUS_ACTION = "status"
+DEBUG_ACTION = "debug"
+
+IS_WINDOWS = platform.system() == "Windows"
+
+if not IS_WINDOWS:
+ from AgentConfig_linux import *
+else:
+ from AgentConfig_windows import *
+
+config = ConfigParser.RawConfigParser()
+s = StringIO.StringIO(content)
+config.readfp(s)
class AmbariConfig:
TWO_WAY_SSL_PROPERTY = "security.server.two_way_ssl"
@@ -246,12 +73,31 @@ class AmbariConfig:
def add_section(self, section):
self.config.add_section(section)
+ @staticmethod
+ def getConfigFile():
+ global configFile
+ return configFile
+
+ @staticmethod
+ def getLogFile():
+ global logfile
+ return logfile
+
+ @staticmethod
+ def getOutFile():
+ global outfile
+ return outfile
+
def setConfig(self, customConfig):
self.config = customConfig
def getConfig(self):
return self.config
+ def getImports(self):
+ global imports
+ return imports
+
def getRolesToClass(self):
global rolesToClass
return rolesToClass
@@ -264,13 +110,9 @@ class AmbariConfig:
global servicesToPidNames
return servicesToPidNames
- def getImports(self):
- global imports
- return imports
-
- def getPidPathesVars(self):
- global pidPathesVars
- return pidPathesVars
+ def pidPathVars(self):
+ global pidPathVars
+ return pidPathVars
def has_option(self, section, option):
return self.config.has_option(section, option)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index dc3a1cf..d985b91 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -22,6 +22,7 @@ import logging
import signal
import json
import sys
+import platform
import os
import socket
import time
@@ -46,11 +47,21 @@ logger = logging.getLogger()
AGENT_AUTO_RESTART_EXIT_CODE = 77
+IS_WINDOWS = platform.system() == "Windows"
+
class Controller(threading.Thread):
- def __init__(self, config, range=30):
+ def __init__(self, config, heartbeat_stop_callback = None, range=30):
threading.Thread.__init__(self)
logger.debug('Initializing Controller RPC thread.')
+
+ if heartbeat_stop_callback is None:
+ if IS_WINDOWS:
+ from HeartbeatHandlers_windows import HeartbeatStopHandler
+ else:
+ from HeartbeatStopHandler_linux import HeartbeatStopHandler
+ heartbeat_stop_callback = HeartbeatStopHandler()
+
self.lock = threading.Lock()
self.safeMode = True
self.credential = None
@@ -62,7 +73,7 @@ class Controller(threading.Thread):
self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
self.componentsUrl = server_secured_url + '/agent/v1/components/'
- self.netutil = NetUtil()
+ self.netutil = NetUtil(heartbeat_stop_callback)
self.responseId = -1
self.repeatRegistration = False
self.isRegistered = False
@@ -71,10 +82,10 @@ class Controller(threading.Thread):
self.hasMappedComponents = True
# Event is used for synchronizing heartbeat iterations (to make possible
# manual wait() interruption between heartbeats )
- self.heartbeat_wait_event = threading.Event()
+ self.heartbeat_stop_callback = heartbeat_stop_callback
# List of callbacks that are called at agent registration
self.registration_listeners = []
-
+
# pull config directory out of config
cache_dir = config.get('agent', 'cache_dir')
if cache_dir is None:
@@ -197,6 +208,9 @@ class Controller(threading.Thread):
DEBUG_SUCCESSFULL_HEARTBEATS = 0
DEBUG_STOP_HEARTBEATING = False
+ def trigger_heartbeat(self):
+ self.heartbeat_stop_callback.set_heartbeat()
+
def heartbeatWithServer(self):
self.DEBUG_HEARTBEAT_RETRIES = 0
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
@@ -261,14 +275,14 @@ class Controller(threading.Thread):
if 'statusCommands' in response.keys():
self.addToStatusQueue(response['statusCommands'])
pass
-
+
if 'alertDefinitionCommands' in response.keys():
self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
pass
if 'alertExecutionCommands' in response.keys():
self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
- pass
+ pass
if "true" == response['restartAgent']:
logger.error("Received the restartAgent command")
@@ -284,7 +298,7 @@ class Controller(threading.Thread):
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
- self.heartbeat_wait_event.clear()
+ self.heartbeat_stop_callback.reset_heartbeat()
except ssl.SSLError:
self.repeatRegistration=False
self.isRegistered = False
@@ -319,10 +333,10 @@ class Controller(threading.Thread):
# Sleep for some time
timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
- self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
- self.heartbeat_wait_event.wait(timeout=timeout)
- # Sleep a bit more to allow STATUS_COMMAND results to be collected
- # and sent in one heartbeat. Also avoid server overload with heartbeats
- time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
+ if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
+ # Stop loop when stop event received
+ logger.info("Stop event received")
+ self.DEBUG_STOP_HEARTBEATING=True
pass
def run(self):
@@ -405,7 +419,10 @@ class Controller(threading.Thread):
def main(argv=None):
# Allow Ctrl-C
- signal.signal(signal.SIGINT, signal.SIG_DFL)
+ if IS_WINDOWS:
+ from HeartbeatHandlers_windows import bind_signal_handlers
+ else:
+ from HeartbeatStopHandler_linux import bind_signal_handlers
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
@@ -417,7 +434,8 @@ def main(argv=None):
logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
config = AmbariConfig.config
- collector = Controller(config)
+ heartbeat_stop_callback = bind_signal_handlers()
+ collector = Controller(config, heartbeat_stop_callback)
collector.start()
collector.run()
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index e0c5a28..7b5889c 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -64,7 +64,7 @@ class CustomServiceOrchestrator():
self.public_fqdn = hostname.public_hostname(config)
# cache reset will be called on every agent registration
controller.registration_listeners.append(self.file_cache.reset)
-
+
# Clean up old status command files if any
try:
os.unlink(self.status_commands_stdout)
@@ -88,7 +88,7 @@ class CustomServiceOrchestrator():
"reason - {reason} . Killing process {pid}"
.format(tid = str(task_id), reason = reason, pid = pid))
shell.kill_process_with_children(pid)
- else:
+ else:
logger.warn("Unable to find pid by taskId = %s"%task_id)
def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None,
@@ -101,7 +101,7 @@ class CustomServiceOrchestrator():
script_type = command['commandParams']['script_type']
script = command['commandParams']['script']
timeout = int(command['commandParams']['command_timeout'])
-
+
if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']:
server_url_prefix = command['hostLevelParams']['jdk_location']
else:
@@ -149,7 +149,7 @@ class CustomServiceOrchestrator():
handle = command['__handle']
handle.on_background_command_started = self.map_task_to_process
del command['__handle']
-
+
json_path = self.dump_command_to_json(command)
pre_hook_tuple = self.resolve_hook_script_path(hook_dir,
self.PRE_HOOK_PREFIX, command_name, script_type)
@@ -187,7 +187,7 @@ class CustomServiceOrchestrator():
if cancel_reason:
ret['stdout'] += cancel_reason
ret['stderr'] += cancel_reason
-
+
with open(tmpoutfile, "a") as f:
f.write(cancel_reason)
with open(tmperrfile, "a") as f:
@@ -213,7 +213,7 @@ class CustomServiceOrchestrator():
if not isinstance(pid, int):
return '\nCommand aborted. ' + pid
return None
-
+
def requestComponentStatus(self, command):
"""
Component status is determined by exit code, returned by runCommand().
@@ -262,6 +262,8 @@ class CustomServiceOrchestrator():
# Perform few modifications to stay compatible with the way in which
public_fqdn = self.public_fqdn
command['public_hostname'] = public_fqdn
+ # Add cache dir to make it visible for commands
+ command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 'cache_dir')
# Now, dump the json file
command_type = command['commandType']
from ActionQueue import ActionQueue # To avoid cyclic dependency
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/Facter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Facter.py b/ambari-agent/src/main/python/ambari_agent/Facter.py
index 947f380..aabc77d 100644
--- a/ambari-agent/src/main/python/ambari_agent/Facter.py
+++ b/ambari-agent/src/main/python/ambari_agent/Facter.py
@@ -27,55 +27,29 @@ import shlex
import socket
import multiprocessing
import subprocess
-
+from shell import shellRunner
import time
import uuid
from ambari_commons import OSCheck
log = logging.getLogger()
-# selinux command
-GET_SE_LINUX_ST_CMD = "/usr/sbin/sestatus"
-GET_IFCONFIG_CMD = "ifconfig"
-GET_UPTIME_CMD = "cat /proc/uptime"
-GET_MEMINFO_CMD = "cat /proc/meminfo"
-
-class Facter():
- def __init__(self):
-
- self.DATA_IFCONFIG_OUTPUT = Facter.setDataIfConfigOutput()
- self.DATA_UPTIME_OUTPUT = Facter.setDataUpTimeOutput()
- self.DATA_MEMINFO_OUTPUT = Facter.setMemInfoOutput()
-
- @staticmethod
- def setDataIfConfigOutput():
-
- try:
- result = os.popen(GET_IFCONFIG_CMD).read()
- return result
- except OSError:
- log.warn("Can't execute {0}".format(GET_IFCONFIG_CMD))
- return ""
-
- @staticmethod
- def setDataUpTimeOutput():
- try:
- result = os.popen(GET_UPTIME_CMD).read()
- return result
- except OSError:
- log.warn("Can't execute {0}".format(GET_UPTIME_CMD))
- return ""
+def run_os_command(cmd):
+ if type(cmd) == str:
+ cmd = shlex.split(cmd)
+ process = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE
+ )
+ (stdoutdata, stderrdata) = process.communicate()
+ return process.returncode, stdoutdata, stderrdata
- @staticmethod
- def setMemInfoOutput():
- try:
- result = os.popen(GET_MEMINFO_CMD).read()
- return result
- except OSError:
- log.warn("Can't execute {0}".format(GET_MEMINFO_CMD))
- return ""
+class FacterBase():
+ def __init__(self):
+ pass
# Returns the currently running user id
def getId(self):
@@ -87,7 +61,7 @@ class Facter():
# Returns the FQDN of the host
def getFqdn(self):
- return socket.getfqdn()
+ return socket.getfqdn().lower()
# Returns the host's primary DNS domain name
def getDomain(self):
@@ -153,15 +127,211 @@ class Facter():
def getOsFamily(self):
return OSCheck.get_os_family()
+ # Return uptime hours
+ def getUptimeHours(self):
+ return self.getUptimeSeconds() / (60 * 60)
+
+ # Return uptime days
+ def getUptimeDays(self):
+ return self.getUptimeSeconds() / (60 * 60 * 24)
+
+ def facterInfo(self):
+ facterInfo = {}
+ facterInfo['id'] = self.getId()
+ facterInfo['kernel'] = self.getKernel()
+ facterInfo['domain'] = self.getDomain()
+ facterInfo['fqdn'] = self.getFqdn()
+ facterInfo['hostname'] = self.getHostname()
+ facterInfo['macaddress'] = self.getMacAddress()
+ facterInfo['architecture'] = self.getArchitecture()
+ facterInfo['operatingsystem'] = self.getOperatingSystem()
+ facterInfo['operatingsystemrelease'] = self.getOperatingSystemRelease()
+ facterInfo['physicalprocessorcount'] = self.getProcessorcount()
+ facterInfo['processorcount'] = self.getProcessorcount()
+ facterInfo['timezone'] = self.getTimeZone()
+ facterInfo['hardwareisa'] = self.getArchitecture()
+ facterInfo['hardwaremodel'] = self.getArchitecture()
+ facterInfo['kernelrelease'] = self.getKernelRelease()
+ facterInfo['kernelversion'] = self.getKernelVersion()
+ facterInfo['osfamily'] = self.getOsFamily()
+ facterInfo['kernelmajversion'] = self.getKernelMajVersion()
+
+ facterInfo['ipaddress'] = self.getIpAddress()
+ facterInfo['netmask'] = self.getNetmask()
+ facterInfo['interfaces'] = self.getInterfaces()
+
+ facterInfo['uptime_seconds'] = str(self.getUptimeSeconds())
+ facterInfo['uptime_hours'] = str(self.getUptimeHours())
+ facterInfo['uptime_days'] = str(self.getUptimeDays())
+
+ facterInfo['memorysize'] = self.getMemorySize()
+ facterInfo['memoryfree'] = self.getMemoryFree()
+ facterInfo['memorytotal'] = self.getMemoryTotal()
+
+ return facterInfo
+
+ #Convert kB to GB
+ @staticmethod
+ def convertSizeKbToGb(size):
+ return "%0.2f GB" % round(float(size) / (1024.0 * 1024.0), 2)
+
+ #Convert MB to GB
+ @staticmethod
+ def convertSizeMbToGb(size):
+ return "%0.2f GB" % round(float(size) / (1024.0), 2)
+
+
+class FacterWindows(FacterBase):
+ GET_SYSTEM_INFO_CMD = "systeminfo"
+ GET_MEMORY_CMD = '$mem =(Get-WMIObject Win32_OperatingSystem -ComputerName "LocalHost" ); echo "$($mem.FreePhysicalMemory) $($mem.TotalVisibleMemorySize)"'
+ GET_PAGE_FILE_INFO = '$pgo=(Get-WmiObject Win32_PageFileUsage); echo "$($pgo.AllocatedBaseSize) $($pgo.AllocatedBaseSize-$pgo.CurrentUsage)"'
+ GET_UPTIME_CMD = 'echo $([int]((get-date)-[system.management.managementdatetimeconverter]::todatetime((get-wmiobject -class win32_operatingsystem).Lastbootuptime)).TotalSeconds)'
+
+ # Return first ip address
+ def getIpAddress(self):
+ #TODO check if we need ipconfig
+ return socket.gethostbyname(socket.gethostname().lower())
+
+ # Return netmask
+ def getNetmask(self):
+ #TODO return correct netmask
+ return 'OS NOT SUPPORTED'
+
+ # Return interfaces
+ def getInterfaces(self):
+ #TODO return correct interfaces
+ return 'OS NOT SUPPORTED'
+
+ # Return uptime seconds
+ def getUptimeSeconds(self):
+ try:
+ runner = shellRunner()
+ result = runner.runPowershell(script_block=FacterWindows.GET_UPTIME_CMD).output.replace('\n', '').replace('\r',
+ '')
+ return int(result)
+ except:
+ log.warn("Can not get SwapFree")
+ return 0
+
+ # Return memoryfree
+ def getMemoryFree(self):
+ try:
+ runner = shellRunner()
+ result = runner.runPowershell(script_block=FacterWindows.GET_MEMORY_CMD).output.split(" ")[0].replace('\n',
+ '').replace(
+ '\r', '')
+ return result
+ except:
+ log.warn("Can not get MemoryFree")
+ return 0
+
+ # Return memorytotal
+ def getMemoryTotal(self):
+ try:
+ runner = shellRunner()
+ result = runner.runPowershell(script_block=FacterWindows.GET_MEMORY_CMD).output.split(" ")[-1].replace('\n',
+ '').replace(
+ '\r', '')
+ return result
+ except:
+ log.warn("Can not get MemoryTotal")
+ return 0
+
+ # Return swapfree
+ def getSwapFree(self):
+ try:
+ runner = shellRunner()
+ result = runner.runPowershell(script_block=FacterWindows.GET_PAGE_FILE_INFO).output.split(" ")[-1].replace('\n',
+ '').replace(
+ '\r', '')
+ return result
+ except:
+ log.warn("Can not get SwapFree")
+ return 0
+
+ # Return swapsize
+ def getSwapSize(self):
+ try:
+ runner = shellRunner()
+ result = runner.runPowershell(script_block=FacterWindows.GET_PAGE_FILE_INFO).output.split(" ")[0].replace('\n',
+ '').replace(
+ '\r', '')
+ return result
+ except:
+ log.warn("Can not get SwapFree")
+ return 0
+
+ # Return memorysize
+ def getMemorySize(self):
+ try:
+ runner = shellRunner()
+ result = runner.runPowershell(script_block=FacterWindows.GET_MEMORY_CMD).output.split(" ")[-1].replace('\n',
+ '').replace(
+ '\r', '')
+ return result
+ except:
+ log.warn("Can not get MemorySize")
+ return 0
+
+ def facterInfo(self):
+ facterInfo = FacterBase.facterInfo(self)
+ facterInfo['swapsize'] = FacterBase.convertSizeMbToGb(self.getSwapSize())
+ facterInfo['swapfree'] = FacterBase.convertSizeMbToGb(self.getSwapFree())
+ return facterInfo
+
+
+class FacterLinux(FacterBase):
+ # selinux command
+ GET_SE_LINUX_ST_CMD = "/usr/sbin/sestatus"
+ GET_IFCONFIG_CMD = "ifconfig"
+ GET_UPTIME_CMD = "cat /proc/uptime"
+ GET_MEMINFO_CMD = "cat /proc/meminfo"
+
+ def __init__(self):
+
+ self.DATA_IFCONFIG_OUTPUT = Facter.setDataIfConfigOutput()
+ self.DATA_UPTIME_OUTPUT = Facter.setDataUpTimeOutput()
+ self.DATA_MEMINFO_OUTPUT = Facter.setMemInfoOutput()
+
+ @staticmethod
+ def setDataIfConfigOutput():
+
+ try:
+ result = os.popen(FacterLinux.GET_IFCONFIG_CMD).read()
+ return result
+ except OSError:
+ log.warn("Can't execute {0}".format(FacterLinux.GET_IFCONFIG_CMD))
+ return ""
+
+ @staticmethod
+ def setDataUpTimeOutput():
+
+ try:
+ result = os.popen(FacterLinux.GET_UPTIME_CMD).read()
+ return result
+ except OSError:
+ log.warn("Can't execute {0}".format(FacterLinux.GET_UPTIME_CMD))
+ return ""
+
+ @staticmethod
+ def setMemInfoOutput():
+
+ try:
+ result = os.popen(FacterLinux.GET_MEMINFO_CMD).read()
+ return result
+ except OSError:
+ log.warn("Can't execute {0}".format(FacterLinux.GET_MEMINFO_CMD))
+ return ""
+
def isSeLinux(self):
try:
- retcode, out, err = run_os_command(GET_SE_LINUX_ST_CMD)
+ retcode, out, err = run_os_command(FacterLinux.GET_SE_LINUX_ST_CMD)
se_status = re.search('(enforcing|permissive|enabled)', out)
if se_status:
return True
except OSError:
- log.warn("Could not run {0}: OK".format(GET_SE_LINUX_ST_CMD))
+ log.warn("Could not run {0}: OK".format(FacterLinux.GET_SE_LINUX_ST_CMD))
return False
# Function that returns list of values that matches
@@ -183,10 +353,6 @@ class Facter():
return result
- #Convert kB to GB
- def convertSizeKbToGb(self, size):
- return "%0.2f GB" % round(float(size) / (1024.0 * 1024.0), 2)
-
# Return first ip adress
def getIpAddress(self):
ip_pattern="(?: inet addr:)(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"
@@ -195,7 +361,7 @@ class Facter():
result = self.data_return_first(ip_pattern,self.DATA_IFCONFIG_OUTPUT)
if result == '':
log.warn("Can't get an ip address from {0}".format(self.DATA_IFCONFIG_OUTPUT))
- return socket.gethostbyname(socket.gethostname())
+ return socket.gethostbyname(socket.gethostname().lower())
else:
return result
@@ -231,15 +397,6 @@ class Facter():
log.warn("Can't get an uptime value from {0}".format(self.DATA_UPTIME_OUTPUT))
return 0
-
- # Return uptime hours
- def getUptimeHours(self):
- return self.getUptimeSeconds() / (60 * 60)
-
- # Return uptime days
- def getUptimeDays(self):
- return self.getUptimeSeconds() / (60 * 60 * 24)
-
# Return memoryfree
def getMemoryFree(self):
#:memoryfree_mb => "MemFree",
@@ -284,55 +441,18 @@ class Facter():
log.warn("Can't get memory size from {0}".format(self.DATA_MEMINFO_OUTPUT))
return 0
-
def facterInfo(self):
- facterInfo = {}
- facterInfo['id'] = self.getId()
- facterInfo['kernel'] = self.getKernel()
- facterInfo['domain'] = self.getDomain()
- facterInfo['fqdn'] = self.getFqdn()
- facterInfo['hostname'] = self.getHostname()
- facterInfo['macaddress'] = self.getMacAddress()
- facterInfo['architecture'] = self.getArchitecture()
- facterInfo['operatingsystem'] = self.getOperatingSystem()
- facterInfo['operatingsystemrelease'] = self.getOperatingSystemRelease()
- facterInfo['physicalprocessorcount'] = self.getProcessorcount()
- facterInfo['processorcount'] = self.getProcessorcount()
- facterInfo['timezone'] = self.getTimeZone()
- facterInfo['hardwareisa'] = self.getArchitecture()
- facterInfo['hardwaremodel'] = self.getArchitecture()
- facterInfo['kernelrelease'] = self.getKernelRelease()
- facterInfo['kernelversion'] = self.getKernelVersion()
- facterInfo['osfamily'] = self.getOsFamily()
+ facterInfo = FacterBase.facterInfo(self)
facterInfo['selinux'] = self.isSeLinux()
- facterInfo['kernelmajversion'] = self.getKernelMajVersion()
-
- facterInfo['ipaddress'] = self.getIpAddress()
- facterInfo['netmask'] = self.getNetmask()
- facterInfo['interfaces'] = self.getInterfaces()
-
- facterInfo['uptime_seconds'] = str(self.getUptimeSeconds())
- facterInfo['uptime_hours'] = str(self.getUptimeHours())
- facterInfo['uptime_days'] = str(self.getUptimeDays())
-
- facterInfo['memorysize'] = self.getMemorySize()
- facterInfo['memoryfree'] = self.getMemoryFree()
- facterInfo['swapsize'] = self.convertSizeKbToGb(self.getSwapSize())
- facterInfo['swapfree'] = self.convertSizeKbToGb(self.getSwapFree())
- facterInfo['memorytotal'] = self.getMemoryTotal()
-
+ facterInfo['swapsize'] = FacterBase.convertSizeKbToGb(self.getSwapSize())
+ facterInfo['swapfree'] = FacterBase.convertSizeKbToGb(self.getSwapFree())
return facterInfo
-def run_os_command(cmd):
- if type(cmd) == str:
- cmd = shlex.split(cmd)
- process = subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stdin=subprocess.PIPE,
- stderr=subprocess.PIPE
- )
- (stdoutdata, stderrdata) = process.communicate()
- return process.returncode, stdoutdata, stderrdata
+
+if platform.system() == "Windows":
+ Facter = FacterWindows
+else:
+ Facter = FacterLinux
def main(argv=None):
@@ -341,8 +461,3 @@ def main(argv=None):
if __name__ == '__main__':
main()
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/FileCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py
index 67b14b2..6820db7 100644
--- a/ambari-agent/src/main/python/ambari_agent/FileCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py
@@ -24,6 +24,7 @@ import os
import shutil
import zipfile
import urllib2
+import urllib
logger = logging.getLogger()
@@ -155,7 +156,7 @@ class FileCache():
filename - file inside directory we are trying to fetch
"""
return "{0}/{1}/{2}".format(server_url_prefix,
- directory, filename)
+ urllib.pathname2url(directory), filename)
def fetch_url(self, url):
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/Hardware.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Hardware.py b/ambari-agent/src/main/python/ambari_agent/Hardware.py
index 801fe3f..8b93355 100644
--- a/ambari-agent/src/main/python/ambari_agent/Hardware.py
+++ b/ambari-agent/src/main/python/ambari_agent/Hardware.py
@@ -21,12 +21,15 @@ limitations under the License.
import os.path
import logging
import subprocess
+import platform
+from shell import shellRunner
from Facter import Facter
logger = logging.getLogger()
class Hardware:
SSH_KEY_PATTERN = 'ssh.*key'
+ WINDOWS_GET_DRIVES_CMD ="foreach ($drive in [System.IO.DriveInfo]::getdrives()){$available = $drive.TotalFreeSpace;$used = $drive.TotalSize-$drive.TotalFreeSpace;$percent = ($used*100)/$drive.TotalSize;$size = $drive.TotalSize;$type = $drive.DriveFormat;$mountpoint = $drive.RootDirectory.FullName;echo \"$available $used $percent% $size $type $mountpoint\"}"
def __init__(self):
self.hardware = {}
@@ -59,8 +62,15 @@ class Hardware:
@staticmethod
def osdisks():
- """ Run df to find out the disks on the host. Only works on linux
- platforms. Note that this parser ignores any filesystems with spaces
+ if platform.system() == "Windows":
+ return Hardware._osdisks_win()
+ else:
+ return Hardware._osdisks_linux()
+
+ @staticmethod
+ def _osdisks_linux():
+ """ Run df to find out the disks on the host. Only works on linux
+ platforms. Note that this parser ignores any filesystems with spaces
and any mounts with spaces. """
mounts = []
df = subprocess.Popen(["df", "-kPT"], stdout=subprocess.PIPE)
@@ -74,6 +84,25 @@ class Hardware:
pass
return mounts
+ @staticmethod
+ def _osdisks_win():
+ mounts = []
+ runner = shellRunner()
+ command_result = runner.runPowershell(script_block=Hardware.WINDOWS_GET_DRIVES_CMD)
+ if command_result.exitCode != 0:
+ return mounts
+ else:
+ for drive in [line for line in command_result.output.split(os.linesep) if line != '']:
+ available, used, percent, size, type, mountpoint = drive.split(" ")
+ mounts.append({"available": available,
+ "used": used,
+ "percent": percent,
+ "size": size,
+ "type": type,
+ "mountpoint": mountpoint})
+
+ return mounts
+
def get(self):
return self.hardware
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers_windows.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers_windows.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers_windows.py
new file mode 100644
index 0000000..1b6c7de
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers_windows.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import win32event
+
+from ambari_commons.exceptions import FatalException
+
+
+def bind_signal_handlers(agentPid):
+  """ Create the stop handler for the agent process.
+  agentPid is accepted for signature parity but is not used here: this
+  implementation installs no signal handlers. """
+  stop_handler = HeartbeatStopHandler()
+  return stop_handler
+
+
+class HeartbeatStopHandler:
+  """ Heartbeat/stop synchronization built on win32 event objects.
+
+  NOTE(review): wait() returns 0 for the stop event and 1 for the heartbeat
+  event, while HeartbeatStopHandler_linux.py in this commit returns 0 for the
+  heartbeat event and 1 for the stop event - confirm which mapping the
+  caller expects. """
+
+  def __init__(self, stopEvent = None):
+    # Event is used for synchronizing heartbeat iterations (to make possible
+    # manual wait() interruption between heartbeats )
+    # CreateEvent(security, bManualReset=0, bInitialState=0, name): auto-reset, unsignaled
+    self._heventHeartbeat = win32event.CreateEvent(None, 0, 0, None)
+
+    # Event is used to stop the Agent process
+    if stopEvent is None:
+      #Allow standalone testing
+      self._heventStop = win32event.CreateEvent(None, 0, 0, None)
+    else:
+      #Allow one unique event per process
+      self._heventStop = stopEvent
+
+  def set_heartbeat(self):
+    """ Signal the heartbeat event, which wakes up a pending wait(). """
+    win32event.SetEvent(self._heventHeartbeat)
+
+  def reset_heartbeat(self):
+    """ Put the heartbeat event back into the unsignaled state. """
+    win32event.ResetEvent(self._heventHeartbeat)
+
+  def wait(self, timeout1, timeout2 = 0):
+    """ Block until the stop event or the heartbeat event is signaled, or until
+    timeout1 + timeout2 seconds have elapsed.
+    Returns 0 when the stop event fired, 1 when the heartbeat event fired and
+    -1 on timeout. Raises FatalException for any other wait result. """
+    # Scale to milliseconds before truncating, so that fractional seconds survive
+    timeout = int((timeout1 + timeout2) * 1000)
+
+    result = win32event.WaitForMultipleObjects([self._heventStop, self._heventHeartbeat], False, timeout)
+    if(win32event.WAIT_OBJECT_0 != result and win32event.WAIT_OBJECT_0 + 1 != result and win32event.WAIT_TIMEOUT != result):
+      # str(), not string(): the latter is undefined and would mask the real error
+      raise FatalException(-1, "Error waiting for stop/heartbeat events: " + str(result))
+    if(win32event.WAIT_TIMEOUT == result):
+      return -1
+    # Index of the signaled handle in the list passed to WaitForMultipleObjects
+    return result - win32event.WAIT_OBJECT_0
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/HeartbeatStopHandler_linux.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatStopHandler_linux.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatStopHandler_linux.py
new file mode 100644
index 0000000..2ef8c7f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatStopHandler_linux.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import logging
+import signal
+import threading
+import traceback
+
+
+logger = logging.getLogger()
+
+_handler = None
+
+def signal_handler(signum, frame):
+  """ Signal callback: ask the agent to stop by setting the stop event. """
+  # _handler is None until bind_signal_handlers() has created the handler;
+  # a signal that arrives earlier has nothing to notify yet
+  if _handler is not None:
+    _handler.set_stop()
+
+def bind_signal_handlers(agentPid):
+  """ Create the module-wide HeartbeatStopHandler and, when running inside the
+  agent process itself (os.getpid() == agentPid), route SIGINT/SIGTERM to it
+  and SIGUSR1 to debug(). Returns the handler. """
+  global _handler
+  # Publish the handler before any signal handler is installed, so that a
+  # signal delivered right after registration never finds _handler unset
+  _handler = HeartbeatStopHandler()
+
+  if os.getpid() == agentPid:
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+    signal.signal(signal.SIGUSR1, debug)
+
+  return _handler
+
+def debug(sig, frame):
+  """ Signal callback: log the stack trace of the interrupted frame.
+  No interactive shell is opened, despite the wording of the log message,
+  which is kept unchanged. """
+  message = "Signal received : entering python shell.\nTraceback:\n"
+  message += ''.join(traceback.format_stack(frame))
+  logger.info(message)
+
+class HeartbeatStopHandler:
+  """ Heartbeat/stop synchronization built on threading.Event objects. """
+
+  def __init__(self, stopEvent = None):
+    # Event is used for synchronizing heartbeat iterations (to make possible
+    # manual wait() interruption between heartbeats )
+    self.heartbeat_wait_event = threading.Event()
+
+    # Event is used to stop the Agent process
+    if stopEvent is None:
+      #Allow standalone testing
+      self.stop_event = threading.Event()
+    else:
+      #Allow one unique event per process
+      self.stop_event = stopEvent
+
+  def set_heartbeat(self):
+    """ Signal the heartbeat event, which wakes up a pending wait(). """
+    self.heartbeat_wait_event.set()
+
+  def reset_heartbeat(self):
+    """ Clear the heartbeat event. """
+    self.heartbeat_wait_event.clear()
+
+  def set_stop(self):
+    """ Signal the stop event. """
+    self.stop_event.set()
+
+  def wait(self, timeout1, timeout2 = 0):
+    """ Wait up to timeout1 seconds for the heartbeat event, then up to
+    timeout2 seconds for the stop event.
+    Returns 0 when the heartbeat event is set, 1 when the stop event is set
+    and -1 when both waits time out. """
+    # Event.wait() only reports the flag from Python 2.7 on (it returns None on
+    # 2.6), so the flag is read explicitly after each wait
+    self.heartbeat_wait_event.wait(timeout = timeout1)
+    if self.heartbeat_wait_event.is_set():
+      #Event signaled, exit
+      return 0
+    # Stop loop when stop event received
+    # Otherwise sleep a bit more to allow STATUS_COMMAND results to be collected
+    # and sent in one heartbeat. Also avoid server overload with heartbeats
+    self.stop_event.wait(timeout = timeout2)
+    if self.stop_event.is_set():
+      logger.info("Stop event received")
+      return 1
+    #Timeout
+    return -1
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/HostCheckReportFileHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostCheckReportFileHandler.py b/ambari-agent/src/main/python/ambari_agent/HostCheckReportFileHandler.py
index c671584..bf24730 100644
--- a/ambari-agent/src/main/python/ambari_agent/HostCheckReportFileHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/HostCheckReportFileHandler.py
@@ -31,12 +31,13 @@ class HostCheckReportFileHandler:
HOST_CHECK_FILE = "hostcheck.result"
def __init__(self, config):
- if config != None:
+ self.hostCheckFilePath = None
+ if config is not None:
hostCheckFileDir = config.get('agent', 'prefix')
self.hostCheckFilePath = os.path.join(hostCheckFileDir, self.HOST_CHECK_FILE)
def writeHostCheckFile(self, hostInfo):
- if self.hostCheckFilePath == None:
+ if self.hostCheckFilePath is None:
return
try:
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/HostInfo.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostInfo.py b/ambari-agent/src/main/python/ambari_agent/HostInfo.py
index 89e22b1..7a1e50a 100644
--- a/ambari-agent/src/main/python/ambari_agent/HostInfo.py
+++ b/ambari-agent/src/main/python/ambari_agent/HostInfo.py
@@ -18,394 +18,11 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-import os
-import glob
-import logging
-import pwd
-import re
-import time
-import subprocess
-import threading
-import shlex
import platform
-import hostname
-from PackagesAnalyzer import PackagesAnalyzer
-from HostCheckReportFileHandler import HostCheckReportFileHandler
-from Hardware import Hardware
-from ambari_commons import OSCheck, OSConst, Firewall
-import socket
-logger = logging.getLogger()
-
-# service cmd
-SERVICE_CMD = "service"
-
-
-class HostInfo:
- # List of project names to be used to find alternatives folders etc.
- DEFAULT_PROJECT_NAMES = [
- "hadoop*", "hadoop", "hbase", "hcatalog", "hive", "ganglia",
- "oozie", "sqoop", "hue", "zookeeper", "mapred", "hdfs", "flume",
- "storm", "hive-hcatalog", "tez", "falcon", "ambari_qa", "hadoop_deploy",
- "rrdcached", "hcat", "ambari-qa", "sqoop-ambari-qa", "sqoop-ambari_qa",
- "webhcat", "hadoop-hdfs", "hadoop-yarn", "hadoop-mapreduce"
- ]
-
- # List of live services checked for on the host, takes a map of plan strings
- DEFAULT_LIVE_SERVICES = [
- {OSConst.REDHAT_FAMILY: "ntpd", OSConst.SUSE_FAMILY: "ntp", OSConst.UBUNTU_FAMILY: "ntp"}
- ]
-
- # Set of default users (need to be replaced with the configured user names)
- DEFAULT_USERS = [
- "hive", "ambari-qa", "oozie", "hbase", "hcat", "mapred",
- "hdfs", "rrdcached", "zookeeper", "flume", "sqoop", "sqoop2",
- "hue", "yarn", "tez", "storm", "falcon", "kafka","knox"
- ]
-
- # Filters used to identify processed
- PROC_FILTER = [
- "hadoop", "zookeeper"
- ]
-
- # Additional path patterns to find existing directory
- DIRNAME_PATTERNS = [
- "/tmp/hadoop-", "/tmp/hsperfdata_"
- ]
-
- # Default set of directories that are checked for existence of files and folders
- DEFAULT_DIRS = [
- "/etc", "/var/run", "/var/log", "/usr/lib", "/var/lib", "/var/tmp", "/tmp", "/var", "/hadoop"
- ]
-
- # Packages that are used to find repos (then repos are used to find other packages)
- PACKAGES = [
- "hadoop_2_2_*","hadoop-2-2-.*","zookeeper_2_2_*","zookeeper-2-2-.*",
- "hadoop", "zookeeper", "webhcat", "*-manager-server-db", "*-manager-daemons"
- ]
-
- # Additional packages to look for (search packages that start with these)
- ADDITIONAL_PACKAGES = [
- "rrdtool", "rrdtool-python", "ganglia", "gmond", "gweb", "libconfuse",
- "ambari-log4j", "hadoop", "zookeeper", "oozie", "webhcat"
- ]
-
- # ignore packages from repos whose names start with these strings
- IGNORE_PACKAGES_FROM_REPOS = [
- "ambari", "installed"
- ]
-
- # ignore required packages
- IGNORE_PACKAGES = [
- "epel-release"
- ]
-
- # ignore repos from the list of repos to be cleaned
- IGNORE_REPOS = [
- "ambari", "HDP-UTILS"
- ]
-
- # default timeout for async invoked processes
- TIMEOUT_SECONDS = 60
- RESULT_UNAVAILABLE = "unable_to_determine"
-
- DEFAULT_SERVICE_NAME = "ntpd"
- SERVICE_STATUS_CMD = "%s %s status" % (SERVICE_CMD, DEFAULT_SERVICE_NAME)
-
- THP_FILE = "/sys/kernel/mm/redhat_transparent_hugepage/enabled"
-
- event = threading.Event()
-
- current_umask = -1
-
- def __init__(self, config=None):
- self.packages = PackagesAnalyzer()
- self.config = config
- self.reportFileHandler = HostCheckReportFileHandler(config)
-
- def dirType(self, path):
- if not os.path.exists(path):
- return 'not_exist'
- elif os.path.islink(path):
- return 'sym_link'
- elif os.path.isdir(path):
- return 'directory'
- elif os.path.isfile(path):
- return 'file'
- return 'unknown'
-
- def hadoopVarRunCount(self):
- if not os.path.exists('/var/run/hadoop'):
- return 0
- pids = glob.glob('/var/run/hadoop/*/*.pid')
- return len(pids)
-
- def hadoopVarLogCount(self):
- if not os.path.exists('/var/log/hadoop'):
- return 0
- logs = glob.glob('/var/log/hadoop/*/*.log')
- return len(logs)
-
- def etcAlternativesConf(self, projects, etcResults):
- if not os.path.exists('/etc/alternatives'):
- return []
- projectRegex = "'" + '|'.join(projects) + "'"
- files = [f for f in os.listdir('/etc/alternatives') if re.match(projectRegex, f)]
- for conf in files:
- result = {}
- filePath = os.path.join('/etc/alternatives', conf)
- if os.path.islink(filePath):
- realConf = os.path.realpath(filePath)
- result['name'] = conf
- result['target'] = realConf
- etcResults.append(result)
-
- def checkLiveServices(self, services, result):
- osType = OSCheck.get_os_family()
- for service in services:
- svcCheckResult = {}
- if isinstance(service, dict):
- serviceName = service[osType]
- else:
- serviceName = service
-
- service_check_live = shlex.split(self.SERVICE_STATUS_CMD)
- service_check_live[1] = serviceName
-
- svcCheckResult['name'] = serviceName
- svcCheckResult['status'] = "UNKNOWN"
- svcCheckResult['desc'] = ""
- try:
- osStat = subprocess.Popen(service_check_live, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = osStat.communicate()
- if 0 != osStat.returncode:
- svcCheckResult['status'] = "Unhealthy"
- svcCheckResult['desc'] = out
- if len(out) == 0:
- svcCheckResult['desc'] = err
- else:
- svcCheckResult['status'] = "Healthy"
- except Exception, e:
- svcCheckResult['status'] = "Unhealthy"
- svcCheckResult['desc'] = repr(e)
- result.append(svcCheckResult)
-
- def checkUsers(self, users, results):
- f = open('/etc/passwd', 'r')
- for userLine in f:
- fields = userLine.split(":")
- if fields[0] in users:
- result = {}
- homeDir = fields[5]
- result['name'] = fields[0]
- result['homeDir'] = fields[5]
- result['status'] = "Available"
- if not os.path.exists(homeDir):
- result['status'] = "Invalid home directory"
- results.append(result)
-
- def osdiskAvailableSpace(self, path):
- diskInfo = {}
- try:
- df = subprocess.Popen(["df", "-kPT", path], stdout=subprocess.PIPE)
- dfdata = df.communicate()[0]
- return Hardware.extractMountInfo(dfdata.splitlines()[-1])
- except:
- pass
- return diskInfo
-
- def createAlerts(self, alerts):
- existingUsers = []
- self.checkUsers(self.DEFAULT_USERS, existingUsers)
- dirs = []
- self.checkFolders(self.DEFAULT_DIRS, self.DEFAULT_PROJECT_NAMES, existingUsers, dirs)
- alert = {
- 'name': 'host_alert',
- 'instance': None,
- 'service': 'AMBARI',
- 'component': 'host',
- 'host': hostname.hostname(self.config),
- 'state': 'OK',
- 'label': 'Disk space',
- 'text': 'Used disk space less than 80%'}
- message = ""
- mountinfoSet = []
- for dir in dirs:
- if dir["type"] == 'directory':
- mountinfo = self.osdiskAvailableSpace(dir['name'])
- if int(mountinfo["percent"].strip('%')) >= 80:
- if not mountinfo in mountinfoSet:
- mountinfoSet.append(mountinfo)
- message += str(dir['name']) + ";\n"
-
- if message != "":
- message = "These discs have low space:\n" + str(mountinfoSet) + "\n They include following critical directories:\n" + message
- alert['state'] = 'WARNING'
- alert['text'] = message
- alerts.append(alert)
- return alerts
-
- def checkFolders(self, basePaths, projectNames, existingUsers, dirs):
- foldersToIgnore = []
- for user in existingUsers:
- foldersToIgnore.append(user['homeDir'])
- try:
- for dirName in basePaths:
- for project in projectNames:
- path = os.path.join(dirName.strip(), project.strip())
- if not path in foldersToIgnore and os.path.exists(path):
- obj = {}
- obj['type'] = self.dirType(path)
- obj['name'] = path
- dirs.append(obj)
- except:
- pass
-
- def javaProcs(self, list):
- try:
- pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
- for pid in pids:
- cmd = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
- cmd = cmd.replace('\0', ' ')
- if not 'AmbariServer' in cmd:
- if 'java' in cmd:
- dict = {}
- dict['pid'] = int(pid)
- dict['hadoop'] = False
- for filter in self.PROC_FILTER:
- if filter in cmd:
- dict['hadoop'] = True
- dict['command'] = cmd.strip()
- for line in open(os.path.join('/proc', pid, 'status')):
- if line.startswith('Uid:'):
- uid = int(line.split()[1])
- dict['user'] = pwd.getpwuid(uid).pw_name
- list.append(dict)
- except:
- pass
- pass
-
- def getReposToRemove(self, repos, ignoreList):
- reposToRemove = []
- for repo in repos:
- addToRemoveList = True
- for ignoreRepo in ignoreList:
- if self.packages.nameMatch(ignoreRepo, repo):
- addToRemoveList = False
- continue
- if addToRemoveList:
- reposToRemove.append(repo)
- return reposToRemove
-
- def getUMask(self):
- if (self.current_umask == -1):
- self.current_umask = os.umask(self.current_umask)
- os.umask(self.current_umask)
- return self.current_umask
- else:
- return self.current_umask
-
- def getTransparentHugePage(self):
- # This file exist only on redhat 6
- thp_regex = "\[(.+)\]"
- if os.path.isfile(self.THP_FILE):
- with open(self.THP_FILE) as f:
- file_content = f.read()
- return re.search(thp_regex, file_content).groups()[0]
- else:
- return ""
-
- def checkIptables(self):
- return Firewall().getFirewallObject().check_iptables()
-
- """ Return various details about the host
- componentsMapped: indicates if any components are mapped to this host
- commandsInProgress: indicates if any commands are in progress
- """
- def register(self, dict, componentsMapped=True, commandsInProgress=True):
- dict['hostHealth'] = {}
-
- java = []
- self.javaProcs(java)
- dict['hostHealth']['activeJavaProcs'] = java
-
- liveSvcs = []
- self.checkLiveServices(self.DEFAULT_LIVE_SERVICES, liveSvcs)
- dict['hostHealth']['liveServices'] = liveSvcs
-
- dict['umask'] = str(self.getUMask())
-
- dict['transparentHugePage'] = self.getTransparentHugePage()
- dict['iptablesIsRunning'] = self.checkIptables()
- dict['reverseLookup'] = self.checkReverseLookup()
- # If commands are in progress or components are already mapped to this host
- # Then do not perform certain expensive host checks
- if componentsMapped or commandsInProgress:
- dict['existingRepos'] = [self.RESULT_UNAVAILABLE]
- dict['installedPackages'] = []
- dict['alternatives'] = []
- dict['stackFoldersAndFiles'] = []
- dict['existingUsers'] = []
-
- else:
- etcs = []
- self.etcAlternativesConf(self.DEFAULT_PROJECT_NAMES, etcs)
- dict['alternatives'] = etcs
-
- existingUsers = []
- self.checkUsers(self.DEFAULT_USERS, existingUsers)
- dict['existingUsers'] = existingUsers
-
- dirs = []
- self.checkFolders(self.DEFAULT_DIRS, self.DEFAULT_PROJECT_NAMES, existingUsers, dirs)
- dict['stackFoldersAndFiles'] = dirs
-
- installedPackages = []
- availablePackages = []
- self.packages.allInstalledPackages(installedPackages)
- self.packages.allAvailablePackages(availablePackages)
-
- repos = []
- self.packages.getInstalledRepos(self.PACKAGES, installedPackages + availablePackages,
- self.IGNORE_PACKAGES_FROM_REPOS, repos)
- packagesInstalled = self.packages.getInstalledPkgsByRepo(repos, self.IGNORE_PACKAGES, installedPackages)
- additionalPkgsInstalled = self.packages.getInstalledPkgsByNames(
- self.ADDITIONAL_PACKAGES, installedPackages)
- allPackages = list(set(packagesInstalled + additionalPkgsInstalled))
- dict['installedPackages'] = self.packages.getPackageDetails(installedPackages, allPackages)
-
- repos = self.getReposToRemove(repos, self.IGNORE_REPOS)
- dict['existingRepos'] = repos
-
- self.reportFileHandler.writeHostCheckFile(dict)
- pass
-
- # The time stamp must be recorded at the end
- dict['hostHealth']['agentTimeStampAtReporting'] = int(time.time() * 1000)
-
- pass
-
- def checkReverseLookup(self):
- """
- Check if host fqdn resolves to current host ip
- """
- try:
- host_name = socket.gethostname()
- host_ip = socket.gethostbyname(host_name)
- host_fqdn = socket.getfqdn()
- fqdn_ip = socket.gethostbyname(host_fqdn)
- return host_ip == fqdn_ip
- except socket.error:
- pass
- return False
-
-def main(argv=None):
- h = HostInfo()
- struct = {}
- h.register(struct)
- print struct
-
-
-if __name__ == '__main__':
- main()
+# Expose the platform-specific implementation under this module's HostInfo name:
+# HostInfo_win.HostInfo on Windows, HostInfo_linux.HostInfo everywhere else.
+if platform.system() == "Windows":
+ import HostInfo_win
+ HostInfo = HostInfo_win.HostInfo
+else:
+ import HostInfo_linux
+ HostInfo = HostInfo_linux.HostInfo
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py b/ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py
new file mode 100644
index 0000000..d172443
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py
@@ -0,0 +1,411 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import glob
+import logging
+import pwd
+import re
+import time
+import subprocess
+import threading
+import shlex
+import platform
+import hostname
+from PackagesAnalyzer import PackagesAnalyzer
+from HostCheckReportFileHandler import HostCheckReportFileHandler
+from Hardware import Hardware
+from ambari_commons import OSCheck, OSConst, Firewall
+import socket
+
+logger = logging.getLogger()
+
+# service cmd
+SERVICE_CMD = "service"
+
+
+class HostInfo:
+  """ Host checks for Linux hosts: running java processes, live services,
+  users, stack folders, installed packages and repositories. register()
+  gathers them into the dictionary supplied by the caller. """
+
+  # List of project names to be used to find alternatives folders etc.
+  DEFAULT_PROJECT_NAMES = [
+    "hadoop*", "hadoop", "hbase", "hcatalog", "hive", "ganglia",
+    "oozie", "sqoop", "hue", "zookeeper", "mapred", "hdfs", "flume",
+    "storm", "hive-hcatalog", "tez", "falcon", "ambari_qa", "hadoop_deploy",
+    "rrdcached", "hcat", "ambari-qa", "sqoop-ambari-qa", "sqoop-ambari_qa",
+    "webhcat", "hadoop-hdfs", "hadoop-yarn", "hadoop-mapreduce"
+  ]
+
+  # List of live services checked for on the host, takes a map of plan strings
+  DEFAULT_LIVE_SERVICES = [
+    {OSConst.REDHAT_FAMILY: "ntpd", OSConst.SUSE_FAMILY: "ntp", OSConst.UBUNTU_FAMILY: "ntp"}
+  ]
+
+  # Set of default users (need to be replaced with the configured user names)
+  DEFAULT_USERS = [
+    "hive", "ambari-qa", "oozie", "hbase", "hcat", "mapred",
+    "hdfs", "rrdcached", "zookeeper", "flume", "sqoop", "sqoop2",
+    "hue", "yarn", "tez", "storm", "falcon", "kafka","knox"
+  ]
+
+  # Filters used to identify processed
+  PROC_FILTER = [
+    "hadoop", "zookeeper"
+  ]
+
+  # Additional path patterns to find existing directory
+  DIRNAME_PATTERNS = [
+    "/tmp/hadoop-", "/tmp/hsperfdata_"
+  ]
+
+  # Default set of directories that are checked for existence of files and folders
+  DEFAULT_DIRS = [
+    "/etc", "/var/run", "/var/log", "/usr/lib", "/var/lib", "/var/tmp", "/tmp", "/var", "/hadoop"
+  ]
+
+  # Packages that are used to find repos (then repos are used to find other packages)
+  PACKAGES = [
+    "hadoop_2_2_*","hadoop-2-2-.*","zookeeper_2_2_*","zookeeper-2-2-.*",
+    "hadoop", "zookeeper", "webhcat", "*-manager-server-db", "*-manager-daemons"
+  ]
+
+  # Additional packages to look for (search packages that start with these)
+  ADDITIONAL_PACKAGES = [
+    "rrdtool", "rrdtool-python", "ganglia", "gmond", "gweb", "libconfuse",
+    "ambari-log4j", "hadoop", "zookeeper", "oozie", "webhcat"
+  ]
+
+  # ignore packages from repos whose names start with these strings
+  IGNORE_PACKAGES_FROM_REPOS = [
+    "ambari", "installed"
+  ]
+
+  # ignore required packages
+  IGNORE_PACKAGES = [
+    "epel-release"
+  ]
+
+  # ignore repos from the list of repos to be cleaned
+  IGNORE_REPOS = [
+    "ambari", "HDP-UTILS"
+  ]
+
+  # default timeout for async invoked processes
+  TIMEOUT_SECONDS = 60
+  RESULT_UNAVAILABLE = "unable_to_determine"
+
+  DEFAULT_SERVICE_NAME = "ntpd"
+  SERVICE_STATUS_CMD = "%s %s status" % (SERVICE_CMD, DEFAULT_SERVICE_NAME)
+
+  THP_FILE = "/sys/kernel/mm/redhat_transparent_hugepage/enabled"
+
+  event = threading.Event()
+
+  current_umask = -1
+
+  def __init__(self, config=None):
+    self.packages = PackagesAnalyzer()
+    self.config = config
+    self.reportFileHandler = HostCheckReportFileHandler(config)
+
+  def dirType(self, path):
+    """ Classify path as 'not_exist', 'sym_link', 'directory', 'file' or 'unknown'. """
+    if not os.path.exists(path):
+      return 'not_exist'
+    elif os.path.islink(path):
+      return 'sym_link'
+    elif os.path.isdir(path):
+      return 'directory'
+    elif os.path.isfile(path):
+      return 'file'
+    return 'unknown'
+
+  def hadoopVarRunCount(self):
+    """ Count the *.pid files one level below /var/run/hadoop. """
+    if not os.path.exists('/var/run/hadoop'):
+      return 0
+    pids = glob.glob('/var/run/hadoop/*/*.pid')
+    return len(pids)
+
+  def hadoopVarLogCount(self):
+    """ Count the *.log files one level below /var/log/hadoop. """
+    if not os.path.exists('/var/log/hadoop'):
+      return 0
+    logs = glob.glob('/var/log/hadoop/*/*.log')
+    return len(logs)
+
+  def etcAlternativesConf(self, projects, etcResults):
+    """ Append a {'name', 'target'} entry to etcResults for every symlink in
+    /etc/alternatives whose name matches one of the project patterns. """
+    if not os.path.exists('/etc/alternatives'):
+      return []
+    # NOTE(review): the surrounding quotes end up inside the first and the last
+    # alternative of the regex - confirm whether they are intended
+    projectRegex = "'" + '|'.join(projects) + "'"
+    files = [f for f in os.listdir('/etc/alternatives') if re.match(projectRegex, f)]
+    for conf in files:
+      result = {}
+      filePath = os.path.join('/etc/alternatives', conf)
+      if os.path.islink(filePath):
+        realConf = os.path.realpath(filePath)
+        result['name'] = conf
+        result['target'] = realConf
+        etcResults.append(result)
+
+  def checkLiveServices(self, services, result):
+    """ Run "service <name> status" for every entry of services and append a
+    {'name', 'status', 'desc'} dict to result. An entry is either a service
+    name or a dict that maps the OS family to the service name. """
+    osType = OSCheck.get_os_family()
+    for service in services:
+      svcCheckResult = {}
+      if isinstance(service, dict):
+        serviceName = service[osType]
+      else:
+        serviceName = service
+
+      # Reuse the default command line and swap in the actual service name
+      service_check_live = shlex.split(self.SERVICE_STATUS_CMD)
+      service_check_live[1] = serviceName
+
+      svcCheckResult['name'] = serviceName
+      svcCheckResult['status'] = "UNKNOWN"
+      svcCheckResult['desc'] = ""
+      try:
+        osStat = subprocess.Popen(service_check_live, stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+        if 0 != osStat.returncode:
+          svcCheckResult['status'] = "Unhealthy"
+          svcCheckResult['desc'] = out
+          if len(out) == 0:
+            svcCheckResult['desc'] = err
+        else:
+          svcCheckResult['status'] = "Healthy"
+      except Exception, e:
+        svcCheckResult['status'] = "Unhealthy"
+        svcCheckResult['desc'] = repr(e)
+      result.append(svcCheckResult)
+
+  def checkUsers(self, users, results):
+    """ Append a {'name', 'homeDir', 'status'} dict to results for every
+    /etc/passwd entry whose user name is listed in users. """
+    # The context manager closes /etc/passwd even if a line fails to parse
+    with open('/etc/passwd', 'r') as f:
+      for userLine in f:
+        fields = userLine.split(":")
+        if fields[0] in users:
+          result = {}
+          homeDir = fields[5]
+          result['name'] = fields[0]
+          result['homeDir'] = fields[5]
+          result['status'] = "Available"
+          if not os.path.exists(homeDir):
+            result['status'] = "Invalid home directory"
+          results.append(result)
+
+  def osdiskAvailableSpace(self, path):
+    """ Return the mount info parsed from the last line of "df -kPT <path>",
+    or an empty dict when df cannot be run or its output cannot be used. """
+    diskInfo = {}
+    try:
+      df = subprocess.Popen(["df", "-kPT", path], stdout=subprocess.PIPE)
+      dfdata = df.communicate()[0]
+      return Hardware.extractMountInfo(dfdata.splitlines()[-1])
+    except Exception:
+      # Best effort: report nothing for this path
+      pass
+    return diskInfo
+
+  def createAlerts(self, alerts):
+    """ Append the 'Disk space' host alert to alerts and return the list. The
+    alert turns to WARNING when a stack directory lives on a disk that is at
+    least 80% full. """
+    existingUsers = []
+    self.checkUsers(self.DEFAULT_USERS, existingUsers)
+    dirs = []
+    self.checkFolders(self.DEFAULT_DIRS, self.DEFAULT_PROJECT_NAMES, existingUsers, dirs)
+    alert = {
+      'name': 'host_alert',
+      'instance': None,
+      'service': 'AMBARI',
+      'component': 'host',
+      'host': hostname.hostname(self.config),
+      'state': 'OK',
+      'label': 'Disk space',
+      'text': 'Used disk space less than 80%'}
+    message = ""
+    mountinfoSet = []
+    for folder in dirs:
+      if folder["type"] == 'directory':
+        mountinfo = self.osdiskAvailableSpace(folder['name'])
+        # osdiskAvailableSpace() returns {} when df fails; without a "percent"
+        # entry there is nothing to compare, so the directory is skipped
+        if not mountinfo or "percent" not in mountinfo:
+          continue
+        if int(mountinfo["percent"].strip('%')) >= 80:
+          if not mountinfo in mountinfoSet:
+            mountinfoSet.append(mountinfo)
+          message += str(folder['name']) + ";\n"
+
+    if message != "":
+      message = "These discs have low space:\n" + str(mountinfoSet) + "\n They include following critical directories:\n" + message
+      alert['state'] = 'WARNING'
+      alert['text'] = message
+    alerts.append(alert)
+    return alerts
+
+  def checkFolders(self, basePaths, projectNames, existingUsers, dirs):
+    """ Append a {'type', 'name'} dict to dirs for every existing
+    <basePath>/<projectName> path that is not the home directory of one of
+    existingUsers. """
+    foldersToIgnore = []
+    for user in existingUsers:
+      foldersToIgnore.append(user['homeDir'])
+    try:
+      for dirName in basePaths:
+        for project in projectNames:
+          path = os.path.join(dirName.strip(), project.strip())
+          if not path in foldersToIgnore and os.path.exists(path):
+            obj = {}
+            obj['type'] = self.dirType(path)
+            obj['name'] = path
+            dirs.append(obj)
+    except Exception:
+      # Best effort: keep whatever has been collected so far
+      pass
+
+  def javaProcs(self, list):
+    """ Append a {'pid', 'hadoop', 'command', 'user'} dict to list for every
+    java process found in /proc, except the ones whose command line contains
+    'AmbariServer'. """
+    try:
+      pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
+    except OSError:
+      # /proc is not readable: nothing to report
+      return
+
+    for pid in pids:
+      # Each process is handled on its own, so that one that exits while being
+      # inspected does not end the scan of the remaining ones
+      try:
+        with open(os.path.join('/proc', pid, 'cmdline'), 'rb') as cmdline_file:
+          cmd = cmdline_file.read()
+        cmd = cmd.replace('\0', ' ')
+        if 'AmbariServer' in cmd or not 'java' in cmd:
+          continue
+        proc = {}
+        proc['pid'] = int(pid)
+        proc['hadoop'] = False
+        for proc_filter in self.PROC_FILTER:
+          if proc_filter in cmd:
+            proc['hadoop'] = True
+        proc['command'] = cmd.strip()
+        with open(os.path.join('/proc', pid, 'status')) as status_file:
+          for line in status_file:
+            if line.startswith('Uid:'):
+              uid = int(line.split()[1])
+              proc['user'] = pwd.getpwuid(uid).pw_name
+        list.append(proc)
+      except Exception:
+        # Process went away or could not be inspected: skip it
+        continue
+
+  def getReposToRemove(self, repos, ignoreList):
+    """ Return the repos that match none of the names in ignoreList. """
+    reposToRemove = []
+    for repo in repos:
+      addToRemoveList = True
+      for ignoreRepo in ignoreList:
+        if self.packages.nameMatch(ignoreRepo, repo):
+          addToRemoveList = False
+          # One match is enough
+          break
+      if addToRemoveList:
+        reposToRemove.append(repo)
+    return reposToRemove
+
+  def getUMask(self):
+    """ Return the umask of the process; it is read once and then cached. """
+    if (self.current_umask == -1):
+      # os.umask() returns the previous mask, so set a mask to read the
+      # current one and restore it right away
+      self.current_umask = os.umask(self.current_umask)
+      os.umask(self.current_umask)
+      return self.current_umask
+    else:
+      return self.current_umask
+
+  def getTransparentHugePage(self):
+    """ Return the value enclosed in brackets in THP_FILE, or "" when the file
+    does not exist or holds no bracketed value. """
+    # This file exist only on redhat 6
+    thp_regex = r"\[(.+)\]"
+    if os.path.isfile(self.THP_FILE):
+      with open(self.THP_FILE) as f:
+        file_content = f.read()
+      match = re.search(thp_regex, file_content)
+      # No match used to raise AttributeError on None
+      if match is None:
+        return ""
+      return match.groups()[0]
+    else:
+      return ""
+
+  def checkIptables(self):
+    return Firewall().getFirewallObject().check_iptables()
+
+  def register(self, dict, componentsMapped=True, commandsInProgress=True):
+    """ Return various details about the host
+    componentsMapped: indicates if any components are mapped to this host
+    commandsInProgress: indicates if any commands are in progress
+    """
+    dict['hostHealth'] = {}
+
+    java = []
+    self.javaProcs(java)
+    dict['hostHealth']['activeJavaProcs'] = java
+
+    liveSvcs = []
+    self.checkLiveServices(self.DEFAULT_LIVE_SERVICES, liveSvcs)
+    dict['hostHealth']['liveServices'] = liveSvcs
+
+    dict['umask'] = str(self.getUMask())
+
+    dict['transparentHugePage'] = self.getTransparentHugePage()
+    dict['iptablesIsRunning'] = self.checkIptables()
+    dict['reverseLookup'] = self.checkReverseLookup()
+    # If commands are in progress or components are already mapped to this host
+    # Then do not perform certain expensive host checks
+    if componentsMapped or commandsInProgress:
+      dict['existingRepos'] = [self.RESULT_UNAVAILABLE]
+      dict['installedPackages'] = []
+      dict['alternatives'] = []
+      dict['stackFoldersAndFiles'] = []
+      dict['existingUsers'] = []
+
+    else:
+      etcs = []
+      self.etcAlternativesConf(self.DEFAULT_PROJECT_NAMES, etcs)
+      dict['alternatives'] = etcs
+
+      existingUsers = []
+      self.checkUsers(self.DEFAULT_USERS, existingUsers)
+      dict['existingUsers'] = existingUsers
+
+      dirs = []
+      self.checkFolders(self.DEFAULT_DIRS, self.DEFAULT_PROJECT_NAMES, existingUsers, dirs)
+      dict['stackFoldersAndFiles'] = dirs
+
+      installedPackages = []
+      availablePackages = []
+      self.packages.allInstalledPackages(installedPackages)
+      self.packages.allAvailablePackages(availablePackages)
+
+      repos = []
+      self.packages.getInstalledRepos(self.PACKAGES, installedPackages + availablePackages,
+                                      self.IGNORE_PACKAGES_FROM_REPOS, repos)
+      packagesInstalled = self.packages.getInstalledPkgsByRepo(repos, self.IGNORE_PACKAGES, installedPackages)
+      additionalPkgsInstalled = self.packages.getInstalledPkgsByNames(
+        self.ADDITIONAL_PACKAGES, installedPackages)
+      allPackages = list(set(packagesInstalled + additionalPkgsInstalled))
+      dict['installedPackages'] = self.packages.getPackageDetails(installedPackages, allPackages)
+
+      repos = self.getReposToRemove(repos, self.IGNORE_REPOS)
+      dict['existingRepos'] = repos
+
+      self.reportFileHandler.writeHostCheckFile(dict)
+
+    # The time stamp must be recorded at the end
+    dict['hostHealth']['agentTimeStampAtReporting'] = int(time.time() * 1000)
+
+  def checkReverseLookup(self):
+    """
+    Check if host fqdn resolves to current host ip
+    """
+    try:
+      host_name = socket.gethostname()
+      host_ip = socket.gethostbyname(host_name)
+      host_fqdn = socket.getfqdn()
+      fqdn_ip = socket.gethostbyname(host_fqdn)
+      return host_ip == fqdn_ip
+    except socket.error:
+      pass
+    return False
+
+def main(argv=None):
+  """ Run the host checks with the default settings and print the result. """
+  struct = {}
+  HostInfo().register(struct)
+  print(struct)
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/HostInfo_win.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostInfo_win.py b/ambari-agent/src/main/python/ambari_agent/HostInfo_win.py
new file mode 100644
index 0000000..6ac987f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/HostInfo_win.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import logging
+import time
+import subprocess
+from HostCheckReportFileHandler import HostCheckReportFileHandler
+from shell import shellRunner
+from ambari_commons.os_check import OSCheck, OSConst
+from ambari_commons.os_windows import run_powershell_script, CHECK_FIREWALL_SCRIPT
+import socket
+
+logger = logging.getLogger()
+
+# OS info
+OS_VERSION = OSCheck().get_os_major_version()
+OS_TYPE = OSCheck.get_os_type()
+OS_FAMILY = OSCheck.get_os_family()
+
+class HostInfo:
+  """
+  Collects host facts for the agent on Windows.
+
+  Services, local accounts and java processes are queried through PowerShell;
+  register() stores the collected facts in a caller-supplied map.
+  """
+
+  # List of live services checked for on the host. Each entry is either a
+  # plain service name or a map of OS family -> service name.
+  DEFAULT_LIVE_SERVICES = [
+    {OSConst.WINSRV_FAMILY: "W32Time"}
+  ]
+
+  # Set of default users (need to be replaced with the configured user names)
+  DEFAULT_USERS = [
+    "hive", "ambari-qa", "oozie", "hbase", "hcat", "mapred",
+    "hdfs", "rrdcached", "zookeeper", "flume", "sqoop", "sqoop2",
+    "hue", "yarn"
+  ]
+
+  # Filters used to identify hadoop-related processes
+  PROC_FILTER = [
+    "hadoop", "zookeeper"
+  ]
+
+  RESULT_UNAVAILABLE = "unable_to_determine"
+
+  # PowerShell snippets. SERVICE_STATUS_CMD is a str.format() template, hence
+  # the doubled braces; {0} is the service name.
+  SERVICE_STATUS_CMD = 'If ((Get-Service | Where-Object {{$_.Name -eq \'{0}\'}}).Status -eq \'Running\') {{echo "Running"; $host.SetShouldExit(0)}} Else {{echo "Stopped"; $host.SetShouldExit(1)}}'
+  GET_USERS_CMD = '$accounts=(Get-WmiObject -Class Win32_UserAccount -Namespace "root\\cimv2" -Filter "LocalAccount=\'$True\'" -ComputerName "LocalHost" -ErrorAction Stop); foreach ($acc in $accounts) {echo $acc.Name}'
+  GET_JAVA_PROC_CMD = 'foreach ($process in (gwmi Win32_Process -Filter "name = \'java.exe\'")){echo $process.ProcessId;echo $process.CommandLine; echo $process.GetOwner().User}'
+
+  # -1 means "not read yet"; getUMask() stores the real value on the instance
+  current_umask = -1
+
+  def __init__(self, config=None):
+    self.reportFileHandler = HostCheckReportFileHandler(config)
+
+  def _powershell_command(self, script):
+    # Build the argument list that runs one script in a non-interactive PowerShell
+    return ["powershell", '-noProfile', '-NonInteractive', '-nologo', "-Command", script]
+
+  def dirType(self, path):
+    """
+    Classify path as 'not_exist', 'sym_link', 'directory', 'file' or 'unknown'.
+    """
+    if not os.path.exists(path):
+      return 'not_exist'
+    elif os.path.islink(path):
+      return 'sym_link'
+    elif os.path.isdir(path):
+      return 'directory'
+    elif os.path.isfile(path):
+      return 'file'
+    return 'unknown'
+
+  def checkLiveServices(self, services, result):
+    """
+    Query the status of every service and append one map per service to result.
+
+    services: items are either a service name or a map of OS family -> name
+    result:   list receiving maps with the keys 'name', 'status' and 'desc'
+    """
+    osType = OSCheck.get_os_family()
+    for service in services:
+      if isinstance(service, dict):
+        serviceName = service[osType]
+      else:
+        serviceName = service
+
+      service_check_live = self._powershell_command(self.SERVICE_STATUS_CMD.format(serviceName))
+      svcCheckResult = {'name': serviceName, 'status': "UNKNOWN", 'desc': ""}
+      try:
+        osStat = subprocess.Popen(service_check_live, stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+        if 0 != osStat.returncode:
+          svcCheckResult['status'] = "Unhealthy"
+          # Describe the failure with stdout, falling back to stderr when empty
+          svcCheckResult['desc'] = out if out else err
+        else:
+          svcCheckResult['status'] = "Healthy"
+      except Exception as e:
+        svcCheckResult['status'] = "Unhealthy"
+        svcCheckResult['desc'] = repr(e)
+      result.append(svcCheckResult)
+
+  #TODO get user directory
+  def checkUsers(self, users, results):
+    """
+    Append an entry for every listed account whose name appears in users.
+
+    users:   collection of account names to look for
+    results: list receiving {'name': <user>, 'status': "Available"} maps
+
+    Raises Exception("Failed to get users.") when PowerShell cannot be run.
+    """
+    get_users_cmd = self._powershell_command(self.GET_USERS_CMD)
+    try:
+      osStat = subprocess.Popen(get_users_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+      out, err = osStat.communicate()
+    except Exception as e:
+      # Keep the original cause in the log; the raised message stays unchanged
+      logger.warn("Unable to list local users: {0}".format(repr(e)))
+      raise Exception("Failed to get users.")
+    # splitlines() and strip() accept both LF and CRLF terminated output
+    for line in out.splitlines():
+      user = line.strip()
+      if user in users:
+        results.append({'name': user, 'status': "Available"})
+
+  def createAlerts(self, alerts):
+    #TODO AMBARI-7849 Implement createAlerts for Windows
+    return alerts
+
+  def javaProcs(self, list):
+    """
+    Append a map for every java process, except the Ambari server, to list.
+
+    Each map has the keys 'pid', 'hadoop', 'command' and 'user'. This is a
+    best-effort check: a failure is logged and list keeps what was collected
+    before the failure.
+    """
+    try:
+      runner = shellRunner()
+      command_result = runner.run(self._powershell_command(self.GET_JAVA_PROC_CMD))
+      if command_result["exitCode"] != 0:
+        return
+      lines = command_result["output"].splitlines()
+      # GET_JAVA_PROC_CMD echoes three values per process: pid, command line
+      # and owner. Only complete groups are read, so trailing blank lines do
+      # not end the loop with an IndexError.
+      for start in range(0, len(lines) - 2, 3):
+        pid, cmd, user = lines[start:start + 3]
+        if 'AmbariServer' in cmd or 'java' not in cmd:
+          continue
+        proc_info = {}
+        proc_info['pid'] = int(pid)
+        proc_info['hadoop'] = any(marker in cmd for marker in self.PROC_FILTER)
+        proc_info['command'] = cmd.strip()
+        proc_info['user'] = user
+        list.append(proc_info)
+    except Exception as e:
+      logger.warn("Unable to collect java processes: {0}".format(repr(e)))
+
+  def getUMask(self):
+    """
+    Return the process umask, reading it on the first call only.
+    """
+    if self.current_umask == -1:
+      # os.umask() returns the previous mask, so set it back right away
+      self.current_umask = os.umask(self.current_umask)
+      os.umask(self.current_umask)
+    return self.current_umask
+
+  def checkIptables(self):
+    """
+    Return True when the firewall check script prints "1" on any line.
+
+    Returns False when the script exits with a non-zero code.
+    """
+    # NOTE(review): out looks like (exit code, stdout, stderr) - confirm
+    # against run_powershell_script
+    out = run_powershell_script(CHECK_FIREWALL_SCRIPT)
+    if out[0] != 0:
+      logger.warn("Unable to check firewall status:{0}".format(out[2]))
+      return False
+    # Strip every line so that CRLF terminated output still matches "1"
+    profiles_status = [line.strip() for line in out[1].split("\n") if line.strip()]
+    return "1" in profiles_status
+
+  def register(self, dict, componentsMapped=True, commandsInProgress=True):
+    """
+    Store various details about the host in dict
+    componentsMapped: indicates if any components are mapped to this host
+    commandsInProgress: indicates if any commands are in progress
+    """
+    dict['hostHealth'] = {}
+
+    java = []
+    self.javaProcs(java)
+    dict['hostHealth']['activeJavaProcs'] = java
+
+    liveSvcs = []
+    self.checkLiveServices(self.DEFAULT_LIVE_SERVICES, liveSvcs)
+    dict['hostHealth']['liveServices'] = liveSvcs
+
+    dict['umask'] = str(self.getUMask())
+
+    dict['iptablesIsRunning'] = self.checkIptables()
+    dict['reverseLookup'] = self.checkReverseLookup()
+    # If commands are in progress or components are already mapped to this host
+    # Then do not perform certain expensive host checks
+    if componentsMapped or commandsInProgress:
+      dict['existingRepos'] = [self.RESULT_UNAVAILABLE]
+      dict['installedPackages'] = []
+      dict['alternatives'] = []
+      dict['stackFoldersAndFiles'] = []
+      dict['existingUsers'] = []
+    else:
+      existingUsers = []
+      self.checkUsers(self.DEFAULT_USERS, existingUsers)
+      dict['existingUsers'] = existingUsers
+      #TODO check HDP stack and folders here
+      self.reportFileHandler.writeHostCheckFile(dict)
+
+    # The time stamp must be recorded at the end
+    dict['hostHealth']['agentTimeStampAtReporting'] = int(time.time() * 1000)
+
+  def checkReverseLookup(self):
+    """
+    Check if host fqdn resolves to current host ip
+
+    Returns False when any of the lookups raises socket.error.
+    """
+    try:
+      host_name = socket.gethostname().lower()
+      host_ip = socket.gethostbyname(host_name)
+      host_fqdn = socket.getfqdn().lower()
+      fqdn_ip = socket.gethostbyname(host_fqdn)
+      return host_ip == fqdn_ip
+    except socket.error:
+      pass
+    return False
+
+def main(argv=None):
+  """Run the host checks with the default arguments and print the collected map."""
+  host_facts = {}
+  HostInfo().register(host_facts)
+  print(host_facts)
+
+
+# Script entry point
+if __name__ == '__main__':
+  main()
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
index 49189c8..ec01ee7 100644
--- a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
+++ b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
@@ -65,7 +65,7 @@ class LiveStatus:
status = self.DEAD_STATUS # CLIENT components can't have status STARTED
elif component in self.COMPONENTS:
statusCheck = StatusCheck(AmbariConfig.servicesToPidNames,
- AmbariConfig.pidPathesVars, self.globalConfig,
+ AmbariConfig.pidPathVars, self.globalConfig,
AmbariConfig.servicesToLinuxUser)
serviceStatus = statusCheck.getStatus(self.component)
if serviceStatus is None:
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/NetUtil.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/NetUtil.py b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
index 3ce981a..79d5343 100644
--- a/ambari-agent/src/main/python/ambari_agent/NetUtil.py
+++ b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
@@ -15,10 +15,10 @@
from urlparse import urlparse
-import time
import logging
import httplib
from ssl import SSLError
+import platform
ERROR_SSL_WRONG_VERSION = "SSLError: Failed to connect. Please check openssl library versions. \n" +\
"Refer to: https://bugzilla.redhat.com/show_bug.cgi?id=1022468 for more details."
@@ -39,6 +39,23 @@ class NetUtil:
# For testing purposes
DEBUG_STOP_RETRIES_FLAG = False
+ # Stop implementation
+ # Typically, it waits for a certain time for the daemon/service to receive the stop signal.
+ # Receives the number of seconds to wait as an argument
+ # Returns true if the application is stopping, false if continuing execution
+ stopCallback = None
+
+  def __init__(self, stop_callback=None):
+    """
+    Store the stop handler, creating the platform default when none is given.
+
+    stop_callback: object kept as self.stopCallback; when None, a
+                   HeartbeatStopHandler is imported for the current platform
+                   and instantiated
+    """
+    if stop_callback is not None:
+      self.stopCallback = stop_callback
+      return
+
+    # The handler modules are platform specific, so import only the matching one
+    if platform.system() == "Windows":
+      from HeartbeatHandlers_windows import HeartbeatStopHandler
+    else:
+      from HeartbeatStopHandler_linux import HeartbeatStopHandler
+    self.stopCallback = HeartbeatStopHandler()
+
def checkURL(self, url):
"""Try to connect to a given url. Result is True if url returns HTTP code 200, in any other case
(like unreachable server or wrong HTTP code) result will be False.
@@ -78,6 +95,7 @@ class NetUtil:
Returns count of retries
"""
+ connected = False
if logger is not None:
logger.debug("Trying to connect to %s", server_url)
@@ -85,11 +103,17 @@ class NetUtil:
while (max_retries == -1 or retries < max_retries) and not self.DEBUG_STOP_RETRIES_FLAG:
server_is_up, responseBody = self.checkURL(self.SERVER_STATUS_REQUEST.format(server_url))
if server_is_up:
+ connected = True
break
else:
if logger is not None:
logger.warn('Server at {0} is not reachable, sleeping for {1} seconds...'.format(server_url,
self.CONNECT_SERVER_RETRY_INTERVAL_SEC))
retries += 1
- time.sleep(self.CONNECT_SERVER_RETRY_INTERVAL_SEC)
- return retries
+
+ if 0 == self.stopCallback.wait(self.CONNECT_SERVER_RETRY_INTERVAL_SEC):
+ #stop waiting
+ if logger is not None:
+ logger.info("Stop event received")
+ self.DEBUG_STOP_RETRIES_FLAG = True
+ return retries, connected
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py b/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py
index 7dabe7c..062a1e7 100644
--- a/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py
+++ b/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py
@@ -77,7 +77,7 @@ class PackagesAnalyzer:
# All installed packages in systems supporting yum
def allInstalledPackages(self, allInstalledPackages):
osType = OSCheck.get_os_family()
-
+
if osType == OSConst.SUSE_FAMILY:
return self.lookUpZypperPackages(
["zypper", "search", "--installed-only", "--details"],
@@ -90,11 +90,11 @@ class PackagesAnalyzer:
elif osType == OSConst.UBUNTU_FAMILY:
return self.lookUpAptPackages(
LIST_INSTALLED_PACKAGES_UBUNTU,
- allInstalledPackages)
+ allInstalledPackages)
def allAvailablePackages(self, allAvailablePackages):
osType = OSCheck.get_os_family()
-
+
if osType == OSConst.SUSE_FAMILY:
return self.lookUpZypperPackages(
["zypper", "search", "--uninstalled-only", "--details"],
@@ -107,16 +107,16 @@ class PackagesAnalyzer:
elif osType == OSConst.UBUNTU_FAMILY:
return self.lookUpAptPackages(
LIST_AVAILABLE_PACKAGES_UBUNTU,
- allAvailablePackages)
-
- def lookUpAptPackages(self, command, allPackages):
+ allAvailablePackages)
+
+ def lookUpAptPackages(self, command, allPackages):
try:
result = self.subprocessWithTimeout(command)
if 0 == result['retCode']:
for x in result['out'].split('\n'):
if x.strip():
allPackages.append(x.split(' '))
-
+
except:
pass
http://git-wip-us.apache.org/repos/asf/ambari/blob/8de3425f/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index d504183..2a8fa5a 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -23,9 +23,10 @@ import os
import subprocess
import pprint
import threading
+import platform
from threading import Thread
import time
-from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
+from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from Grep import Grep
import shell, sys
@@ -58,7 +59,7 @@ class PythonExecutor:
tmpout = open(tmpoutfile, 'a')
tmperr = open(tmperrfile, 'a')
return tmpout, tmperr
-
+
def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
timeout, tmpstructedoutfile, logger_level, callback, task_id,
override_output_files = True, handle = None):
@@ -84,7 +85,7 @@ class PythonExecutor:
logger.info("Running command " + pprint.pformat(pythonCommand))
if(handle == None) :
tmpout, tmperr = self.open_subporcess_files(tmpoutfile, tmperrfile, override_output_files)
-
+
process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
# map task_id to pid
callback(task_id, process.pid)
@@ -100,7 +101,7 @@ class PythonExecutor:
return self.prepare_process_result(process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout)
else:
holder = Holder(pythonCommand, tmpoutfile, tmperrfile, tmpstructedoutfile, handle)
-
+
background = BackgroundThread(holder, self)
background.start()
return {"exitcode": 777}
@@ -117,7 +118,7 @@ class PythonExecutor:
result = self.condenseOutput(out, error, returncode, structured_out)
logger.info("Result: %s" % result)
return result
-
+
def read_result_from_files(self, out_path, err_path, structured_out_path):
out = open(out_path, 'r').read()
error = open(err_path, 'r').read()
@@ -134,21 +135,23 @@ class PythonExecutor:
else:
structured_out = {}
return out, error, structured_out
-
+
def launch_python_subprocess(self, command, tmpout, tmperr):
"""
Creates subprocess with given parameters. This functionality was moved to separate method
to make possible unit testing
"""
+ close_fds = None if platform.system() == "Windows" else True
return subprocess.Popen(command,
stdout=tmpout,
- stderr=tmperr, close_fds=True)
-
+ stderr=tmperr, close_fds=close_fds)
+
def isSuccessfull(self, returncode):
return not self.python_process_has_been_killed and returncode == 0
def python_command(self, script, script_params):
- python_binary = sys.executable
+ #we need manually pass python executable on windows because sys.executable will return service wrapper
+ python_binary = os.environ['PYTHON_EXE'] if 'PYTHON_EXE' in os.environ else sys.executable
python_command = [python_binary, script] + script_params
return python_command
@@ -180,31 +183,29 @@ class Holder:
self.err_file = err_file
self.structured_out_file = structured_out_file
self.handle = handle
-
+
class BackgroundThread(threading.Thread):
def __init__(self, holder, pythonExecutor):
threading.Thread.__init__(self)
self.holder = holder
self.pythonExecutor = pythonExecutor
-
+
def run(self):
process_out, process_err = self.pythonExecutor.open_subporcess_files(self.holder.out_file, self.holder.err_file, True)
-
+
logger.info("Starting process command %s" % self.holder.command)
process = self.pythonExecutor.launch_python_subprocess(self.holder.command, process_out, process_err)
-
+
logger.info("Process has been started. Pid = %s" % process.pid)
-
+
self.holder.handle.pid = process.pid
self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
self.holder.handle.on_background_command_started(self.holder.handle.command['taskId'], process.pid)
-
+
process.communicate()
-
+
self.holder.handle.exitCode = process.returncode
process_condenced_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
logger.info("Calling callback with args %s" % process_condenced_result)
self.holder.handle.on_background_command_complete_callback(process_condenced_result, self.holder.handle)
logger.info("Exiting from thread for holder pid %s" % self.holder.handle.pid)
-
-