You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2009/07/08 20:15:37 UTC
svn commit: r792287 - in /incubator/tashi/trunk: etc/TashiDefaults.cfg
src/tashi/agents/mauipacket.py src/tashi/agents/mauiwiki.py
src/tashi/agents/primitive.py
Author: mryan3
Date: Wed Jul 8 20:15:36 2009
New Revision: 792287
URL: http://svn.apache.org/viewvc?rev=792287&view=rev
Log:
Jim Cipar's Maui patches with modifications:
1. The tashi.publisher object is initialized in mauiwiki.py
2. The "publisher" config option was moved into the "Agent" section of the file to avoid duplication
3. Indentation cleanups
This patch adds a new agent: mauiwiki, that allows the maui cluster
scheduler to use Tashi as a resource manager. It basically translates
between the Tashi cluster manager interface, and the maui wiki
interface ( http://www.clusterresources.com/products/maui/docs/wiki/ ).
There is a bug in the way checksums are converted to strings: leading
0s are truncated. Here is the fixed patch for mauipacket.py
Added:
incubator/tashi/trunk/src/tashi/agents/mauipacket.py
incubator/tashi/trunk/src/tashi/agents/mauiwiki.py (with props)
Modified:
incubator/tashi/trunk/etc/TashiDefaults.cfg
incubator/tashi/trunk/src/tashi/agents/primitive.py
Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=792287&r1=792286&r2=792287&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Wed Jul 8 20:15:36 2009
@@ -113,11 +113,20 @@
clusterManagerTimeout = 5.0
# Agent portion
+[Agent]
+publisher = tashi.messaging.GangliaPublisher
+
[Primitive]
hook1 = tashi.agents.DhcpDns
scheduleDelay = 2.0
densePack = False
-publisher = tashi.messaging.GangliaPublisher
+
+[MauiWiki]
+hook1 = tashi.agents.DhcpDns
+refreshTime = 5
+authuser = changeme
+authkey = 1111
+defaultJobTime = 8640000000
[DhcpDns]
dnsKeyFile = /location/of/private/key/for/dns
Added: incubator/tashi/trunk/src/tashi/agents/mauipacket.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/mauipacket.py?rev=792287&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/mauipacket.py (added)
+++ incubator/tashi/trunk/src/tashi/agents/mauipacket.py Wed Jul 8 20:15:36 2009
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import subprocess
+import time
+import pseudoDes
+
+class MauiPacket:
+ def __init__(self, key=0):
+ self.size = 0
+ self.char = '\n'
+ self.chksum = '0'*16
+ self.timestamp = int(time.time())
+ self.auth = ''
+ self.data = []
+ self.msg = ''
+ self.key=key
+ def readPacket(self, istream):
+ self.msg = ''
+
+ size = istream.read(8)
+ self.msg = self.msg+size
+ self.size = int(size)
+
+ self.char = istream.read(1)
+ self.msg = self.msg + self.char
+
+ packet = istream.read(self.size)
+ self.msg = self.msg + packet
+
+ packet = packet.split()
+
+ for i in range(len(packet)):
+ item = packet[i].split('=')
+ if item[0] == 'CK':
+ self.chksum = item[1]
+ if item[0] == 'TS':
+ self.timestamp = int(item[1])
+ if item[0] == 'AUTH':
+ self.auth = item[1]
+ if item[0] == 'DT':
+ self.data = packet[i:]
+ self.data=self.data[0].split('=',1)[1:] + self.data[1:]
+
+ def checksumMessage(self, message, key=None):
+ if key == None:
+ key = self.key
+ if type(key) == type(''):
+ key = int(key)
+ chksum = pseudoDes.generateKey(message, key)
+ chksum = '%016x' % chksum
+ return chksum
+ def getChecksum(self):
+ cs = self.msg.partition('TS=')
+ cs = cs[1]+cs[2]
+ chksum = self.checksumMessage(cs)
+ return chksum
+ def verifyChecksum(self):
+ chksum = self.getChecksum()
+ if chksum != self.chksum:
+ print 'verifyChecksum: "%s"\t"%s"'%(chksum, self.chksum)
+ print 'verifyChecksum (types): %s\t%s' %(type(chksum), type(self.chksum))
+ return False
+ return True
+ def set(self, data, auth=None, key=None, timestamp=None):
+ if timestamp==None:
+ timestamp = int(time.time())
+ self.data = data
+ if auth !=None:
+ self.auth = auth
+ if key != None:
+ self.key = key
+ self.timstamp=timestamp
+ self.fixup()
+ def fixup(self):
+ datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+ self.chksum = self.checksumMessage(datastring)
+
+ pktstring = 'CK=%s %s'%(self.chksum, datastring)
+ self.size = len(pktstring)
+ def __str__(self):
+ datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+ self.chksum = self.checksumMessage(datastring)
+
+ pktstring = 'CK=%s %s'%(self.chksum, datastring)
+ self.msg = ''
+ self.msg = self.msg + '%08i'%len(pktstring)
+ self.msg = self.msg + self.char
+ self.msg = self.msg + pktstring
+
+ return self.msg
+ def prettyString(self):
+ s = '''Maui Packet
+-----------
+size:\t\t%i
+checksum:\t%s
+timestamp:\t%s
+auth:\t\t%s
+data:
+%s
+-----------'''
+ s = s%(self.size, self.chksum, self.timestamp, self.auth, self.data)
+ return s
Added: incubator/tashi/trunk/src/tashi/agents/mauiwiki.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/mauiwiki.py?rev=792287&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/mauiwiki.py (added)
+++ incubator/tashi/trunk/src/tashi/agents/mauiwiki.py Wed Jul 8 20:15:36 2009
@@ -0,0 +1,467 @@
+#! /usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+import hashlib
+import sys
+import subprocess
+import socket, SocketServer
+from socket import gethostname
+import os
+import threading
+import logging.config
+
+from tashi.parallel import synchronizedmethod
+from tashi.services.ttypes import *
+from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+from tashi.agents.mauipacket import MauiPacket
+import tashi.util
+
+def jobnameToId(jobname):
+ return int(jobname.split('.')[-1])
+
+class InstanceHooks():
+ def __init__(self, config):
+ self.log = logging.getLogger(__file__)
+ self.hooks=[]
+ items = config.items("MauiWiki")
+ items.sort()
+ for item in items:
+ (name, value) = item
+ name = name.lower()
+ if (name.startswith("hook")):
+ try:
+ self.hooks.append(instantiateImplementation(value, config, client, transport, False))
+ except:
+ self.log.exception("Failed to load hook %s" % (value))
+ (self.client, self.transport) = createClient(config)
+ def preCreate(self, inst):
+ for hook in self.hooks:
+ hook.preCreate(inst)
+ def postDestroy(self, inst):
+ for hook in self.hooks:
+ hook.postDestroy(inst)
+ def idToInst(self, id):
+ instances = self.client.getInstances()
+ print 'instances ', instances
+ insts = [i for i in instances if str(i.id)==str(id)]
+ if len(insts) == 0:
+ raise "No instance with ID %s"%id
+ if len(insts) > 1:
+ raise "Multiple instances with ID %s"%id
+ inst = insts[0]
+ return inst
+ def destroyById(self, id):
+ inst = self.idToInst(id)
+ self.client.destroyVm(int(id))
+ self.postDestroy(inst)
+ def activateById(self, id, host):
+ inst = self.idToInst(id)
+ self.preCreate(inst)
+ self.client.activateVm(int(id), host)
+
+def cmplists(a, b):
+ for i in range(len(a)):
+ if a[i] < b[i]:
+ return -1
+ if a[i] > b[i]:
+ return 1
+ return 0
+
+class TashiConnection(threading.Thread):
+ def __init__(self, config, client, transport):
+ (self.client, self.transport) = createClient(config)
+
+ self.hosts={}
+ self.instances={}
+ self.users={}
+
+ self.config = config
+ self.ihooks = InstanceHooks(config)
+ self.log = logging.getLogger(__file__)
+ self.refreshTime = float(self.config.get('MauiWiki', 'refreshTime'))
+ self.defaultJobTime = str(self.config.get('MauiWiki', 'defaultJobTime'))
+ threading.Thread.__init__(self)
+ self.daemon = True
+ def run(self):
+ while True:
+ print 'TashiConnection:run updating hosts ...'
+ self.updateHosts()
+ print 'TashiConnection:run updating instances ...'
+ self.updateInstances()
+ print 'TashiConnection:run updating users ...'
+ self.updateUsers()
+ time.sleep(self.refreshTime)
+ def wikiHostState(self, host):
+ '''Returns a string representing the host state in a form compatable
+ with the maui-wiki protocol. This code simply chooses between
+ "Down" and "Unknown":
+
+ - Busy: Node is running some jobs and will not accept additional jobs
+ - Down: Resource Manager problems have been detected. Node is
+ incapable of running jobs.
+ - Draining: Node is responding but will not accept new jobs
+ - Idle: Node is ready to run jobs but currently is not running any.
+ Running: Node is running some jobs and will accept additional jobs
+ - Unknown: Node is capable of running jobs but the scheduler
+ will need to determine if the node state is actually Idle,
+ Running, or Busy.'''
+ if host.up == False or host.state == HostState.VersionMismatch:
+ return "Down"
+ if host.state == HostState.Drained:
+ return "Draining"
+ return "Unknown"
+
+ def wikiInstanceState(self, instance):
+ '''Returns a string representing the instance stat in a form compatable
+ with the maui-wiki protocol.
+
+ Completed: Job has completed
+ Hold: Job is in the queue but is not allowed to run
+ Idle: Job is ready to run
+ Removed: Job has been canceled or otherwise terminated externally
+ Running: Job is currently executing
+ Suspended: job has started but execution has temporarily been suspended'''
+ tashiToWiki = {InstanceState.Pending:'Idle',
+ InstanceState.Held:'Hold',
+ InstanceState.Exited:'Removed'}
+ if tashiToWiki.has_key(instance.state):
+ return tashiToWiki[instance.state]
+ else:
+ return 'Running'
+
+ # Host handling
+ def compareHosts(self, host1, host2):
+ def ii(host):
+ try:
+ state = tashi.util.hostStates[host.state]
+ except:
+ state = 'Unknown'
+ return [host.id, host.name, host.up, state, host.memory, host.cores]
+ return cmplists(ii(host1), ii(host2))
+# @synchronizedmethod
+ def updateHost(self, host):
+ self.hosts[host.id] = host
+ self.hosts[host.id].updateTime = time.time()
+# @synchronizedmethod
+ def addHost(self, host):
+ self.hosts[host.id] = host
+ self.hosts[host.id].updateTime = time.time()
+# @synchronizedmethod
+ def removeHost(self, host):
+ self.hosts.pop(host.id)
+# @synchronizedmethod
+ def updateHosts(self):
+ if (not self.transport.isOpen):
+ self.transport.open()
+ hosts = self.client.getHosts()
+ for host in hosts:
+ if not self.hosts.has_key(host.id):
+ self.addHost(host)
+ elif self.compareHosts(self.hosts[host.id], host) != 0:
+ self.updateHost(host)
+ hhosts = {}
+ for host in hosts:
+ hhosts[host.id] = host
+ for host in self.hosts.values():
+ if not hhosts.has_key(host.id):
+ self.removeHost(host)
+ # Instance handling
+ def compareInstances(self, instance1, instance2):
+ def ii(inst):
+ return [inst.id,
+ inst.vmId,
+ inst.hostId,
+ tashi.util.vmStates[inst.state],
+ inst.userId,
+ inst.name,
+ inst.cores,
+ inst.memory,
+ len(inst.disks), # FIXME: this isn't a good way to compare
+ len(inst.nics), # FIXME: this isn't a good way to compare
+ len(inst.hints)]
+ return cmplists(ii(instance1), ii(instance2))
+ return 0
+ @synchronizedmethod
+ def updateInstance(self, instance):
+ qt = self.instances[instance.id].queueTime
+ self.instances[instance.id] = instance
+ self.instances[instance.id].updateTime = time.time()
+ self.instances[instance.id].queueTime = qt
+ @synchronizedmethod
+ def addInstance(self, instance):
+ self.instances[instance.id] = instance
+ self.instances[instance.id].updateTime = time.time()
+ self.instances[instance.id].queueTime = time.time()
+ @synchronizedmethod
+ def removeInstance(self, instance):
+ self.instances[instance.id].state = InstanceState.Exited
+ self.ihooks.postDestroy(instance)
+ @synchronizedmethod
+ def updateInstances(self):
+ if (not self.transport.isOpen):
+ self.transport.open()
+ instances = self.client.getInstances()
+ for instance in instances:
+ print 'found instance', instance.id
+ if not self.instances.has_key(instance.id):
+ print "it's a new instance"
+ self.addInstance(instance)
+ elif self.compareInstances(self.instances[instance.id], instance) != 0:
+ self.updateInstance(instance)
+ iinsts = {}
+ for instance in instances:
+ iinsts[instance.id] = instance
+ for instance in self.instances.values():
+ if instance.state == InstanceState.Exited:
+ continue
+ if not iinsts.has_key(instance.id):
+ print 'removing instance ', instance.id
+ self.removeInstance(instance)
+ # User handling
+ def compareUsers(self, user1, user2):
+ if user1.id < user2.id:
+ return -1
+ elif user1.id > user2.id:
+ return 1
+ if user1.name < user2.name:
+ return -1
+ if user1.name > user2.name:
+ return 1
+ return 0
+ @synchronizedmethod
+ def updateUser(self, user):
+ self.users[user.id] = user
+ self.users[user.id].updatetime = time.time()
+ @synchronizedmethod
+ def addUser(self, user):
+ self.users[user.id] = user
+ self.users[user.id].updatetime = time.time()
+ @synchronizedmethod
+ def removeUser(self, user):
+ self.users.pop(user.id)
+ @synchronizedmethod
+ def updateUsers(self):
+ if (not self.transport.isOpen):
+ self.transport.open()
+ users = self.client.getUsers()
+ for user in users:
+ if not self.users.has_key(user.id):
+ self.addUser(user)
+ elif self.compareUsers(self.users[user.id], user) != 0:
+ self.updateUser(user)
+ uusers = {}
+ for user in users:
+ uusers[user.id] = user
+ for user in self.users.values():
+ if not uusers.has_key(user.id):
+ self.removeUser(user)
+ # Get data structures for maui
+ # Format is {id:{field:value}}
+ @synchronizedmethod
+ def getNodes(self, updatetime=0, nodelist=['ALL']):
+ if len(nodelist) == 0:
+ return {}
+ if nodelist[0]=='ALL':
+ nodes = [n for n in self.hosts.values() if n.updateTime >= updatetime]
+ else:
+ nodes = [n for n in self.hosts.values()
+ if n.updateTime >= updatetime and n.name in nodelist]
+ nl = {}
+ for node in nodes:
+ nl[node.name] = {'STATE':self.wikiHostState(node),
+ 'UPDATETIME':str(int(node.updateTime)),
+ 'CPROC':str(node.cores),
+ 'CMEMORY':str(node.memory)}
+ return nl
+ @synchronizedmethod
+ def getJobs(self, updatetime=0, joblist=['ALL']):
+ if len(joblist) == 0:
+ return {}
+ if joblist[0] == 'ALL':
+ jobs = [j for j in self.instances.values() if j.updateTime >= updatetime]
+ else:
+ jobs = [j for j in self.instances.values()
+ if j.updateTime >= updatetime and j.id in joblist]
+ jl = {}
+ for job in jobs:
+ id = "%s.%i"%(job.name, job.id)
+ jl[id] = {'STATE':self.wikiInstanceState(job),
+ 'UNAME':self.users[job.userId].name,
+ 'GNAME':self.users[job.userId].name,
+ 'UPDATETIME':int(job.updateTime),
+ 'QUEUETIME':job.queueTime,
+ 'TASKS':'1',
+ 'DPROCS':str(job.cores),
+ 'DMEM':str(job.memory),
+ 'RMEM':str(job.memory),
+ 'WCLIMIT':str(self.defaultJobTime)}
+ if job.hostId != None:
+ jl[id]['TASKLIST'] = self.hosts[job.hostId].name
+ return jl
+ @synchronizedmethod
+ def activateById(self, id, host):
+ if not self.instances.has_key(id):
+ raise "no such instance"
+ self.ihooks.activateById(id, host)
+ self.instances[id].state=InstanceState.Activating
+
+class MauiListener(SocketServer.StreamRequestHandler):
+ def setup(self):
+ global config
+ self.log = logging.getLogger(__file__)
+ SocketServer.StreamRequestHandler.setup(self)
+ self.ihooks = InstanceHooks(config)
+ (self.client, self.transport) = createClient(config)
+ self.tashiconnection=tashiconnection
+ self.auth = config.get('MauiWiki', 'authuser')
+ self.key = config.get('MauiWiki', 'authkey')
+
+ def handle(self):
+ p = MauiPacket(key=self.key)
+ self.istream = self.ostream = self.rfile
+ p.readPacket(self.istream)
+ self.processPacket(p)
+
+ def processGetNodes(self, p):
+ arg = p.data[1]
+ arg = arg.split('=')
+ arg = arg[1].split(':')
+ updatetime = int(arg[0])
+ nodelist = arg[1:]
+ print 'got GETNODES packet "%s" "%s"'%(updatetime, nodelist)
+ r = MauiPacket()
+ nodes = tashiconnection.getNodes(updatetime, nodelist)
+ numNodes = len(nodes)
+ dat = 'ARG=%i#'%numNodes
+ first = True
+ for node, attributes in nodes.iteritems():
+ if first:
+ dat = dat + '%s:'%node
+ first = False
+ else:
+ dat = dat + '#%s:'%node
+ attrs = ['%s=%s'%(a,v) for a,v in attributes.iteritems()]
+ dat = dat + ';'.join(attrs)+';'
+ r.set(['SC=0', dat], auth=self.auth, key=self.key)
+ return r
+
+ def processGetJobs(self, p):
+ arg = p.data[1]
+ arg = arg.split('=')
+ arg = arg[1].split(':')
+ updatetime = int(arg[0])
+ joblist = arg[1:]
+ print 'got GETJOBS packet "%s" "%s"'%(updatetime, joblist)
+ r = MauiPacket();
+ jobs = tashiconnection.getJobs(updatetime, joblist)
+ numJobs = len(jobs)
+ dat = 'ARG=%i#'%numJobs
+ first = True
+ for job, attributes in jobs.iteritems():
+ if first:
+ dat = dat +'%s:'%job
+ first = False
+ else:
+ dat = dat +'#%s:'%job
+ # FIXME: support limits
+ attributes['WCLIMIT']=str(10000)
+ attrs = ['%s=%s'%(a,v) for a,v in attributes.iteritems()]
+ dat = dat + ';'.join(attrs) + ';'
+ r.set(['SC=0', dat], auth=self.auth, key=self.key)
+ return r
+
+ def processStartJob(self, p):
+ job = p.data[1]
+ job = job.split('=')[1].strip()
+ job = job.split('.')[-1]
+ tasklist = p.data[2].split('=')[1].split(':')
+ print 'STARTJOB ', job, tasklist
+ try:
+ hosts = self.client.getHosts()
+ print 'hosts ', hosts
+ host = [h for h in hosts if h.name == tasklist[0]][0]
+ self.tashiconnection.activateById(jobnameToId(job), host)
+ print '\tactivated VM!'
+ r = MauiPacket()
+ r.set(['SC=0','RESPONSE=VM %s started on host %s'%(job, tasklist[0])])
+ return r
+ except Exception, e:
+ # FIXME: make this a real failure response
+ print 'Oh noes! ', e
+ r = MauiPacket()
+ r.set(['SC=-1', 'RESPONSE=%s'%str(e)])
+ return r
+
+ def processCancelJob(self, p):
+ job = p.data[1]
+ job = job.split('=')[1].strip()
+ print 'CANCELJOB ', job
+ try:
+ self.client.destroyVm(jobnameToId(job))
+ print '\tdestroyed VM!'
+ r = MauiPacket()
+ r.set(['SC=0', 'RESPONSE=VM %s destroyed'%job])
+ return r
+ except Exception, e:
+ # FIXME: make this a real failure response
+ print 'Oh noes! ', e
+ r = MauiPacket()
+ r.set(['SC=-1', 'RESPONSE=%s'%str(e)])
+ return r
+
+ def processPacket(self,p):
+ dat = p.data
+ if not p.verifyChecksum():
+ print p
+ print 'bad checksum'
+ return
+ r = None
+ if dat[0] == 'CMD=GETNODES':
+ r = self.processGetNodes(p)
+ elif dat[0] == 'CMD=GETJOBS':
+ r = self.processGetJobs(p)
+ elif dat[0] == 'CMD=STARTJOB':
+ r = self.processStartJob(p)
+ elif dat[0] == 'CMD=CANCELJOB':
+ r = self.processCancelJob(p)
+ else:
+ print 'got unknown packet'
+ print p.prettyString()
+ r = MauiPacket()
+ r.set(['SC=-810', 'RESPONSE=command not supported'])
+ print r.prettyString()
+ self.ostream.write(str(r))
+ self.ostream.flush()
+ self.ostream.close()
+ self.ostream.close()
+ self.istream.close()
+
+if __name__ == '__main__':
+ (config, configFiles) = getConfig(["Agent"])
+ publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
+ tashi.publisher = publisher
+ (client, transport) = createClient(config)
+ logging.config.fileConfig(configFiles)
+ tashiconnection = TashiConnection(config, client, transport)
+ tashiconnection.start()
+
+ HOST, PORT = '', 1717
+ server = SocketServer.TCPServer((HOST,PORT), MauiListener)
+ server.serve_forever()
Propchange: incubator/tashi/trunk/src/tashi/agents/mauiwiki.py
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=792287&r1=792286&r2=792287&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Wed Jul 8 20:15:36 2009
@@ -135,7 +135,7 @@
def main():
(config, configFiles) = getConfig(["Agent"])
- publisher = instantiateImplementation(config.get("Primitive", "publisher"), config)
+ publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
(client, transport) = createClient(config)
logging.config.fileConfig(configFiles)