You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2009/07/08 20:15:37 UTC

svn commit: r792287 - in /incubator/tashi/trunk: etc/TashiDefaults.cfg src/tashi/agents/mauipacket.py src/tashi/agents/mauiwiki.py src/tashi/agents/primitive.py

Author: mryan3
Date: Wed Jul  8 20:15:36 2009
New Revision: 792287

URL: http://svn.apache.org/viewvc?rev=792287&view=rev
Log:
Jim Cipar's Maui patches with modifications:
1. The tashi.publisher object is initialized in mauiwiki.py
2. The "publisher" config option was moved into the "Agent" section of the file to avoid duplication
3. Indentation cleanups

This patch adds a new agent, mauiwiki, that allows the Maui cluster
scheduler to use Tashi as a resource manager. It translates between
the Tashi cluster manager interface and the Maui wiki interface
( http://www.clusterresources.com/products/maui/docs/wiki/ ).

An earlier version of this patch had a bug in the way checksums were converted
to strings: leading 0s were truncated.  The mauipacket.py added here is the fixed version.


Added:
    incubator/tashi/trunk/src/tashi/agents/mauipacket.py
    incubator/tashi/trunk/src/tashi/agents/mauiwiki.py   (with props)
Modified:
    incubator/tashi/trunk/etc/TashiDefaults.cfg
    incubator/tashi/trunk/src/tashi/agents/primitive.py

Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=792287&r1=792286&r2=792287&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Wed Jul  8 20:15:36 2009
@@ -113,11 +113,20 @@
 clusterManagerTimeout = 5.0
 
 # Agent portion
+[Agent]
+publisher = tashi.messaging.GangliaPublisher
+
 [Primitive]
 hook1 = tashi.agents.DhcpDns
 scheduleDelay = 2.0
 densePack = False
-publisher = tashi.messaging.GangliaPublisher
+
+[MauiWiki]
+hook1 = tashi.agents.DhcpDns
+refreshTime = 5
+authuser = changeme
+authkey = 1111
+defaultJobTime = 8640000000
 
 [DhcpDns]
 dnsKeyFile = /location/of/private/key/for/dns

Added: incubator/tashi/trunk/src/tashi/agents/mauipacket.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/mauipacket.py?rev=792287&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/mauipacket.py (added)
+++ incubator/tashi/trunk/src/tashi/agents/mauipacket.py Wed Jul  8 20:15:36 2009
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import subprocess
+import time
+import pseudoDes
+
+class MauiPacket:
+	def __init__(self, key=0):
+		self.size = 0
+		self.char = '\n'
+		self.chksum = '0'*16
+		self.timestamp = int(time.time())
+		self.auth = ''
+		self.data = []
+		self.msg = ''
+		self.key=key
+	def readPacket(self, istream):
+		self.msg = ''
+
+		size = istream.read(8)
+		self.msg = self.msg+size
+		self.size = int(size)
+
+		self.char = istream.read(1)
+		self.msg = self.msg + self.char
+
+		packet = istream.read(self.size)
+		self.msg = self.msg + packet
+
+		packet = packet.split()
+		
+		for i in range(len(packet)):
+			item = packet[i].split('=')
+			if item[0] == 'CK':
+				self.chksum = item[1]
+			if item[0] == 'TS':
+				self.timestamp = int(item[1])
+			if item[0] == 'AUTH':
+				self.auth = item[1]
+			if item[0] == 'DT':
+				self.data = packet[i:]
+				self.data=self.data[0].split('=',1)[1:] + self.data[1:]
+
+	def checksumMessage(self, message, key=None):
+		if key == None:
+			key = self.key
+		if type(key) == type(''):
+			key = int(key)
+		chksum = pseudoDes.generateKey(message, key)
+		chksum = '%016x' % chksum
+		return chksum
+	def getChecksum(self):
+		cs = self.msg.partition('TS=')
+		cs = cs[1]+cs[2]
+		chksum = self.checksumMessage(cs)
+		return chksum
+	def verifyChecksum(self):
+		chksum = self.getChecksum()
+		if chksum != self.chksum:
+			print 'verifyChecksum: "%s"\t"%s"'%(chksum, self.chksum)
+			print 'verifyChecksum (types): %s\t%s' %(type(chksum), type(self.chksum))
+			return False
+		return True
+	def set(self, data, auth=None, key=None, timestamp=None):
+		if timestamp==None:
+			timestamp = int(time.time())
+		self.data = data
+		if auth !=None:
+			self.auth = auth
+		if key != None:
+			self.key = key
+		self.timstamp=timestamp
+		self.fixup()
+	def fixup(self):
+		datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+		self.chksum = self.checksumMessage(datastring)
+
+		pktstring = 'CK=%s %s'%(self.chksum, datastring)
+		self.size = len(pktstring)
+	def __str__(self):
+		datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+		self.chksum = self.checksumMessage(datastring)
+
+		pktstring = 'CK=%s %s'%(self.chksum, datastring)
+		self.msg = ''
+		self.msg = self.msg + '%08i'%len(pktstring)
+		self.msg = self.msg + self.char
+		self.msg = self.msg + pktstring
+
+		return self.msg
+	def prettyString(self):
+		s = '''Maui Packet
+-----------
+size:\t\t%i
+checksum:\t%s
+timestamp:\t%s
+auth:\t\t%s
+data:
+%s
+-----------'''
+		s = s%(self.size, self.chksum, self.timestamp, self.auth, self.data)
+		return s

Added: incubator/tashi/trunk/src/tashi/agents/mauiwiki.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/mauiwiki.py?rev=792287&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/mauiwiki.py (added)
+++ incubator/tashi/trunk/src/tashi/agents/mauiwiki.py Wed Jul  8 20:15:36 2009
@@ -0,0 +1,467 @@
+#! /usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+import hashlib
+import sys
+import subprocess
+import socket, SocketServer
+from socket import gethostname
+import os
+import threading
+import logging.config
+
+from tashi.parallel import synchronizedmethod
+from tashi.services.ttypes import *
+from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+from tashi.agents.mauipacket import MauiPacket
+import tashi.util
+
+def jobnameToId(jobname):
+	return int(jobname.split('.')[-1])
+
+class InstanceHooks():
+	def __init__(self, config):
+		self.log = logging.getLogger(__file__)
+		self.hooks=[]
+		items = config.items("MauiWiki")
+		items.sort()
+		for item in items:
+			(name, value) = item
+			name = name.lower()
+			if (name.startswith("hook")):
+				try:
+					self.hooks.append(instantiateImplementation(value, config, client, transport, False))
+				except:
+					self.log.exception("Failed to load hook %s" % (value))
+		(self.client, self.transport) = createClient(config)
+	def preCreate(self, inst):
+		for hook in self.hooks:
+			hook.preCreate(inst)
+	def postDestroy(self, inst):
+		for hook in self.hooks:
+			hook.postDestroy(inst)
+	def idToInst(self, id):
+		instances = self.client.getInstances()
+		print 'instances ', instances
+		insts = [i for i in instances if str(i.id)==str(id)]
+		if len(insts) == 0:
+			raise "No instance with ID %s"%id
+		if len(insts) > 1:
+			raise "Multiple instances with ID %s"%id
+		inst = insts[0]
+		return inst
+	def destroyById(self, id):
+		inst = self.idToInst(id)
+		self.client.destroyVm(int(id))
+		self.postDestroy(inst)
+	def activateById(self, id, host):
+		inst = self.idToInst(id)
+		self.preCreate(inst)
+		self.client.activateVm(int(id), host)
+
+def cmplists(a, b):
+	for i in range(len(a)):
+		if a[i] < b[i]:
+			return -1
+		if a[i] > b[i]:
+			return 1
+	return 0
+
+class TashiConnection(threading.Thread):
+	def __init__(self, config, client, transport):
+		(self.client, self.transport) = createClient(config)
+
+		self.hosts={}
+		self.instances={}
+		self.users={}
+
+		self.config = config
+		self.ihooks = InstanceHooks(config)
+		self.log = logging.getLogger(__file__)
+		self.refreshTime = float(self.config.get('MauiWiki', 'refreshTime'))
+		self.defaultJobTime = str(self.config.get('MauiWiki', 'defaultJobTime'))
+		threading.Thread.__init__(self)
+		self.daemon = True
+	def run(self):
+		while True:
+			print 'TashiConnection:run updating hosts ...'
+			self.updateHosts()
+			print 'TashiConnection:run updating instances ...'
+			self.updateInstances()
+			print 'TashiConnection:run updating users ...'
+			self.updateUsers()
+			time.sleep(self.refreshTime)
+	def wikiHostState(self, host):
+		'''Returns a string representing the host state in a form compatable
+		with the maui-wiki protocol.  This code simply chooses between
+		"Down" and "Unknown":
+
+		- Busy: Node is running some jobs and will not accept additional jobs
+		- Down: Resource Manager problems have been detected.  Node is
+		incapable of running jobs.
+		- Draining: Node is responding but will not accept new jobs
+		- Idle: Node is ready to run jobs but currently is not running any.
+		Running: Node is running some jobs and will accept additional jobs
+		- Unknown: Node is capable of running jobs but the scheduler
+		will need to determine if the node state is actually Idle,
+		Running, or Busy.'''
+		if host.up == False or host.state == HostState.VersionMismatch:
+			return "Down"
+		if host.state == HostState.Drained:
+			return "Draining"
+		return "Unknown"
+
+	def wikiInstanceState(self, instance):
+		'''Returns a string representing the instance stat in a form compatable
+		with the maui-wiki protocol.
+
+		Completed: Job has completed
+		Hold: Job is in the queue but is not allowed to run
+		Idle: Job is ready to run
+		Removed: Job has been canceled or otherwise terminated externally
+		Running: Job is currently executing
+		Suspended: job has started but execution has temporarily been suspended'''
+		tashiToWiki = {InstanceState.Pending:'Idle',
+		               InstanceState.Held:'Hold',
+		               InstanceState.Exited:'Removed'}
+		if tashiToWiki.has_key(instance.state):
+			return tashiToWiki[instance.state]
+		else:
+			return 'Running'
+
+	# Host handling
+	def compareHosts(self, host1, host2):
+		def ii(host):
+			try:
+				state = tashi.util.hostStates[host.state]
+			except:
+				state = 'Unknown'
+			return [host.id, host.name, host.up, state, host.memory, host.cores]
+		return cmplists(ii(host1), ii(host2))
+#	 @synchronizedmethod
+	def updateHost(self, host):
+		self.hosts[host.id] = host
+		self.hosts[host.id].updateTime = time.time()
+#	 @synchronizedmethod
+	def addHost(self, host):
+		self.hosts[host.id] = host
+		self.hosts[host.id].updateTime = time.time()
+#	 @synchronizedmethod
+	def removeHost(self, host):
+		self.hosts.pop(host.id)
+#	 @synchronizedmethod
+	def updateHosts(self):
+		if (not self.transport.isOpen):
+			self.transport.open()
+		hosts = self.client.getHosts()
+		for host in hosts:
+			if not self.hosts.has_key(host.id):
+				self.addHost(host)
+			elif self.compareHosts(self.hosts[host.id], host) != 0:
+				self.updateHost(host)
+		hhosts = {}
+		for host in hosts:
+			hhosts[host.id] = host
+		for host in self.hosts.values():
+			if not hhosts.has_key(host.id):
+				self.removeHost(host)
+	# Instance handling
+	def compareInstances(self, instance1, instance2):
+		def ii(inst):
+			return [inst.id,
+			        inst.vmId,
+			        inst.hostId,
+			        tashi.util.vmStates[inst.state],
+			        inst.userId,
+			        inst.name,
+			        inst.cores,
+			        inst.memory,
+			        len(inst.disks), # FIXME: this isn't a good way to compare
+			        len(inst.nics),   # FIXME: this isn't a good way to compare
+			        len(inst.hints)]
+		return cmplists(ii(instance1), ii(instance2))
+		return 0
+	@synchronizedmethod
+	def updateInstance(self, instance):
+		qt = self.instances[instance.id].queueTime
+		self.instances[instance.id] = instance
+		self.instances[instance.id].updateTime = time.time()
+		self.instances[instance.id].queueTime = qt
+	@synchronizedmethod
+	def addInstance(self, instance):
+		self.instances[instance.id] = instance
+		self.instances[instance.id].updateTime = time.time()
+		self.instances[instance.id].queueTime = time.time()
+	@synchronizedmethod
+	def removeInstance(self, instance):
+		self.instances[instance.id].state = InstanceState.Exited
+		self.ihooks.postDestroy(instance)
+	@synchronizedmethod
+	def updateInstances(self):
+		if (not self.transport.isOpen):
+			self.transport.open()
+		instances = self.client.getInstances()
+		for instance in instances:
+			print 'found instance', instance.id
+			if not self.instances.has_key(instance.id):
+				print "it's a new instance"
+				self.addInstance(instance)
+			elif self.compareInstances(self.instances[instance.id], instance) != 0:
+				self.updateInstance(instance)
+		iinsts = {}
+		for instance in instances:
+			iinsts[instance.id] = instance
+		for instance in self.instances.values():
+			if instance.state == InstanceState.Exited:
+				continue
+			if not iinsts.has_key(instance.id):
+				print 'removing instance ', instance.id
+				self.removeInstance(instance)
+	# User handling
+	def compareUsers(self, user1, user2):
+		if user1.id < user2.id:
+			return -1
+		elif user1.id > user2.id:
+			return 1
+		if user1.name < user2.name:
+			return -1
+		if user1.name > user2.name:
+			return 1
+		return 0
+	@synchronizedmethod
+	def updateUser(self, user):
+		self.users[user.id] = user
+		self.users[user.id].updatetime = time.time()
+	@synchronizedmethod
+	def addUser(self, user):
+		self.users[user.id] = user
+		self.users[user.id].updatetime = time.time()
+	@synchronizedmethod
+	def removeUser(self, user):
+		self.users.pop(user.id)
+	@synchronizedmethod
+	def updateUsers(self):
+		if (not self.transport.isOpen):
+			self.transport.open()
+		users = self.client.getUsers()
+		for user in users:
+			if not self.users.has_key(user.id):
+				self.addUser(user)
+			elif self.compareUsers(self.users[user.id], user) != 0:
+				self.updateUser(user)
+		uusers = {}
+		for user in users:
+			uusers[user.id] = user
+		for user in self.users.values():
+			if not uusers.has_key(user.id):
+				self.removeUser(user)
+	# Get data structures for maui
+	# Format is {id:{field:value}}
+	@synchronizedmethod
+	def getNodes(self, updatetime=0, nodelist=['ALL']):
+		if len(nodelist) == 0:
+			return {}
+		if nodelist[0]=='ALL':
+			nodes = [n for n in self.hosts.values() if n.updateTime >= updatetime]
+		else:
+			nodes = [n for n in self.hosts.values()
+					 if n.updateTime >= updatetime and n.name in nodelist]
+		nl = {}
+		for node in nodes:
+			nl[node.name] = {'STATE':self.wikiHostState(node),
+			                 'UPDATETIME':str(int(node.updateTime)),
+			                 'CPROC':str(node.cores),
+			                 'CMEMORY':str(node.memory)}
+		return nl
+	@synchronizedmethod
+	def getJobs(self, updatetime=0, joblist=['ALL']):
+		if len(joblist) == 0:
+			return {}
+		if joblist[0] == 'ALL':
+			jobs = [j for j in self.instances.values() if j.updateTime >= updatetime]
+		else:
+			jobs = [j for j in self.instances.values()
+					if j.updateTime >= updatetime and j.id in joblist]
+		jl = {}
+		for job in jobs:
+			id = "%s.%i"%(job.name, job.id)
+			jl[id] = {'STATE':self.wikiInstanceState(job),
+			          'UNAME':self.users[job.userId].name,
+			          'GNAME':self.users[job.userId].name,
+			          'UPDATETIME':int(job.updateTime),
+			          'QUEUETIME':job.queueTime,
+			          'TASKS':'1',
+			          'DPROCS':str(job.cores),
+			          'DMEM':str(job.memory),
+			          'RMEM':str(job.memory),
+			          'WCLIMIT':str(self.defaultJobTime)}
+			if job.hostId != None:
+				jl[id]['TASKLIST'] = self.hosts[job.hostId].name
+		return jl
+	@synchronizedmethod
+	def activateById(self, id, host):
+		if not self.instances.has_key(id):
+			raise "no such instance"
+		self.ihooks.activateById(id, host)
+		self.instances[id].state=InstanceState.Activating
+
+class MauiListener(SocketServer.StreamRequestHandler):
+	def setup(self):
+		global config
+		self.log = logging.getLogger(__file__)
+		SocketServer.StreamRequestHandler.setup(self)
+		self.ihooks = InstanceHooks(config)
+		(self.client, self.transport) = createClient(config)
+		self.tashiconnection=tashiconnection
+		self.auth = config.get('MauiWiki', 'authuser')
+		self.key = config.get('MauiWiki', 'authkey')
+
+	def handle(self):
+		p = MauiPacket(key=self.key)
+		self.istream = self.ostream = self.rfile
+		p.readPacket(self.istream)
+		self.processPacket(p)
+
+	def processGetNodes(self, p):
+		arg = p.data[1]
+		arg = arg.split('=')
+		arg = arg[1].split(':')
+		updatetime = int(arg[0])
+		nodelist = arg[1:]
+		print 'got GETNODES packet "%s" "%s"'%(updatetime, nodelist)
+		r = MauiPacket()
+		nodes = tashiconnection.getNodes(updatetime, nodelist)
+		numNodes = len(nodes)
+		dat = 'ARG=%i#'%numNodes
+		first = True
+		for node, attributes in nodes.iteritems():
+			if first:
+				dat = dat + '%s:'%node
+				first = False
+			else:
+				dat = dat + '#%s:'%node
+			attrs = ['%s=%s'%(a,v) for a,v in attributes.iteritems()]
+			dat = dat + ';'.join(attrs)+';'
+		r.set(['SC=0', dat], auth=self.auth, key=self.key)
+		return r
+
+	def processGetJobs(self, p):
+		arg = p.data[1]
+		arg = arg.split('=')
+		arg = arg[1].split(':')
+		updatetime = int(arg[0])
+		joblist = arg[1:]
+		print 'got GETJOBS packet "%s" "%s"'%(updatetime, joblist)
+		r = MauiPacket();
+		jobs = tashiconnection.getJobs(updatetime, joblist)
+		numJobs = len(jobs)
+		dat = 'ARG=%i#'%numJobs
+		first = True
+		for job, attributes in jobs.iteritems():
+			if first:
+				dat = dat +'%s:'%job
+				first = False
+			else:
+				dat = dat +'#%s:'%job
+			# FIXME: support limits
+			attributes['WCLIMIT']=str(10000)
+			attrs = ['%s=%s'%(a,v) for a,v in attributes.iteritems()]
+			dat = dat + ';'.join(attrs) + ';'
+		r.set(['SC=0', dat], auth=self.auth, key=self.key)
+		return r
+
+	def processStartJob(self, p):
+		job = p.data[1]
+		job = job.split('=')[1].strip()
+		job = job.split('.')[-1]
+		tasklist = p.data[2].split('=')[1].split(':')
+		print 'STARTJOB ', job, tasklist
+		try:
+			hosts = self.client.getHosts()
+			print 'hosts ', hosts
+			host = [h for h in hosts if h.name == tasklist[0]][0]
+			self.tashiconnection.activateById(jobnameToId(job), host)
+			print '\tactivated VM!'
+			r = MauiPacket()
+			r.set(['SC=0','RESPONSE=VM %s started on host %s'%(job, tasklist[0])])
+			return r
+		except Exception, e:
+			# FIXME: make this a real failure response
+			print 'Oh noes! ', e
+			r = MauiPacket()
+			r.set(['SC=-1', 'RESPONSE=%s'%str(e)])
+			return r
+
+	def processCancelJob(self, p):
+		job = p.data[1]
+		job = job.split('=')[1].strip()
+		print 'CANCELJOB ', job
+		try:
+			self.client.destroyVm(jobnameToId(job))
+			print '\tdestroyed VM!'
+			r = MauiPacket()
+			r.set(['SC=0', 'RESPONSE=VM %s destroyed'%job])
+			return r
+		except Exception, e:
+			# FIXME: make this a real failure response
+			print 'Oh noes! ', e
+			r = MauiPacket()
+			r.set(['SC=-1', 'RESPONSE=%s'%str(e)])
+			return r
+
+	def processPacket(self,p):
+		dat = p.data
+		if not p.verifyChecksum():
+			print p
+			print 'bad checksum'
+			return
+		r = None
+		if dat[0] == 'CMD=GETNODES':
+			r = self.processGetNodes(p)
+		elif dat[0] == 'CMD=GETJOBS':
+			r = self.processGetJobs(p)
+		elif dat[0] == 'CMD=STARTJOB':
+			r = self.processStartJob(p)
+		elif dat[0] == 'CMD=CANCELJOB':
+			r = self.processCancelJob(p)
+		else:
+			print 'got unknown packet'
+			print p.prettyString()
+			r = MauiPacket()
+			r.set(['SC=-810', 'RESPONSE=command not supported'])
+		print r.prettyString()
+		self.ostream.write(str(r))
+		self.ostream.flush()
+		self.ostream.close()
+		self.ostream.close()
+		self.istream.close()
+
+if __name__ == '__main__':
+	(config, configFiles) = getConfig(["Agent"]) # "config" is read as a module global by MauiListener.setup()
+	publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
+	tashi.publisher = publisher
+	(client, transport) = createClient(config)
+	logging.config.fileConfig(configFiles)
+	tashiconnection = TashiConnection(config, client, transport) # also read as a module global by MauiListener
+	tashiconnection.start()
+
+	HOST, PORT = '', 1717 # the listening port is hard-coded
+	server = SocketServer.TCPServer((HOST,PORT), MauiListener)
+	server.serve_forever()

Propchange: incubator/tashi/trunk/src/tashi/agents/mauiwiki.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=792287&r1=792286&r2=792287&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Wed Jul  8 20:15:36 2009
@@ -135,7 +135,7 @@
 
 def main():
 	(config, configFiles) = getConfig(["Agent"])
-	publisher = instantiateImplementation(config.get("Primitive", "publisher"), config)
+	publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
 	tashi.publisher = publisher
 	(client, transport) = createClient(config)
 	logging.config.fileConfig(configFiles)