You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2009/06/04 18:11:58 UTC
svn commit: r781819 - in /incubator/tashi/trunk: etc/ src/tashi/
src/tashi/agents/ src/tashi/clustermanager/ src/tashi/messaging/
src/tashi/nodemanager/
Author: mryan3
Date: Thu Jun 4 18:11:57 2009
New Revision: 781819
URL: http://svn.apache.org/viewvc?rev=781819&view=rev
Log:
Messaging and ganglia code cleanup
A new class implements a publishing interface (GangliaPublisher)
A new log handler uses the publishing interface (MessagingLogHandler)
Older code that explicitly used Notification and the messaging services was cleaned up
Added:
incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py
incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py
Removed:
incubator/tashi/trunk/src/tashi/ganglialoghandler.py
incubator/tashi/trunk/src/tashi/nodemanager/notification.py
Modified:
incubator/tashi/trunk/etc/TashiDefaults.cfg
incubator/tashi/trunk/src/tashi/__init__.py
incubator/tashi/trunk/src/tashi/agents/primitive.py
incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/trunk/src/tashi/messaging/__init__.py
incubator/tashi/trunk/src/tashi/nodemanager/__init__.py
incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Thu Jun 4 18:11:57 2009
@@ -20,6 +20,7 @@
service = tashi.clustermanager.ClusterManagerService
data = tashi.clustermanager.data.GetentOverride
dfs = tashi.dfs.Vfs
+publisher = tashi.messaging.GangliaPublisher
nodeManagerPort = 9883
[ClusterManagerService]
@@ -76,6 +77,7 @@
infoFile = /var/tmp/nm.dat
clusterManagerHost = localhost
clusterManagerPort = 9882
+statsInterval = 0.0
;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Qemu]
@@ -115,6 +117,7 @@
hook1 = tashi.agents.DhcpDns
scheduleDelay = 2.0
densePack = False
+publisher = tashi.messaging.GangliaPublisher
[DhcpDns]
dnsKeyFile = /location/of/private/key/for/dns
@@ -127,13 +130,17 @@
ipRange1 = 172.16.128.2-172.16.255.254
reverseDns = True
+[GangliaPublisher]
+dmax = 60
+retry = 3600
+
# Logging stuff
-# Switch the "keys" and "handlers" variables below to output log data to ganglia metrics
+# Switch the "keys" and "handlers" variables below to output log data to the publisher
[loggers]
keys = root
[handlers]
-#keys = consoleHandler,gangliaHandler,fileHandler
+#keys = consoleHandler,publisherHandler,fileHandler
keys = consoleHandler
[formatters]
@@ -141,7 +148,7 @@
[logger_root]
level = DEBUG
-#handlers = consoleHandler,gangliaHandler,fileHandler
+#handlers = consoleHandler,publisherHandler,fileHandler
handlers = consoleHandler
propagate = 1
@@ -151,11 +158,11 @@
formatter = standardFormatter
args = (sys.stdout,)
-[handler_gangliaHandler]
-class = tashi.GangliaLogHandler
+[handler_publisherHandler]
+class = tashi.messaging.MessagingLogHandler
level = NOTSET
formatter = standardFormatter
-args = (60, 3600)
+args = ()
[handler_fileHandler]
class = FileHandler
Modified: incubator/tashi/trunk/src/tashi/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/__init__.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/__init__.py (original)
+++ incubator/tashi/trunk/src/tashi/__init__.py Thu Jun 4 18:11:57 2009
@@ -18,4 +18,3 @@
from util import *
from connectionmanager import ConnectionManager
from version import version
-from ganglialoghandler import GangliaLogHandler
Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Thu Jun 4 18:11:57 2009
@@ -27,6 +27,7 @@
from tashi.services.ttypes import *
from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+import tashi
class Primitive(object):
def __init__(self, config, client, transport):
@@ -134,6 +135,8 @@
def main():
(config, configFiles) = getConfig(["Agent"])
+ publisher = instantiateImplementation(config.get("Primitive", "publisher"), config)
+ tashi.publisher = publisher
(client, transport) = createClient(config)
logging.config.fileConfig(configFiles)
agent = Primitive(config, client, transport)
Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py Thu Jun 4 18:11:57 2009
@@ -27,20 +27,13 @@
from thrift.transport.TSocket import TServerSocket
from thrift.server.TServer import TThreadedServer
-from tashi.messaging.thriftmessaging import MessageBrokerThrift
-from tashi.messaging.tashimessaging import TashiLogHandler
from tashi.services import clustermanagerservice
from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
+import tashi
def startClusterManager(config):
global service, data
- # start the event broker
- broker = MessageBrokerThrift(int(config.get('MessageBroker', 'port')))
- broker.ready.wait()
- messageHandler = TashiLogHandler(config)
- log.addHandler(messageHandler)
-
dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
data = instantiateImplementation(config.get("ClusterManager", "data"), config)
service = instantiateImplementation(config.get("ClusterManager", "service"), config, data, dfs)
@@ -65,6 +58,8 @@
# setup configuration and logging
(config, configFiles) = getConfig(["ClusterManager"])
+ publisher = instantiateImplementation(config.get("ClusterManager", "publisher"), config)
+ tashi.publisher = publisher
logging.config.fileConfig(configFiles)
log = logging.getLogger(__file__)
log.info('Using configuration file(s) %s' % configFiles)
Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Thu Jun 4 18:11:57 2009
@@ -25,8 +25,6 @@
import threading
import time
-from tashi.messaging.thriftmessaging import MessageBrokerThrift
-from tashi.messaging.tashimessaging import TashiLogHandler
from tashi.services.ttypes import Errors, InstanceState, HostState, TashiException
from tashi.services import nodemanagerservice
from tashi import boolean, convertExceptions, ConnectionManager, vmStates, timed, version, scrubString
@@ -44,8 +42,6 @@
self.proxy = ConnectionManager(nodemanagerservice.Client, int(self.config.get('ClusterManager', 'nodeManagerPort')))
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
- self.messageHandler = TashiLogHandler(config)
- self.log.addHandler(self.messageHandler)
self.lastContacted = {}
self.decayedHosts = {}
self.decayedInstances = {}
Modified: incubator/tashi/trunk/src/tashi/messaging/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/messaging/__init__.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/messaging/__init__.py (original)
+++ incubator/tashi/trunk/src/tashi/messaging/__init__.py Thu Jun 4 18:11:57 2009
@@ -15,3 +15,5 @@
# specific language governing permissions and limitations
# under the License.
+from messagingloghandler import MessagingLogHandler
+from gangliapublisher import GangliaPublisher
Added: incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py?rev=781819&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py (added)
+++ incubator/tashi/trunk/src/tashi/messaging/gangliapublisher.py Thu Jun 4 18:11:57 2009
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import time
+import types
+
+from tashi import scrubString
+
+class GangliaPublisher(object):
+    """Publish key/value messages as Ganglia metrics through ``gmetric``.
+
+    Every pair in a published message becomes one ``gmetric`` invocation
+    with the metric name ``tashi_<key>``.  If ``gmetric`` produces any
+    output, that is treated as a failure: the publisher disables itself
+    and silently drops messages until ``retry`` seconds have elapsed, at
+    which point it tries again.
+    """
+
+    def __init__(self, config):
+        """Read the ``[GangliaPublisher]`` settings.
+
+        config -- object whose ``get(section, option)`` returns the option
+                  text; ``retry`` and ``dmax`` must both parse as floats.
+        """
+        self.disable = False    # True while publishing is suspended
+        self.disableAt = 0.0    # time.time() of the failure that suspended it
+        # Seconds to stay suspended before trying gmetric again.
+        self.retry = float(config.get("GangliaPublisher", "retry"))
+        # Handed to gmetric as its -d argument.
+        self.dmax = float(config.get("GangliaPublisher", "dmax"))
+
+    def _runGmetric(self, args):
+        """Run ``gmetric`` with the argument list ``args``; return its output.
+
+        stdout and stderr are combined.  An empty string means success.
+        The command is executed without a shell so that metric values
+        (for example arbitrary log text) cannot be interpreted as shell
+        syntax.  A failure to start the program is reported as non-empty
+        output instead of raising, so the caller's disable logic applies.
+        """
+        import subprocess
+        try:
+            proc = subprocess.Popen(["gmetric"] + args,
+                                    stdin=subprocess.PIPE,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.STDOUT)
+        except OSError, e:
+            return str(e) or "unable to execute gmetric"
+        # communicate() closes stdin and reads the output until EOF.
+        (output, unused) = proc.communicate()
+        return output
+
+    def publish(self, message):
+        """Send each (key, value) pair of the dict ``message`` to gmetric.
+
+        Values that parse as a float are sent with type "float", formatted
+        with three decimals; everything else is sent with type "string".
+        While the publisher is disabled the remaining pairs are dropped.
+        Returns None.
+        """
+        for (key, val) in message.iteritems():
+            if (self.disable):
+                if (time.time() - self.disableAt > self.retry):
+                    # Must update the instance attribute; assigning a
+                    # local here would leave the publisher disabled
+                    # forever after the first failure.
+                    self.disable = False
+                else:
+                    return
+            # scrubString comes from the tashi package (not shown here);
+            # presumably it reduces the key to characters that are safe
+            # in a metric name -- TODO confirm.
+            key = scrubString(str(key))
+            val = str(val)
+            metricName = "tashi_%s" % (key)
+            # Keep quotes and angle brackets out of the value.
+            val = val.replace('"', "'")
+            val = val.replace('<', '&lt;')
+            val = val.replace('>', '&gt;')
+            metricValue = val
+            metricType = "string"
+            try:
+                metricValue = "%3.3f" % (float(val))
+                metricType = "float"
+            except ValueError:
+                # Not numeric: publish it as a string metric.
+                pass
+            res = self._runGmetric(["-n", metricName,
+                                    "-v", metricValue,
+                                    "-t", metricType,
+                                    "-d", "%d" % (self.dmax)])
+            if (res != ""):
+                self.disable = True
+                self.disableAt = time.time()
+                print "Failed to exec gmetric, disabling: %s" % (res)
+
+    def publishList(self, messages):
+        """Publish every message dict in the iterable ``messages`` in order."""
+        for message in messages:
+            self.publish(message)
Added: incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py?rev=781819&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py (added)
+++ incubator/tashi/trunk/src/tashi/messaging/messagingloghandler.py Thu Jun 4 18:11:57 2009
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os.path
+import sys
+import time
+
+import tashi
+
+# Expose the tashi package as an attribute of the logging module.
+# NOTE(review): presumably this lets logging.config.fileConfig, which
+# resolves a handler's "class" entry inside the logging package namespace,
+# find "tashi.messaging.MessagingLogHandler" -- confirm.
+logging.tashi = tashi
+
+class MessagingLogHandler(logging.Handler):
+    """Logging handler that forwards formatted records to ``tashi.publisher``.
+
+    Each record is published as a one-entry dict whose key has the form
+    ``log_<program>_<sequence number>_<milliseconds since the epoch>``.
+    """
+
+    def __init__(self):
+        """Initialise the base handler and the per-process message counter."""
+        logging.Handler.__init__(self)
+        self.msgIndex = 0
+        # Base name of the running script, used to tag every key.
+        self.name = os.path.basename(sys.argv[0])
+
+    def emit(self, record):
+        """Format ``record`` and publish it; print any error instead of raising."""
+        try:
+            millis = int(time.time()*1000)
+            text = self.format(record)
+            key = "log_%s_%d_%d" % (self.name, self.msgIndex, millis)
+            tashi.publisher.publish({key: text})
+            # Only count messages that were handed to the publisher.
+            self.msgIndex += 1
+        except Exception, e:
+            print e
Modified: incubator/tashi/trunk/src/tashi/nodemanager/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/__init__.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/__init__.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/__init__.py Thu Jun 4 18:11:57 2009
@@ -21,4 +21,3 @@
return convertExceptions(oldFunc)
from nodemanagerservice import NodeManagerService
-from notification import Notifier
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=781819&r1=781818&r2=781819&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Thu Jun 4 18:11:57 2009
@@ -26,8 +26,7 @@
from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
from tashi.services import nodemanagerservice, clustermanagerservice
from tashi import ConnectionManager
-
-import notification
+import tashi
@signalHandler(signal.SIGTERM)
def handleSIGTERM(signalNumber, stackFrame):
@@ -37,6 +36,8 @@
global config, dfs, vmm, service, server, log, notifier
(config, configFiles) = getConfig(["NodeManager"])
+ publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
+ tashi.publisher = publisher
logging.config.fileConfig(configFiles)
log = logging.getLogger(__name__)
log.info('Using configuration file(s) %s' % configFiles)
@@ -49,9 +50,6 @@
server = TThreadedServer(processor, transport)
debugConsole(globals())
- notifier = notification.Notifier(config)
- log.addHandler(notifier)
-
try:
server.serve()
except KeyboardInterrupt: