You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by bh...@apache.org on 2014/11/24 19:48:23 UTC

git commit: updated refs/heads/4.3 to 2aa9bb6

Repository: cloudstack
Updated Branches:
  refs/heads/4.3 15ded4fdd -> 2aa9bb689


CLOUDSTACK-6743: Use edge-triggering in MessageDetector to handle bogus wakeup gracefully. Level triggering plus bogus wakeup can cause a tight loop to spin

(cherry picked from commit 09ec127470febacb45df1e0323a7bd7e7343bd2e)
Signed-off-by: Rohit Yadav <ro...@shapeblue.com>

Conflicts:
	framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
	framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/2aa9bb68
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/2aa9bb68
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/2aa9bb68

Branch: refs/heads/4.3
Commit: 2aa9bb6896c51733dc8a321888e1e1cba566e618
Parents: 15ded4f
Author: Kelven Yang <ke...@gmail.com>
Authored: Wed May 21 16:14:14 2014 -0700
Committer: Rohit Yadav <ro...@shapeblue.com>
Committed: Tue Nov 25 00:03:40 2014 +0530

----------------------------------------------------------------------
 .../apache/cloudstack/context/CallContext.java  |  15 +-
 .../framework/messagebus/MessageDetector.java   |  31 ++-
 .../cloudstack/messagebus/TestMessageBus.java   | 228 +++++++++----------
 framework/jobs/pom.xml                          |  17 +-
 .../framework/jobs/AsyncJobManagerTest.java     | 129 +++++++++++
 .../jobs/AsyncJobManagerTestConfiguration.java  |  54 +++++
 .../framework/jobs/AsyncJobTestDashboard.java   |  47 ++++
 .../framework/jobs/AsyncJobTestDispatcher.java  |  62 +++++
 .../resources/AsyncJobManagerTestContext.xml    |  38 ++++
 framework/jobs/test/resources/commonContext.xml |  37 +++
 framework/jobs/test/resources/db.properties     |  66 ++++++
 framework/jobs/test/resources/log4j.properties  |  35 +++
 12 files changed, 633 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/api/src/org/apache/cloudstack/context/CallContext.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/context/CallContext.java b/api/src/org/apache/cloudstack/context/CallContext.java
index 761eb51..2f9c6d9 100644
--- a/api/src/org/apache/cloudstack/context/CallContext.java
+++ b/api/src/org/apache/cloudstack/context/CallContext.java
@@ -113,7 +113,20 @@ public class CallContext {
     }
 
     public static CallContext current() {
-        return s_currentContext.get();
+        CallContext context = s_currentContext.get();
+
+        // TODO other than async job and api dispatches, there are many system background running threads
+        // that do not set up a CallContext at all; however, many places in the code that are touched by these background tasks
+        // assume a non-null CallContext. The following is a fix to address the NPE problems this causes.
+        //
+        // There are security implications with this. It assumes that all system background running threads
+        // indeed have no problem running under the system context.
+        //
+        if (context == null) {
+            context = registerSystemCallContextOnceOnly();
+        }
+
+        return context;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
index 3fb620c..1dcd6bd 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
@@ -18,27 +18,31 @@
  */
 package org.apache.cloudstack.framework.messagebus;
 
+import org.apache.log4j.Logger;
+
 public class MessageDetector implements MessageSubscriber {
+    private static final Logger s_logger = Logger.getLogger(MessageDetector.class);
 
     private MessageBus _messageBus;
     private String[] _subjects;
 
-    private volatile boolean _signalled = false;
-
     public MessageDetector() {
         _messageBus = null;
         _subjects = null;
     }
 
-    public boolean waitAny(long timeoutInMiliseconds) {
-        _signalled = false;
+    public void waitAny(long timeoutInMiliseconds) {
+        if (timeoutInMiliseconds < 100) {
+            s_logger.warn("waitAny is passed with a too short time-out interval. " + timeoutInMiliseconds + "ms");
+            timeoutInMiliseconds = 100;
+        }
+
         synchronized (this) {
             try {
                 wait(timeoutInMiliseconds);
             } catch (InterruptedException e) {
             }
         }
-        return _signalled;
     }
 
     public void open(MessageBus messageBus, String[] subjects) {
@@ -67,9 +71,20 @@ public class MessageDetector implements MessageSubscriber {
 
     @Override
     public void onPublishMessage(String senderAddress, String subject, Object args) {
-        synchronized (this) {
-            _signalled = true;
-            notifyAll();
+        if (subjectMatched(subject)) {
+            synchronized (this) {
+                notifyAll();
+            }
+        }
+    }
+
+    private boolean subjectMatched(String subject) {
+        if (_subjects != null) {
+            for (String sub : _subjects) {
+                if (sub.equals(subject))
+                    return true;
+            }
         }
+        return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
index 33c5ce5..4dd737b 100644
--- a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
+++ b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
@@ -36,120 +36,116 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(locations="classpath:/MessageBusTestContext.xml")
 public class TestMessageBus extends TestCase {
-	
-	@Inject MessageBus _messageBus;
-
-	@Test
-	public void testExactSubjectMatch() {
-		_messageBus.subscribe("Host", new MessageSubscriber() {
-
-			@Override
-			public void onPublishMessage(String senderAddress, String subject, Object args) {
-				Assert.assertEquals(subject, "Host");
-			}
-		});
-		
-		_messageBus.publish(null, "Host", PublishScope.LOCAL, null);
-		_messageBus.publish(null, "VM", PublishScope.LOCAL, null);
-		_messageBus.clearAll();
-	}
-
-	@Test
-	public void testRootSubjectMatch() {
-		_messageBus.subscribe("/", new MessageSubscriber() {
-
-			@Override
-			public void onPublishMessage(String senderAddress, String subject, Object args) {
-				Assert.assertTrue(subject.equals("Host") || subject.equals("VM"));
-			}
-		});
-		
-		_messageBus.publish(null, "Host", PublishScope.LOCAL, null);
-		_messageBus.publish(null, "VM", PublishScope.LOCAL, null);
-		_messageBus.clearAll();
-	}
-	
-	@Test
-	public void testMiscMatch() {
-		MessageSubscriber subscriberAtParentLevel = new MessageSubscriber() {
-			@Override
-			public void onPublishMessage(String senderAddress, String subject, Object args) {
-				Assert.assertTrue(subject.startsWith(("Host")) || subject.startsWith("VM"));
-			}
-		};
-		
-		MessageSubscriber subscriberAtChildLevel = new MessageSubscriber() {
-			@Override
-			public void onPublishMessage(String senderAddress, String subject, Object args) {
-				Assert.assertTrue(subject.equals("Host.123"));
-			}
-		};
-		
-		subscriberAtParentLevel = Mockito.spy(subscriberAtParentLevel);
-		subscriberAtChildLevel = Mockito.spy(subscriberAtChildLevel);
-		
-		_messageBus.subscribe("Host", subscriberAtParentLevel);
-		_messageBus.subscribe("VM", subscriberAtParentLevel);
-		_messageBus.subscribe("Host.123", subscriberAtChildLevel);
-		
-		_messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
-		_messageBus.publish(null, "Host.321", PublishScope.LOCAL, null);
-		_messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
-		
-		Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.123", null);
-		Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.321", null);
-		Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "VM.123", null);
-		Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null);
-
-		Mockito.reset(subscriberAtParentLevel);
-		Mockito.reset(subscriberAtChildLevel);
-		
-		_messageBus.unsubscribe(null, subscriberAtParentLevel);
-		_messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
-		_messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
-	
-		Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null);
-		Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "Host.123", null);
-		Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "VM.123", null);
-		
-		_messageBus.clearAll();
-	}
-	
-	public void testMessageDetector() {
-		MessageDetector detector = new MessageDetector();
-		detector.open(_messageBus, new String[] {"VM", "Host"});
-		
-		Thread thread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				for(int i = 0; i < 2; i++) {
-					try {
-						Thread.sleep(3000);
-					} catch (InterruptedException e) {
-					}
-					_messageBus.publish(null, "Host", PublishScope.GLOBAL, null);
-				}
-			}
-		});
-		thread.start();
-		
-		try {
-			int count = 0;
-			while(count < 2) {
-				if(detector.waitAny(1000)) {
-					System.out.println("Detected signal on bus");
-					count++;
-				} else {
-					System.out.println("Waiting timed out");
-				}
-			}
-		} finally {
-			detector.close();
-		}
-		
-		try {
-			thread.join();
-		} catch (InterruptedException e) {
-		}
-	}
+
+    @Inject
+    MessageBus _messageBus;
+
+    @Test
+    public void testExactSubjectMatch() {
+        _messageBus.subscribe("Host", new MessageSubscriber() {
+
+            @Override
+            public void onPublishMessage(String senderAddress, String subject, Object args) {
+                Assert.assertEquals(subject, "Host");
+            }
+        });
+
+        _messageBus.publish(null, "Host", PublishScope.LOCAL, null);
+        _messageBus.publish(null, "VM", PublishScope.LOCAL, null);
+        _messageBus.clearAll();
+    }
+
+    @Test
+    public void testRootSubjectMatch() {
+        _messageBus.subscribe("/", new MessageSubscriber() {
+
+            @Override
+            public void onPublishMessage(String senderAddress, String subject, Object args) {
+                Assert.assertTrue(subject.equals("Host") || subject.equals("VM"));
+            }
+        });
+
+        _messageBus.publish(null, "Host", PublishScope.LOCAL, null);
+        _messageBus.publish(null, "VM", PublishScope.LOCAL, null);
+        _messageBus.clearAll();
+    }
+
+    @Test
+    public void testMiscMatch() {
+        MessageSubscriber subscriberAtParentLevel = new MessageSubscriber() {
+            @Override
+            public void onPublishMessage(String senderAddress, String subject, Object args) {
+                Assert.assertTrue(subject.startsWith(("Host")) || subject.startsWith("VM"));
+            }
+        };
+
+        MessageSubscriber subscriberAtChildLevel = new MessageSubscriber() {
+            @Override
+            public void onPublishMessage(String senderAddress, String subject, Object args) {
+                Assert.assertTrue(subject.equals("Host.123"));
+            }
+        };
+
+        subscriberAtParentLevel = Mockito.spy(subscriberAtParentLevel);
+        subscriberAtChildLevel = Mockito.spy(subscriberAtChildLevel);
+
+        _messageBus.subscribe("Host", subscriberAtParentLevel);
+        _messageBus.subscribe("VM", subscriberAtParentLevel);
+        _messageBus.subscribe("Host.123", subscriberAtChildLevel);
+
+        _messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
+        _messageBus.publish(null, "Host.321", PublishScope.LOCAL, null);
+        _messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
+
+        Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.123", null);
+        Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.321", null);
+        Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "VM.123", null);
+        Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null);
+
+        Mockito.reset(subscriberAtParentLevel);
+        Mockito.reset(subscriberAtChildLevel);
+
+        _messageBus.unsubscribe(null, subscriberAtParentLevel);
+        _messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
+        _messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
+
+        Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null);
+        Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "Host.123", null);
+        Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "VM.123", null);
+
+        _messageBus.clearAll();
+    }
+
+    public void testMessageDetector() {
+        MessageDetector detector = new MessageDetector();
+        detector.open(_messageBus, new String[] {"VM", "Host"});
+
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < 2; i++) {
+                    try {
+                        Thread.sleep(3000);
+                    } catch (InterruptedException e) {
+                    }
+                    _messageBus.publish(null, "Host", PublishScope.GLOBAL, null);
+                }
+            }
+        });
+        thread.start();
+
+        try {
+            // waitAny() no longer returns a signalled flag, so bound the loop by counting the waits.
+            for (int count = 0; count < 2; count++) {
+                detector.waitAny(1000);
+            }
+        } finally {
+            detector.close();
+        }
+
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/pom.xml
----------------------------------------------------------------------
diff --git a/framework/jobs/pom.xml b/framework/jobs/pom.xml
index 02645d3..569932a 100644
--- a/framework/jobs/pom.xml
+++ b/framework/jobs/pom.xml
@@ -56,6 +56,21 @@
       <groupId>org.apache.cloudstack</groupId>
       <artifactId>cloud-framework-config</artifactId>
       <version>${project.version}</version>
-    </dependency>    
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-framework-events</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-engine-schema</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTest.java
----------------------------------------------------------------------
diff --git a/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTest.java b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTest.java
new file mode 100644
index 0000000..62a8d81
--- /dev/null
+++ b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTest.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+/*
+ * This integration test requires a real DB setup. It is not meant to run on a per-build
+ * basis; it can only be enabled for a developer's local run.
+ *
+ *
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations = "classpath:/AsyncJobManagerTestContext.xml")
+public class AsyncJobManagerTest extends TestCase {
+    private static final Logger s_logger =
+            Logger.getLogger(AsyncJobManagerTest.class);
+
+    @Inject
+    AsyncJobManager _jobMgr;
+
+    @Inject
+    AsyncJobTestDashboard _testDashboard;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        try {
+            ComponentContext.initComponentsLifeCycle();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            s_logger.error(ex.getMessage());
+        }
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    public void testWaitBehave() {
+
+        final Object me = this;
+        new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                s_logger.info("Sleeping...");
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException e) {
+                }
+
+                s_logger.info("wakeup");
+                synchronized (me) {
+                    me.notifyAll();
+                }
+            }
+
+        }).start();
+
+        s_logger.info("First wait");
+        synchronized (me) {
+            try {
+                wait(5000);
+            } catch (InterruptedException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+        s_logger.info("First wait done");
+
+        s_logger.info("Second wait");
+        synchronized (me) {
+            try {
+                wait(5000);
+            } catch (InterruptedException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+        s_logger.info("Second wait done");
+    }
+
+    @Test
+    public void test() {
+        final int TOTAL_JOBS_PER_QUEUE = 5;
+        final int TOTAL_QUEUES = 100;
+
+        for (int i = 0; i < TOTAL_QUEUES; i++) {
+            for (int j = 0; j < TOTAL_JOBS_PER_QUEUE; j++) {
+                AsyncJobVO job = new AsyncJobVO();
+                job.setCmd("TestCmd");
+                job.setDispatcher("TestJobDispatcher");
+                job.setCmdInfo("TestCmd info");
+
+                _jobMgr.submitAsyncJob(job, "fakequeue", i);
+
+                s_logger.info("Job submitted. job " + job.getId() + ", queue: " + i);
+            }
+        }
+
+        while (true) {
+            if (_testDashboard.getCompletedJobCount() == TOTAL_JOBS_PER_QUEUE * TOTAL_QUEUES)
+                break;
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
+
+        s_logger.info("Test done with " + _testDashboard.getCompletedJobCount() + " job executed");
+    }
+}
+
+*/

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTestConfiguration.java
----------------------------------------------------------------------
diff --git a/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTestConfiguration.java b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTestConfiguration.java
new file mode 100644
index 0000000..a70913c
--- /dev/null
+++ b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTestConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.jobs;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import org.apache.cloudstack.framework.config.ConfigDepot;
+import org.apache.cloudstack.framework.config.ScopedConfigStorage;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDaoImpl;
+import org.apache.cloudstack.framework.config.impl.ConfigDepotImpl;
+
+import com.cloud.storage.dao.StoragePoolDetailsDaoImpl;
+
+@Configuration
+public class AsyncJobManagerTestConfiguration {
+
+    @Bean
+    public ConfigDepot configDepot() {
+        return new ConfigDepotImpl();
+    }
+
+    @Bean
+    public ConfigurationDao configDao() {
+        return new ConfigurationDaoImpl();
+    }
+
+    @Bean
+    public ScopedConfigStorage scopedConfigStorage() {
+        return new StoragePoolDetailsDaoImpl();
+    }
+
+    @Bean
+    public AsyncJobTestDashboard testDashboard() {
+        return new AsyncJobTestDashboard();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDashboard.java
----------------------------------------------------------------------
diff --git a/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDashboard.java b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDashboard.java
new file mode 100644
index 0000000..728138d
--- /dev/null
+++ b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDashboard.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.jobs;
+
+public class AsyncJobTestDashboard {
+    int _completedJobCount = 0;
+    int _concurrencyCount = 0;
+
+    public AsyncJobTestDashboard() {
+    }
+
+    public synchronized int getCompletedJobCount() {
+        return _completedJobCount;
+    }
+
+    public synchronized void jobCompleted() {
+        _completedJobCount++;
+    }
+
+    public synchronized int getConcurrencyCount() {
+        return _concurrencyCount;
+    }
+
+    public synchronized void increaseConcurrency() {
+        _concurrencyCount++;
+    }
+
+    public synchronized void decreaseConcurrency() {
+        _concurrencyCount--;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDispatcher.java b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDispatcher.java
new file mode 100644
index 0000000..34351a6
--- /dev/null
+++ b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDispatcher.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import java.util.Random;
+
+import javax.inject.Inject;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.jobs.JobInfo.Status;
+
+import com.cloud.utils.component.AdapterBase;
+
+public class AsyncJobTestDispatcher extends AdapterBase implements AsyncJobDispatcher {
+    private static final Logger s_logger =
+            Logger.getLogger(AsyncJobTestDispatcher.class);
+
+    @Inject
+    private AsyncJobManager _asyncJobMgr;
+
+    @Inject
+    private AsyncJobTestDashboard _testDashboard;
+
+    Random _random = new Random();
+
+    public AsyncJobTestDispatcher() {
+    }
+
+    @Override
+    public void runJob(final AsyncJob job) {
+        _testDashboard.increaseConcurrency();
+
+        s_logger.info("Execute job " + job.getId() + ", current concurrency " + _testDashboard.getConcurrencyCount());
+
+        int interval = 3000;
+
+        try {
+            Thread.sleep(interval);
+        } catch (InterruptedException e) {
+        }
+
+        _asyncJobMgr.completeAsyncJob(job.getId(), Status.SUCCEEDED, 0, null);
+
+        _testDashboard.decreaseConcurrency();
+        _testDashboard.jobCompleted();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/test/resources/AsyncJobManagerTestContext.xml
----------------------------------------------------------------------
diff --git a/framework/jobs/test/resources/AsyncJobManagerTestContext.xml b/framework/jobs/test/resources/AsyncJobManagerTestContext.xml
new file mode 100644
index 0000000..fd5db30
--- /dev/null
+++ b/framework/jobs/test/resources/AsyncJobManagerTestContext.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
+  xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans
+                      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+                      http://www.springframework.org/schema/tx 
+                      http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
+                      http://www.springframework.org/schema/aop
+                      http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
+                      http://www.springframework.org/schema/context
+                      http://www.springframework.org/schema/context/spring-context-3.0.xsd">
+    <import resource="commonContext.xml"/>
+    <import resource="classpath*:spring-framework-jobs-core-context.xml"/>
+    <bean id="AsyncJobManagerTestConfiguration"
+    	class="org.apache.cloudstack.framework.jobs.AsyncJobManagerTestConfiguration" />
+    <bean id="TestJobDispatcher" class="org.apache.cloudstack.framework.jobs.AsyncJobTestDispatcher">
+        <property name="name" value="TestJobDispatcher" />
+    </bean>      
+</beans>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/test/resources/commonContext.xml
----------------------------------------------------------------------
diff --git a/framework/jobs/test/resources/commonContext.xml b/framework/jobs/test/resources/commonContext.xml
new file mode 100644
index 0000000..6c3ca75
--- /dev/null
+++ b/framework/jobs/test/resources/commonContext.xml
@@ -0,0 +1,37 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+  license agreements. See the NOTICE file distributed with this work for additional 
+  information regarding copyright ownership. The ASF licenses this file to 
+  you under the Apache License, Version 2.0 (the "License"); you may not use 
+  this file except in compliance with the License. You may obtain a copy of 
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+  by applicable law or agreed to in writing, software distributed under the 
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+  OF ANY KIND, either express or implied. See the License for the specific 
+  language governing permissions and limitations under the License. -->
+<beans xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
+  xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans
+                      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+                      http://www.springframework.org/schema/tx 
+                      http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
+                      http://www.springframework.org/schema/aop
+                      http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
+                      http://www.springframework.org/schema/context
+                      http://www.springframework.org/schema/context/spring-context-3.0.xsd">
+
+  <context:annotation-config />
+  <bean id="transactionContextBuilder" class="com.cloud.utils.db.TransactionContextBuilder" />
+
+  <bean id="instantiatePostProcessor" class="com.cloud.utils.component.ComponentInstantiationPostProcessor">
+    <property name="Interceptors">
+      <list>
+        <ref bean="transactionContextBuilder" />
+      </list>
+    </property>
+  </bean>
+
+  <bean id="eventBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase" />
+  <bean id="componentContext" class="com.cloud.utils.component.ComponentContext"/>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/test/resources/db.properties
----------------------------------------------------------------------
diff --git a/framework/jobs/test/resources/db.properties b/framework/jobs/test/resources/db.properties
new file mode 100644
index 0000000..e07d80c
--- /dev/null
+++ b/framework/jobs/test/resources/db.properties
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cluster.servlet.port=9090
+
+# CloudStack database settings
+db.cloud.username=cloud
+db.cloud.password=cloud
+db.root.password=
+db.cloud.host=localhost
+db.cloud.port=3306
+db.cloud.name=cloud
+
+# CloudStack database tuning parameters
+db.cloud.maxActive=250
+db.cloud.maxIdle=30
+db.cloud.maxWait=10000
+db.cloud.autoReconnect=true
+db.cloud.validationQuery=SELECT 1
+db.cloud.testOnBorrow=true
+db.cloud.testWhileIdle=true
+db.cloud.timeBetweenEvictionRunsMillis=40000
+db.cloud.minEvictableIdleTimeMillis=240000
+db.cloud.poolPreparedStatements=false
+db.cloud.url.params=prepStmtCacheSize=517&cachePrepStmts=true&prepStmtCacheSqlLimit=4096
+
+# usage database settings
+db.usage.username=cloud
+db.usage.password=cloud
+db.usage.host=localhost
+db.usage.port=3306
+db.usage.name=cloud_usage
+
+# usage database tuning parameters
+db.usage.maxActive=100
+db.usage.maxIdle=30
+db.usage.maxWait=10000
+db.usage.autoReconnect=true
+
+# awsapi database settings
+db.awsapi.name=cloudbridge
+
+# Simulator database settings
+db.simulator.username=cloud
+db.simulator.password=cloud
+db.simulator.host=localhost
+db.simulator.port=3306
+db.simulator.name=simulator
+db.simulator.maxActive=250
+db.simulator.maxIdle=30
+db.simulator.maxWait=10000
+db.simulator.autoReconnect=true

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2aa9bb68/framework/jobs/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/framework/jobs/test/resources/log4j.properties b/framework/jobs/test/resources/log4j.properties
new file mode 100644
index 0000000..1119ecb
--- /dev/null
+++ b/framework/jobs/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
+log4j.appender.stdout.threshold=DEBUG
+log4j.rootLogger=INFO, rolling
+log4j.appender.rolling=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
+log4j.appender.rolling.layout.ConversionPattern=%d %-5p [%c{3}] (%t:%x) %m%n
+log4j.appender.rolling.Threshold=DEBUG
+log4j.appender.rolling.File=./logs/testclient.log
+log4j.appender.rolling.DatePattern='.'yyyy-MM-dd
+log4j.appender.rolling.Append=false
+log4j.category.org.apache=DEBUG, rolling, stdout
+#log4j.category.com.cloud.utils.db.Transaction=ALL
+log4j.category.org.apache.cloudstack.network.contrail=ALL
+log4j.category.com.cloud.network=ALL
+