You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2009/06/04 18:11:58 UTC

svn commit: r781819 - in /incubator/tashi/trunk: etc/ src/tashi/ src/tashi/agents/ src/tashi/clustermanager/ src/tashi/messaging/ src/tashi/nodemanager/

Author: mryan3
Date: Thu Jun  4 18:11:57 2009
New Revision: 781819

URL: http://svn.apache.org/viewvc?rev=781819&view=rev
Log:
Messaging and ganglia code cleanup

A new class implements a publishing interface (GangliaPublisher)
A new log handler uses the publishing interface (MessagingLogHandler)
Older code that explicitly used Notification and the messaging services was cleaned up


Added:
    incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py
    incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py
Removed:
    incubator/tashi/trunk/src/tashi/ganglialoghandler.py
    incubator/tashi/trunk/src/tashi/nodemanager/notification.py
Modified:
    incubator/tashi/trunk/etc/TashiDefaults.cfg
    incubator/tashi/trunk/src/tashi/__init__.py
    incubator/tashi/trunk/src/tashi/agents/primitive.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/messaging/__init__.py
    incubator/tashi/trunk/src/tashi/nodemanager/__init__.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py

Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Thu Jun  4 18:11:57 2009
@@ -20,6 +20,7 @@
 service = tashi.clustermanager.ClusterManagerService
 data = tashi.clustermanager.data.GetentOverride
 dfs = tashi.dfs.Vfs
+publisher = tashi.messaging.GangliaPublisher
 nodeManagerPort = 9883
 
 [ClusterManagerService]
@@ -76,6 +77,7 @@
 infoFile = /var/tmp/nm.dat
 clusterManagerHost = localhost
 clusterManagerPort = 9882
+statsInterval = 0.0
 ;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [Qemu]
@@ -115,6 +117,7 @@
 hook1 = tashi.agents.DhcpDns
 scheduleDelay = 2.0
 densePack = False
+publisher = tashi.messaging.GangliaPublisher
 
 [DhcpDns]
 dnsKeyFile = /location/of/private/key/for/dns
@@ -127,13 +130,17 @@
 ipRange1 = 172.16.128.2-172.16.255.254
 reverseDns = True
 
+[GangliaPublisher]
+dmax = 60
+retry = 3600
+
 # Logging stuff
-# Switch the "keys" and "handlers" variables below to output log data to ganglia metrics
+# Switch the "keys" and "handlers" variables below to output log data to the publisher
 [loggers]
 keys = root	
 
 [handlers]
-#keys = consoleHandler,gangliaHandler,fileHandler
+#keys = consoleHandler,publisherHandler,fileHandler
 keys = consoleHandler
 
 [formatters]
@@ -141,7 +148,7 @@
 
 [logger_root]
 level = DEBUG
-#handlers = consoleHandler,gangliaHandler,fileHandler
+#handlers = consoleHandler,publisherHandler,fileHandler
 handlers = consoleHandler
 propagate = 1
 	
@@ -151,11 +158,11 @@
 formatter = standardFormatter
 args = (sys.stdout,)
 
-[handler_gangliaHandler]
-class = tashi.GangliaLogHandler
+[handler_publisherHandler]
+class = tashi.messaging.MessagingLogHandler
 level = NOTSET
 formatter = standardFormatter
-args = (60, 3600)
+args = ()
 
 [handler_fileHandler]
 class = FileHandler

Modified: incubator/tashi/trunk/src/tashi/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/__init__.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/__init__.py (original)
+++ incubator/tashi/trunk/src/tashi/__init__.py Thu Jun  4 18:11:57 2009
@@ -18,4 +18,3 @@
 from util import *
 from connectionmanager import ConnectionManager
 from version import version
-from ganglialoghandler import GangliaLogHandler

Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Thu Jun  4 18:11:57 2009
@@ -27,6 +27,7 @@
 
 from tashi.services.ttypes import *
 from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+import tashi
 
 class Primitive(object):
 	def __init__(self, config, client, transport):
@@ -134,6 +135,8 @@
 
 def main():
 	(config, configFiles) = getConfig(["Agent"])
+	publisher = instantiateImplementation(config.get("Primitive", "publisher"), config)
+	tashi.publisher = publisher
 	(client, transport) = createClient(config)
 	logging.config.fileConfig(configFiles)
 	agent = Primitive(config, client, transport)

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py Thu Jun  4 18:11:57 2009
@@ -27,20 +27,13 @@
 from thrift.transport.TSocket import TServerSocket
 from thrift.server.TServer import TThreadedServer
 
-from tashi.messaging.thriftmessaging import MessageBrokerThrift
-from tashi.messaging.tashimessaging import TashiLogHandler
 from tashi.services import clustermanagerservice
 from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
+import tashi
 
 def startClusterManager(config):
 	global service, data
 	
-	# start the event broker
-	broker = MessageBrokerThrift(int(config.get('MessageBroker', 'port')))
-	broker.ready.wait()
-	messageHandler = TashiLogHandler(config)
-	log.addHandler(messageHandler)
-
 	dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
 	data = instantiateImplementation(config.get("ClusterManager", "data"), config)
 	service = instantiateImplementation(config.get("ClusterManager", "service"), config, data, dfs)
@@ -65,6 +58,8 @@
 	
 	# setup configuration and logging
 	(config, configFiles) = getConfig(["ClusterManager"])
+	publisher = instantiateImplementation(config.get("ClusterManager", "publisher"), config)
+	tashi.publisher = publisher
 	logging.config.fileConfig(configFiles)
 	log = logging.getLogger(__file__)
 	log.info('Using configuration file(s) %s' % configFiles)

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Thu Jun  4 18:11:57 2009
@@ -25,8 +25,6 @@
 import threading
 import time
 
-from tashi.messaging.thriftmessaging import MessageBrokerThrift
-from tashi.messaging.tashimessaging import TashiLogHandler
 from tashi.services.ttypes import Errors, InstanceState, HostState, TashiException
 from tashi.services import nodemanagerservice
 from tashi import boolean, convertExceptions, ConnectionManager, vmStates, timed, version, scrubString
@@ -44,8 +42,6 @@
 		self.proxy = ConnectionManager(nodemanagerservice.Client, int(self.config.get('ClusterManager', 'nodeManagerPort')))
 		self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
 		self.log = logging.getLogger(__name__)
-		self.messageHandler = TashiLogHandler(config)
-		self.log.addHandler(self.messageHandler)
 		self.lastContacted = {}
 		self.decayedHosts = {}
 		self.decayedInstances = {}

Modified: incubator/tashi/trunk/src/tashi/messaging/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/messaging/__init__.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/messaging/__init__.py (original)
+++ incubator/tashi/trunk/src/tashi/messaging/__init__.py Thu Jun  4 18:11:57 2009
@@ -15,3 +15,5 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+from messagingloghandler import MessagingLogHandler
+from gangliapublisher import GangliaPublisher

Added: incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py?rev=781819&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py (added)
+++ incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py Thu Jun  4 18:11:57 2009
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
+import os
+import time
+import types
+
+from tashi import scrubString
+
+class GangliaPublisher(object):
+	def __init__(self, config):
+		self.disable = False
+		self.disableAt = 0.0
+		self.retry = float(config.get("GangliaPublisher", "retry"))
+		self.dmax = float(config.get("GangliaPublisher", "dmax"))
+
+	def publish(self, message):
+		for (key, val) in message.iteritems():
+			if (self.disable):
+				if (time.time() - self.disableAt > self.retry):
+					disable = False
+				else:
+					return
+			key = scrubString(str(key))
+			val = str(val)
+			metricName = "tashi_%s" % (key)
+			val = val.replace('"', "'")
+			val = val.replace('<', '&lt;')
+			val = val.replace('>', '&gt;')
+			metricValue = val
+			metricType = "string"
+			try:
+				metricValue = float(metricValue)
+				metricType = "float"
+				metricValue = "%3.3f" % (metricValue)
+			except:
+				pass
+			cmd = 'gmetric -n "%s" -v "%s" -t "%s" -d "%d"' % (metricName, metricValue, metricType, self.dmax)
+			(stdin, stdout) = os.popen4(cmd)
+			stdin.close()
+			res = stdout.read()
+			stdout.close()
+			if (res != ""):
+				self.disable = True
+				self.disableAt = time.time()
+				print "Failed to exec gmetric, disabling: %s" % (res)
+	
+	def publishList(self, messages):
+		for message in messages:
+			self.publish(message)

Added: incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py?rev=781819&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py (added)
+++ incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py Thu Jun  4 18:11:57 2009
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
+import logging
+import os.path
+import sys
+import time
+
+import tashi
+
+logging.tashi = tashi
+
+class MessagingLogHandler(logging.Handler):
+	def __init__(self):
+		logging.Handler.__init__(self)
+		self.name = os.path.basename(sys.argv[0])
+		self.msgIndex = 0
+	
+	def emit(self, record):
+		try:
+			key = "log_%s_%d_%d" % (self.name, self.msgIndex, int(time.time()*1000))
+			val = self.format(record)
+			tashi.publisher.publish({key:val})
+			self.msgIndex = self.msgIndex + 1
+		except Exception, e:
+			print e

Modified: incubator/tashi/trunk/src/tashi/nodemanager/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/__init__.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/__init__.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/__init__.py Thu Jun  4 18:11:57 2009
@@ -21,4 +21,3 @@
 	return convertExceptions(oldFunc)
 
 from nodemanagerservice import NodeManagerService
-from notification import Notifier

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Thu Jun  4 18:11:57 2009
@@ -26,8 +26,7 @@
 from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
 from tashi.services import nodemanagerservice, clustermanagerservice
 from tashi import ConnectionManager
-
-import notification
+import tashi
 
 @signalHandler(signal.SIGTERM)
 def handleSIGTERM(signalNumber, stackFrame):
@@ -37,6 +36,8 @@
 	global config, dfs, vmm, service, server, log, notifier
 	
 	(config, configFiles) = getConfig(["NodeManager"])
+	publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
+	tashi.publisher = publisher
 	logging.config.fileConfig(configFiles)
 	log = logging.getLogger(__name__)
 	log.info('Using configuration file(s) %s' % configFiles)
@@ -49,9 +50,6 @@
 	server = TThreadedServer(processor, transport)
 	debugConsole(globals())
 	
-	notifier = notification.Notifier(config)
-	log.addHandler(notifier)
-
 	try:
 		server.serve()
 	except KeyboardInterrupt: