You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 15:56:12 UTC
[09/14] stratos git commit: fixing conflicts in StratosThreadPool
fixing conflicts in StratosThreadPool
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/412cb2c2
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/412cb2c2
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/412cb2c2
Branch: refs/heads/master
Commit: 412cb2c245a5ad3c1b6f189517c0c4f7d9879050
Parents: f302906
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Fri Dec 4 05:22:40 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:05:16 2015 +0530
----------------------------------------------------------------------
.../stratos/cartridge/agent/CartridgeAgent.java | 30 ++---
.../agent/CartridgeAgentEventListeners.java | 46 ++++----
.../agent/test/JavaCartridgeAgentTest.java | 10 +-
.../CloudControllerServiceComponent.java | 4 +-
.../status/InstanceStatusTopicReceiver.java | 24 ++--
.../common/threading/StratosThreadPool.java | 4 +-
.../StratosManagerServiceComponent.java | 4 +-
...ratosManagerInstanceStatusEventReceiver.java | 22 ++--
.../notifier/InstanceNotifierEventReceiver.java | 109 +++++++++++++------
.../status/InstanceStatusEventReceiver.java | 44 +++++---
.../mock/iaas/services/impl/MockInstance.java | 32 +++---
.../tests/PythonAgentIntegrationTest.java | 6 +-
.../integration/tests/adc/GitHookTestCase.java | 22 ++--
13 files changed, 208 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index b0bf326..c498caa 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -90,10 +90,10 @@ public class CartridgeAgent implements Runnable {
}
// Start instance notifier listener thread
- registerInstanceNotifierEventListeners();
- if (log.isInfoEnabled()) {
- log.info("Cartridge agent registerInstanceNotifierEventListeners done");
- }
+// registerInstanceNotifierEventListeners();
+// if (log.isInfoEnabled()) {
+// log.info("Cartridge agent registerInstanceNotifierEventListeners done");
+// }
// Start tenant event receiver thread
/*
@@ -174,17 +174,17 @@ public class CartridgeAgent implements Runnable {
logPublisherManager.stop();
}
- protected void registerInstanceNotifierEventListeners() {
- if (log.isDebugEnabled()) {
- log.debug("SsubscribeToTopicsAndRegisterListeners before");
- }
-
- eventListenerns.startInstanceNotifierReceiver();
-
- if (log.isDebugEnabled()) {
- log.debug("SsubscribeToTopicsAndRegisterListeners after");
- }
- }
+// protected void registerInstanceNotifierEventListeners() {
+// if (log.isDebugEnabled()) {
+// log.debug("SsubscribeToTopicsAndRegisterListeners before");
+// }
+//
+// eventListenerns.startInstanceNotifierReceiver();
+//
+// if (log.isDebugEnabled()) {
+// log.debug("SsubscribeToTopicsAndRegisterListeners after");
+// }
+// }
// protected void registerTopologyEventListeners() {
// if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index ffa3750..5954b76 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -65,8 +65,8 @@ public class CartridgeAgentEventListeners {
private ApplicationSignUpEventReceiver applicationsEventReceiver;
private ExtensionHandler extensionHandler;
- private static final ExecutorService eventListenerExecutorService =
- StratosThreadPool.getExecutorService("cartridge.agent.event.listener.thread.pool", 10);
+// private static final ExecutorService eventListenerExecutorService =
+// StratosThreadPool.getExecutorService("cartridge.agent.event.listener.thread.pool", 10);
public CartridgeAgentEventListeners() {
if (log.isDebugEnabled()) {
@@ -77,7 +77,7 @@ public class CartridgeAgentEventListeners {
this.topologyEventReceiver = TopologyEventReceiver.getInstance();
//this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
- this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
+ this.instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
this.tenantEventReceiver = TenantEventReceiver.getInstance();
// this.tenantEventReceiver.setExecutorService(eventListenerExecutorService);
@@ -113,23 +113,23 @@ public class CartridgeAgentEventListeners {
//
// }
- public void startInstanceNotifierReceiver() {
-
- if (log.isDebugEnabled()) {
- log.debug("Starting cartridge agent instance notifier event message receiver");
- }
-
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- instanceNotifierEventReceiver.execute();
- }
- });
-
- if (log.isDebugEnabled()) {
- log.debug("Cartridge agent Instance notifier event message receiver started, waiting for event messages ...");
- }
- }
+// public void startInstanceNotifierReceiver() {
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Starting cartridge agent instance notifier event message receiver");
+// }
+//
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// instanceNotifierEventReceiver.execute();
+// }
+// });
+//
+// if (log.isDebugEnabled()) {
+// log.debug("Cartridge agent Instance notifier event message receiver started, waiting for event messages ...");
+// }
+// }
// public void startTenantEventReceiver() {
//
@@ -521,9 +521,9 @@ public class CartridgeAgentEventListeners {
* Terminate load balancer topology receiver thread.
*/
- public void terminate() {
- topologyEventReceiver.terminate();
- }
+// public void terminate() {
+// topologyEventReceiver.terminate();
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
index 430e0b8..18ed2ab 100644
--- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
+++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
@@ -111,9 +111,9 @@ public class JavaCartridgeAgentTest {
//topologyEventReceiver.setExecutorService(executorService);
//topologyEventReceiver.execute();
- instanceStatusEventReceiver = new InstanceStatusEventReceiver();
- instanceStatusEventReceiver.setExecutorService(executorService);
- instanceStatusEventReceiver.execute();
+ instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
+// instanceStatusEventReceiver.setExecutorService(executorService);
+// instanceStatusEventReceiver.execute();
instanceStarted = false;
instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
@@ -176,8 +176,8 @@ public class JavaCartridgeAgentTest {
} catch (Exception ignore) {
}
- this.instanceStatusEventReceiver.terminate();
- this.topologyEventReceiver.terminate();
+ //this.instanceStatusEventReceiver.terminate();
+ // this.topologyEventReceiver.terminate();
this.instanceActivated = false;
this.instanceStarted = false;
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index c4c0336..267d5a8 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -162,8 +162,8 @@ public class CloudControllerServiceComponent {
}
instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
- instanceStatusTopicReceiver.setExecutorService(executorService);
- instanceStatusTopicReceiver.execute();
+// instanceStatusTopicReceiver.setExecutorService(executorService);
+// instanceStatusTopicReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Instance status event receiver thread started");
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index 1f012b3..00ab8b7 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -45,21 +45,21 @@ public class InstanceStatusTopicReceiver {
private ExecutorService executorService;
public InstanceStatusTopicReceiver() {
- this.statusEventReceiver = new InstanceStatusEventReceiver();
+ this.statusEventReceiver = InstanceStatusEventReceiver.getInstance();
addEventListeners();
}
- public void execute() {
- statusEventReceiver.setExecutorService(executorService);
- statusEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread started");
- }
-
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread terminated");
- }
- }
+// public void execute() {
+// statusEventReceiver.setExecutorService(executorService);
+// statusEventReceiver.execute();
+// if (log.isInfoEnabled()) {
+// log.info("Cloud controller application status thread started");
+// }
+//
+// if (log.isInfoEnabled()) {
+// log.info("Cloud controller application status thread terminated");
+// }
+// }
private void addEventListeners() {
statusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index c0ae8ae..687cec2 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -36,8 +36,8 @@ public class StratosThreadPool {
private static final Log log = LogFactory.getLog(StratosThreadPool.class);
- private static Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<String, ExecutorService>();
- private static Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
+ private static volatile Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<>();
+ private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
private static Object executorServiceMapLock = new Object();
private static Object scheduledServiceMapLock = new Object();
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 573c19d..c4d68ae 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -202,8 +202,8 @@ public class StratosManagerServiceComponent {
*/
private void initializeInstanceStatusEventReceiver() {
instanceStatusEventReceiver = new StratosManagerInstanceStatusEventReceiver();
- instanceStatusEventReceiver.setExecutorService(executorService);
- instanceStatusEventReceiver.execute();
+// instanceStatusEventReceiver.setExecutorService(executorService);
+// instanceStatusEventReceiver.execute();
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
index 1da448e..ab92d1b 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
@@ -37,32 +37,34 @@ import java.util.List;
/**
* Stratos manager instance status event receiver.
*/
-public class StratosManagerInstanceStatusEventReceiver extends InstanceStatusEventReceiver {
+public class StratosManagerInstanceStatusEventReceiver {
private static final Log log = LogFactory.getLog(StratosManagerInstanceStatusEventReceiver.class);
private ApplicationSignUpHandler signUpManager;
private ArtifactDistributionCoordinator artifactDistributionCoordinator;
+ private InstanceStatusEventReceiver instanceStatusEventReceiver;
public StratosManagerInstanceStatusEventReceiver() {
signUpManager = new ApplicationSignUpHandler();
artifactDistributionCoordinator = new ArtifactDistributionCoordinator();
+ instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
addEventListeners();
}
- @Override
- public void execute() {
- super.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Stratos manager instance status event receiver thread started");
- }
- }
+// @Override
+// public void execute() {
+// super.execute();
+//
+// if (log.isInfoEnabled()) {
+// log.info("Stratos manager instance status event receiver thread started");
+// }
+// }
private void addEventListeners() {
- addEventListener(new InstanceStartedEventListener() {
+ instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
@Override
protected void onEvent(Event event) {
InstanceStartedEvent instanceStartedEvent = (InstanceStartedEvent) event;
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index 4ad6572..cfc7f11 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -21,64 +21,107 @@ package org.apache.stratos.messaging.message.receiver.instance.notifier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class InstanceNotifierEventReceiver {
+public class InstanceNotifierEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class);
private final InstanceNotifierEventMessageDelegator messageDelegator;
private EventSubscriber eventSubscriber;
- private boolean terminated;
+ private InstanceNotifierEventMessageListener messageListener;
+ private static volatile InstanceNotifierEventReceiver instance;
+ //private boolean terminated;
- public InstanceNotifierEventReceiver() {
+ private InstanceNotifierEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
- InstanceNotifierEventMessageListener messageListener = new InstanceNotifierEventMessageListener(messageQueue);
+ messageListener = new InstanceNotifierEventMessageListener(messageQueue);
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(),
messageListener);
+ execute();
+ }
+
+ public static InstanceNotifierEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (InstanceNotifierEventReceiver.class) {
+ if (instance == null) {
+ instance = new InstanceNotifierEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}
- public void execute() {
- synchronized (this) {
- if (terminated) {
- log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created.");
- return;
+// public void execute() {
+// synchronized (this) {
+// if (terminated) {
+// log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created.");
+// return;
+// }
+// try {
+// Thread subscriberThread = new Thread(eventSubscriber);
+// subscriberThread.start();
+// if (log.isDebugEnabled()) {
+// log.debug("InstanceNotifier event message receiver thread started");
+// }
+//
+// // Start instance notifier event message delegator thread
+// Thread receiverThread = new Thread(messageDelegator);
+// receiverThread.start();
+// if (log.isDebugEnabled()) {
+// log.debug("InstanceNotifier event message delegator thread started");
+// }
+// } catch (Exception e) {
+// if (log.isErrorEnabled()) {
+// log.error("InstanceNotifier receiver failed", e);
+// }
+// }
+// }
+// log.info("InstanceNotifierEventReceiver started");
+//
+// // Keep the thread live until terminated
+// while (!terminated) {
+// try {
+// Thread.sleep(2000);
+// } catch (InterruptedException ignore) {
+// }
+// }
+// }
+
+ private void execute() {
+ try {
+ // Start topic subscriber thread
+ eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(),
+ messageListener);
+ executorService.execute(eventSubscriber);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Instance Notifier event message receiver thread started");
}
- try {
- Thread subscriberThread = new Thread(eventSubscriber);
- subscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("InstanceNotifier event message receiver thread started");
- }
- // Start instance notifier event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
- if (log.isDebugEnabled()) {
- log.debug("InstanceNotifier event message delegator thread started");
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("InstanceNotifier receiver failed", e);
- }
+ // Start topology event message delegator thread
+ executorService.execute(messageDelegator);
+ if (log.isDebugEnabled()) {
+ log.debug("Instance Notifier event message delegator thread started");
}
- }
- log.info("InstanceNotifierEventReceiver started");
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignore) {
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Instance Notifier receiver failed", e);
}
}
}
@@ -90,7 +133,7 @@ public class InstanceNotifierEventReceiver {
public synchronized void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- terminated = true;
+ //terminated = true;
log.info("InstanceNotifierEventReceiver terminated");
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index 41f444e..a2a1623 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -21,27 +21,41 @@ package org.apache.stratos.messaging.message.receiver.instance.status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
-
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class InstanceStatusEventReceiver {
+public class InstanceStatusEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(InstanceStatusEventReceiver.class);
private final InstanceStatusEventMessageDelegator messageDelegator;
private final InstanceStatusEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private boolean terminated;
- private ExecutorService executorService;
+ private static volatile InstanceStatusEventReceiver instance;
- public InstanceStatusEventReceiver() {
+ private InstanceStatusEventReceiver() {
+ // TODO: make pool size configurable
+ this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
+ execute();
+ }
+
+ public static InstanceStatusEventReceiver getInstance () {
+ if (instance == null) {
+ synchronized (InstanceStatusEventReceiver.class) {
+ if (instance == null) {
+ instance = new InstanceStatusEventReceiver();
+ }
+ }
+ }
+
+ return instance;
}
public void addEventListener(EventListener eventListener) {
@@ -49,7 +63,7 @@ public class InstanceStatusEventReceiver {
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener);
@@ -77,14 +91,14 @@ public class InstanceStatusEventReceiver {
public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- terminated = true;
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
+ // terminated = true;
}
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
+// public ExecutorService getExecutorService() {
+// return executorService;
+// }
+//
+// public void setExecutorService(ExecutorService executorService) {
+// this.executorService = executorService;
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
index c752f9e..9886335 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
@@ -161,7 +161,7 @@ public class MockInstance implements Serializable {
}
private void startInstanceNotifierEventReceiver() {
- instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
+ instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
instanceNotifierEventReceiver.addEventListener(new InstanceCleanupClusterEventListener() {
@Override
protected void onEvent(Event event) {
@@ -185,17 +185,17 @@ public class MockInstance implements Serializable {
});
// TODO: Fix InstanceNotifierEventReceiver to use executor service
// do not remove this since execute() is a blocking call
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- instanceNotifierEventReceiver.execute();
- }
- });
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Mock instance instance notifier event message receiver started for mock member [member-id] %s",
- mockInstanceContext.getMemberId()));
- }
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// instanceNotifierEventReceiver.execute();
+// }
+// });
+// if (log.isDebugEnabled()) {
+// log.debug(String.format(
+// "Mock instance instance notifier event message receiver started for mock member [member-id] %s",
+// mockInstanceContext.getMemberId()));
+// }
}
private void handleMemberTermination() {
@@ -213,9 +213,9 @@ public class MockInstance implements Serializable {
healthStatNotifierScheduledFuture.cancel(true);
}
- private void stopInstanceNotifierReceiver() {
- instanceNotifierEventReceiver.terminate();
- }
+// private void stopInstanceNotifierReceiver() {
+// instanceNotifierEventReceiver.terminate();
+// }
public MockInstanceContext getMockInstanceContext() {
return mockInstanceContext;
@@ -223,7 +223,7 @@ public class MockInstance implements Serializable {
public synchronized void terminate() {
if (MemberStatus.Initialized.equals(memberStatus)) {
- stopInstanceNotifierReceiver();
+ //stopInstanceNotifierReceiver();
stopHealthStatisticsPublisher();
memberStatus = MemberStatus.Terminated;
if (log.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index c5751bd..4a86e40 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -129,9 +129,9 @@ public abstract class PythonAgentIntegrationTest {
// topologyEventReceiver.setExecutorService(executorService);
// topologyEventReceiver.execute();
- instanceStatusEventReceiver = new InstanceStatusEventReceiver();
- instanceStatusEventReceiver.setExecutorService(executorService);
- instanceStatusEventReceiver.execute();
+ instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
+// instanceStatusEventReceiver.setExecutorService(executorService);
+// instanceStatusEventReceiver.execute();
instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
@Override
http://git-wip-us.apache.org/repos/asf/stratos/blob/412cb2c2/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
index 3f708db..7412540 100644
--- a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
+++ b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
@@ -56,19 +56,19 @@ public class GitHookTestCase extends StratosIntegrationTest {
private static final String appPolicyId = "application-policy-git-hook-test";
private static final String GIT_HOOK_ARTIFACT_FILENAME = "hook-req.json";
private static final int ARTIFACT_UPDATED_EXPECTED_COUNT = 2;
- private ExecutorService eventListenerExecutorService = StratosThreadPool
- .getExecutorService("stratos.integration.test.git.thread.pool", 5);
+// private ExecutorService eventListenerExecutorService = StratosThreadPool
+// .getExecutorService("stratos.integration.test.git.thread.pool", 5);
@Test(timeOut = DEFAULT_TEST_TIMEOUT)
public void sendRepoNotify() throws Exception {
deployArtifacts();
- final InstanceNotifierEventReceiver instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
- eventListenerExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- instanceNotifierEventReceiver.execute();
- }
- });
+ final InstanceNotifierEventReceiver instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
+// eventListenerExecutorService.submit(new Runnable() {
+// @Override
+// public void run() {
+// instanceNotifierEventReceiver.execute();
+// }
+// });
ArtifactUpdateEventListener artifactUpdateEventListener = new ArtifactUpdateEventListener() {
@Override
@@ -86,8 +86,8 @@ public class GitHookTestCase extends StratosIntegrationTest {
Thread.sleep(1000);
}
TopologyHandler.getInstance().assertApplicationActiveStatus(applicationId);
- instanceNotifierEventReceiver.terminate();
- eventListenerExecutorService.shutdownNow();
+ //instanceNotifierEventReceiver.terminate();
+ // eventListenerExecutorService.shutdownNow();
undeployArtifacts();
}