You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by se...@apache.org on 2013/06/23 17:27:03 UTC
git commit: updated refs/heads/master to 34fc209
Updated Branches:
refs/heads/master 3fdcf1836 -> 34fc20941
CLOUDSTACK-3096 format asyncJobMgr
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/34fc2094
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/34fc2094
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/34fc2094
Branch: refs/heads/master
Commit: 34fc209419dc0f9b6d347e7a33371bc0941069b6
Parents: 3fdcf18
Author: Daan Hoogland <da...@onecht.net>
Authored: Sat Jun 22 23:38:19 2013 +0200
Committer: Sebastien Goasguen <ru...@gmail.com>
Committed: Sun Jun 23 11:26:53 2013 -0400
----------------------------------------------------------------------
tools/marvin/marvin/asyncJobMgr.py | 46 +++++++++++++++++++++------------
1 file changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34fc2094/tools/marvin/marvin/asyncJobMgr.py
----------------------------------------------------------------------
diff --git a/tools/marvin/marvin/asyncJobMgr.py b/tools/marvin/marvin/asyncJobMgr.py
index 6984627..25818a6 100644
--- a/tools/marvin/marvin/asyncJobMgr.py
+++ b/tools/marvin/marvin/asyncJobMgr.py
@@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@ import sys
import jsonHelper
import datetime
+
class job(object):
def __init__(self):
self.id = None
@@ -41,7 +42,8 @@ class jobStatus(object):
self.responsecls = None
def __str__(self):
- return '{%s}' % str(', '.join('%s : %s' % (k, repr(v)) for (k, v) in self.__dict__.iteritems()))
+ return '{%s}' % str(', '.join('%s : %s' % (k, repr(v)) for (k, v)
+ in self.__dict__.iteritems()))
class workThread(threading.Thread):
@@ -82,8 +84,9 @@ class workThread(threading.Thread):
result = self.connection.make_request(cmd)
jobstatus.result = result
jobstatus.endTime = datetime.datetime.now()
- jobstatus.duration = time.mktime(jobstatus.endTime.timetuple()) - time.mktime(
- jobstatus.startTime.timetuple())
+ jobstatus.duration =\
+ time.mktime(jobstatus.endTime.timetuple()) - time.mktime(
+ jobstatus.startTime.timetuple())
else:
result = self.connection.make_request(cmd, None, True)
if result is None:
@@ -92,8 +95,10 @@ class workThread(threading.Thread):
jobId = result.jobid
jobstatus.jobId = jobId
try:
- responseName = cmd.__class__.__name__.replace("Cmd", "Response")
- jobstatus.responsecls = jsonHelper.getclassFromName(cmd, responseName)
+ responseName =\
+ cmd.__class__.__name__.replace("Cmd", "Response")
+ jobstatus.responsecls =\
+ jsonHelper.getclassFromName(cmd, responseName)
except:
pass
jobstatus.status = True
@@ -175,7 +180,8 @@ class asyncJobMgr(object):
jobId = jobstatus.jobId
if jobId is not None and self.db is not None:
result = self.db.execute(
- "select job_status, created, last_updated from async_job where id='%s'" % str(jobId))
+ "select job_status, created, last_updated from async_job where\
+ id='%s'" % str(jobId))
if result is not None and len(result) > 0:
if result[0][0] == 1:
jobstatus.status = True
@@ -192,7 +198,8 @@ class asyncJobMgr(object):
resultQueue = Queue.Queue()
'''intermediate result is stored in self.outqueue'''
for i in range(workers):
- worker = workThread(self.outqueue, resultQueue, self.apiClient, self.db, lock)
+ worker = workThread(self.outqueue, resultQueue, self.apiClient,
+ self.db, lock)
worker.start()
self.outqueue.join()
@@ -205,20 +212,26 @@ class asyncJobMgr(object):
return asyncJobResult
- '''put commands into a queue at first, then start workers numbers threads to execute this commands'''
-
def submitCmdsAndWait(self, cmds, workers=10):
+ '''
+ put commands into a queue at first, then start workers numbers
+ threads to execute this commands
+ '''
self.submitCmds(cmds)
lock = threading.Lock()
for i in range(workers):
- worker = workThread(self.inqueue, self.outqueue, self.apiClient, self.db, lock)
+ worker = workThread(self.inqueue, self.outqueue, self.apiClient,
+ self.db, lock)
worker.start()
return self.waitForComplete(workers)
- '''submit one job and execute the same job ntimes, with nums_threads of threads'''
-
- def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1, interval=1):
+ def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1,
+ interval=1):
+ '''
+ submit one job and execute the same job ntimes, with nums_threads
+ of threads
+ '''
inqueue1 = Queue.Queue()
lock = threading.Condition()
for i in range(ntimes):
@@ -232,9 +245,8 @@ class asyncJobMgr(object):
work.start()
inqueue1.join()
- '''submit n jobs, execute them with nums_threads of threads'''
-
def submitJobs(self, jobs, nums_threads=1, interval=1):
+ '''submit n jobs, execute them with nums_threads of threads'''
inqueue1 = Queue.Queue()
lock = threading.Condition()