You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/11/02 16:31:10 UTC
[29/50] [abbrv] stratos git commit: Revert "Merge with
tenant-isolation branch"
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java
index 9376fd7..e56d7bf 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/ApplicationInstanceCreatedMessageProcessor.java
@@ -40,18 +40,14 @@ public class ApplicationInstanceCreatedMessageProcessor extends MessageProcessor
@Override
public boolean process(String type, String message, Object object) {
-
Applications applications = (Applications) object;
-
if (ApplicationInstanceCreatedEvent.class.getName().equals(type)) {
-
if (!applications.isInitialized()) {
return false;
}
-
-
- ApplicationInstanceCreatedEvent event = (ApplicationInstanceCreatedEvent) MessagingUtil.jsonToObject(message,
- ApplicationInstanceCreatedEvent.class);
+ ApplicationInstanceCreatedEvent event =
+ (ApplicationInstanceCreatedEvent) MessagingUtil.jsonToObject(message,
+ ApplicationInstanceCreatedEvent.class);
if (event == null) {
log.error("Unable to convert the JSON message to ApplicationInstanceCreatedEvent");
return false;
@@ -60,17 +56,18 @@ public class ApplicationInstanceCreatedMessageProcessor extends MessageProcessor
ApplicationsUpdater.acquireWriteLockForApplications();
try {
return doProcess(event, applications);
-
- } finally {
+ }
+ finally {
ApplicationsUpdater.releaseWriteLockForApplications();
}
-
} else {
if (nextProcessor != null) {
// ask the next processor to take care of the message.
return nextProcessor.process(type, message, applications);
} else {
- throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ throw new RuntimeException(String.format(
+ "Failed to process message using available message processors: [type] %s [body] %s", type,
+ message));
}
}
}
@@ -78,13 +75,18 @@ public class ApplicationInstanceCreatedMessageProcessor extends MessageProcessor
private boolean doProcess(ApplicationInstanceCreatedEvent event, Applications applications) {
// check if required properties are available
- if (event.getApplicationInstance() == null) {
- String errorMsg = "Application instance object of application instance created event is invalid";
+ if (event.getApplicationInstance() == null || event.getApplicationId() == null) {
+ String errorMsg = "Application instance object of ApplicationInstanceCreatedEvent is invalid. " +
+ "[ApplicationId] " + event.getApplicationId() + ", [ApplicationInstance] " +
+ event.getApplicationInstance();
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ if (applications == null) {
+ String errorMsg = "Error! Applications object is null";
log.error(errorMsg);
throw new RuntimeException(errorMsg);
-
}
-
ApplicationInstance applicationInstance = event.getApplicationInstance();
if (applicationInstance.getInstanceId() == null || applicationInstance.getInstanceId().isEmpty()) {
@@ -96,16 +98,14 @@ public class ApplicationInstanceCreatedMessageProcessor extends MessageProcessor
// check if an Application instance with same name exists in applications instance
if (null != applications.getApplication(event.getApplicationId()).
- getInstanceByNetworkPartitionId(applicationInstance.getNetworkPartitionUuid())) {
-
- log.warn("Application instance with id [ " + applicationInstance.getInstanceId() + " ] already exists");
-
+ getInstanceByNetworkPartitionId(applicationInstance.getNetworkPartitionId())) {
+ log.warn("Application instance [AppInstanceId] " + applicationInstance.getInstanceId() + " already exists");
} else {
// add application instance to Application Topology
- applications.getApplication(event.getApplicationId()).addInstance(applicationInstance.getInstanceId(), applicationInstance);
+ applications.getApplication(event.getApplicationId())
+ .addInstance(applicationInstance.getInstanceId(), applicationInstance);
}
-
notifyEventListeners(event);
return true;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
index 8c3644c..dcae73e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
@@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.topology.ApplicationClustersCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -73,13 +74,19 @@ public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor
List<Cluster> clusters = event.getClusterList();
for (Cluster cluster : clusters) {
- String serviceUuid = cluster.getServiceName();
+ String applicationId = cluster.getAppId();
+ String serviceName = cluster.getServiceName();
String clusterId = cluster.getClusterId();
- TopologyUpdater.acquireWriteLockForService(serviceUuid);
+ TopologyUpdater.acquireWriteLockForService(serviceName);
+
try {
+ // Apply application filter
+ if(TopologyApplicationFilter.apply(applicationId)) {
+ continue;
+ }
// Apply service filter
- if (TopologyServiceFilter.apply(serviceUuid)) {
+ if (TopologyServiceFilter.apply(serviceName)) {
continue;
}
@@ -89,18 +96,18 @@ public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor
}
// Validate event against the existing topology
- Service service = topology.getService(serviceUuid);
+ Service service = topology.getService(serviceName);
if (service == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Service does not exist: [service] %s",
- serviceUuid));
+ serviceName));
}
return false;
}
if (service.clusterExists(clusterId)) {
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster already exists in service: [service] %s " +
- "[cluster] %s", serviceUuid,
+ "[cluster] %s", serviceName,
clusterId));
}
} else {
@@ -115,7 +122,7 @@ public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor
}
} finally {
- TopologyUpdater.releaseWriteLockForService(serviceUuid);
+ TopologyUpdater.releaseWriteLockForService(serviceName);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java
index 0f558b2..8fb28cb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java
@@ -56,12 +56,12 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor {
ClusterInstanceCreatedEvent event = (ClusterInstanceCreatedEvent) MessagingUtil.
jsonToObject(message, ClusterInstanceCreatedEvent.class);
- TopologyUpdater.acquireWriteLockForService(event.getServiceUuid());
+ TopologyUpdater.acquireWriteLockForService(event.getServiceName());
try {
return doProcess(event, topology);
} finally {
- TopologyUpdater.releaseWriteLockForService(event.getServiceUuid());
+ TopologyUpdater.releaseWriteLockForService(event.getServiceName());
}
} else {
@@ -76,7 +76,7 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor {
private boolean doProcess(ClusterInstanceCreatedEvent event, Topology topology) {
- String serviceName = event.getServiceUuid();
+ String serviceName = event.getServiceName();
String clusterId = event.getClusterId();
// Apply service filter
@@ -90,11 +90,11 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor {
}
// Validate event against the existing topology
- Service service = topology.getService(event.getServiceUuid());
+ Service service = topology.getService(event.getServiceName());
if (service == null) {
if (log.isWarnEnabled()) {
log.warn(String.format("Service does not exist: [service] %s",
- event.getServiceUuid()));
+ event.getServiceName()));
}
return false;
}
@@ -108,7 +108,7 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor {
if (cluster == null) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceUuid(),
+ log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
event.getClusterId()));
}
return false;
@@ -118,7 +118,7 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor {
if (cluster.getInstanceContexts(clusterInstance.getInstanceId()) != null) {
if (log.isDebugEnabled()) {
log.debug(String.format("Cluster Instance already exists in service: " +
- "[service] %s [cluster] %s [Instance] %s", event.getServiceUuid(),
+ "[service] %s [cluster] %s [Instance] %s", event.getServiceName(),
event.getClusterId(), clusterInstance.getInstanceId()));
}
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
index e3279cd..9460ea6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
@@ -86,12 +86,12 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
} else {
// Apply changes to the topology
- Service service = new Service(event.getServiceName(), event.getServiceType(), event.getServiceUuid());
+ Service service = new Service(event.getServiceName(), event.getServiceType());
service.addPorts(event.getPorts());
topology.addService(service);
if (log.isInfoEnabled()) {
- log.info(String.format("Service created: [service] %s [service-uuid] %s", event.getServiceName(),event.getServiceUuid()));
+ log.info(String.format("Service created: [service] %s", event.getServiceName()));
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
index f9878d6..8fc3376 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
@@ -70,7 +70,7 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
private boolean doProcess(ServiceRemovedEvent event, Topology topology) {
- String serviceName = event.getServiceUuid();
+ String serviceName = event.getServiceName();
// Apply service filter
if (TopologyServiceFilter.apply(serviceName)) {
@@ -81,11 +81,11 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
notifyEventListeners(event);
// Validate event against the existing topology
- Service service = topology.getService(event.getServiceUuid());
+ Service service = topology.getService(event.getServiceName());
if (service == null) {
if (log.isDebugEnabled()) {
log.debug(String.format("Service does not exist: [service] %s",
- event.getServiceUuid()));
+ event.getServiceName()));
}
} else {
@@ -93,7 +93,7 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
topology.removeService(service);
if (log.isInfoEnabled()) {
- log.info(String.format("Service removed: [service] %s", event.getServiceUuid()));
+ log.info(String.format("Service removed: [service] %s", event.getServiceName()));
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java
index 5ab2cf5..d7ab46b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/TopologyUpdater.java
@@ -108,21 +108,21 @@ public class TopologyUpdater {
/**
* Acquires write lock for a Service
*
- * @param serviceUuid service uuid to acquire write lock
+ * @param serviceName service name to acquire write lock
*/
- public static void acquireWriteLockForService(String serviceUuid) {
+ public static void acquireWriteLockForService(String serviceName) {
// acquire read lock for all Applications
TopologyManager.acquireReadLockForServices();
- TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceUuid, true);
+ TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName, true);
if (topologyServiceLock == null) {
- handleLockNotFound("Topology lock not found for Service " + serviceUuid);
+ handleLockNotFound("Topology lock not found for Service " + serviceName);
} else {
topologyServiceLock.acquireWriteLock();
if (log.isDebugEnabled()) {
- log.debug("Write lock acquired for Service " + serviceUuid);
+ log.debug("Write lock acquired for Service " + serviceName);
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
index 0dc5e67..c2d1c6c 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
@@ -69,7 +69,6 @@ public class MockHealthStatisticsNotifier implements Runnable {
mockMemberContext.getMemberId(), memoryConsumption));
}
healthStatisticsPublisher.publish(
- System.currentTimeMillis(),
mockMemberContext.getClusterId(),
mockMemberContext.getClusterInstanceId(),
mockMemberContext.getNetworkPartitionId(),
@@ -94,7 +93,6 @@ public class MockHealthStatisticsNotifier implements Runnable {
mockMemberContext.getMemberId(), loadAvereage));
}
healthStatisticsPublisher.publish(
- System.currentTimeMillis(),
mockMemberContext.getClusterId(),
mockMemberContext.getClusterInstanceId(),
mockMemberContext.getNetworkPartitionId(),
@@ -118,7 +116,6 @@ public class MockHealthStatisticsNotifier implements Runnable {
mockMemberContext.getMemberId(), requestsInFlight));
}
inFlightRequestPublisher.publish(
- System.currentTimeMillis(),
mockMemberContext.getClusterId(),
mockMemberContext.getClusterInstanceId(),
mockMemberContext.getNetworkPartitionId(),
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java b/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java
new file mode 100644
index 0000000..620e11b
--- /dev/null
+++ b/components/org.apache.stratos.python.cartridge.agent/src/test/java/org/apache/stratos/python.cartridge.agent/test/PythonCartridgeAgentTest.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.python.cartridge.agent.test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.exec.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.domain.LoadBalancingIPType;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
+import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener;
+import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener;
+import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.util.MessagingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+
+import static junit.framework.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class PythonCartridgeAgentTest {
+
+ private static final Log log = LogFactory.getLog(PythonCartridgeAgentTest.class);
+
+ private static final String NEW_LINE = System.getProperty("line.separator");
+ // private static final long TIMEOUT = 1440000;
+ private static final long TIMEOUT = 120000;
+ private static final String CLUSTER_ID = "php.php.domain";
+ private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-1";
+ private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-1";
+ private static final String APP_ID = "application-1";
+ private static final String MEMBER_ID = "php.member-1";
+ private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1";
+ private static final String NETWORK_PARTITION_ID = "network-partition-1";
+ private static final String PARTITION_ID = "partition-1";
+ private static final String TENANT_ID = "-1234";
+ private static final String SERVICE_NAME = "php";
+ public static final String SOURCE_PATH = "/tmp/stratos-pca-test-app-path/";
+
+ private static List<ServerSocket> serverSocketList;
+ private static Map<String, Executor> executorList;
+ private final ArtifactUpdatedEvent artifactUpdatedEvent;
+ private final Boolean expectedResult;
+ private boolean instanceStarted;
+ private boolean instanceActivated;
+ private ByteArrayOutputStreamLocal outputStream;
+ private boolean eventReceiverInitiated = false;
+ private TopologyEventReceiver topologyEventReceiver;
+ private InstanceStatusEventReceiver instanceStatusEventReceiver;
+ private int cepPort = 7712;
+ private BrokerService broker = new BrokerService();
+ private static final String ACTIVEMQ_AMQP_BIND_ADDRESS = "tcp://localhost:61617";
+ private static final String ACTIVEMQ_MQTT_BIND_ADDRESS = "mqtt://localhost:1884";
+ private static final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID();
+
+ public PythonCartridgeAgentTest(ArtifactUpdatedEvent artifactUpdatedEvent, Boolean expectedResult) { // arguments come from one row of getArtifactUpdatedEventsAsParams()
+ this.artifactUpdatedEvent = artifactUpdatedEvent; // published by the test once the agent logs "Artifact repository found"
+ this.expectedResult = expectedResult; // compared with instanceActivated at the end of testPythonCartridgeAgent
+ }
+
+ /**
+ * One-time setup for the test class: sets the jndi.properties.dir system property to the path returned by getResourcesFolderPath()
+ */
+ @BeforeClass
+ public static void oneTimeSetUp() {
+ // Set jndi.properties.dir system property for initializing event publishers and receivers
+ System.setProperty("jndi.properties.dir", getResourcesFolderPath());
+ }
+
+ /**
+ * Per-test setup for testPythonCartridgeAgent: starts the ActiveMQ broker, the topology and instance status event receivers, a simulated CEP server socket, and launches agent.py through executeCommand
+ */
+ @Before
+ public void setup() {
+ serverSocketList = new ArrayList<ServerSocket>(); // static collections are re-created for every test run
+ executorList = new HashMap<String, Executor>();
+ try {
+ broker.addConnector(ACTIVEMQ_AMQP_BIND_ADDRESS);
+ broker.addConnector(ACTIVEMQ_MQTT_BIND_ADDRESS);
+ broker.setBrokerName("testBroker");
+ broker.setDataDirectory(PythonCartridgeAgentTest.class.getResource("/").getPath() +
+ File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME + File.separator + "activemq-data");
+ broker.start();
+ log.info("Broker service started!");
+ }
+ catch (Exception e) { // NOTE(review): a broker start failure is only logged; the rest of setup still runs
+ log.error("Error while setting up broker service", e);
+ }
+ if (!this.eventReceiverInitiated) { // NOTE(review): instance field initialised to false; presumably always false on a fresh test instance - confirm
+ ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 15);
+ topologyEventReceiver = new TopologyEventReceiver();
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+
+ instanceStatusEventReceiver = new InstanceStatusEventReceiver();
+ instanceStatusEventReceiver.setExecutorService(executorService);
+ instanceStatusEventReceiver.execute();
+
+ this.instanceStarted = false;
+ instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("Instance started event received");
+ instanceStarted = true; // flag asserted at the end of testPythonCartridgeAgent
+ }
+ });
+
+ this.instanceActivated = false;
+ instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("Instance activated event received");
+ instanceActivated = true; // flag polled by the wait loop in testPythonCartridgeAgent
+ }
+ });
+
+ this.eventReceiverInitiated = true;
+ }
+ // Simulate CEP server socket
+ startServerSocket(cepPort);
+ String agentPath = setupPythonAgent();
+ log.info("Python agent working directory name: " + PYTHON_AGENT_DIR_NAME);
+ log.info("Starting python cartridge agent...");
+ this.outputStream = executeCommand( // NOTE(review): the "> file" redirection is part of the command string; whether it is honoured depends on executeCommand (not visible here)
+ "python " + agentPath + "/agent.py > " + getResourcesFolderPath() + File.separator + ".." +
+ File.separator + PYTHON_AGENT_DIR_NAME + File.separator + "cartridge-agent.log");
+ }
+
+ /**
+ * Per-test cleanup for testPythonCartridgeAgent: destroys launched processes, closes server sockets, deletes SOURCE_PATH, terminates both event receivers and stops the broker
+ */
+ @After
+ public void tearDown() {
+ for (Map.Entry<String, Executor> entry : executorList.entrySet()) { // key is the command text, value is its executor
+ try {
+ String commandText = entry.getKey();
+ Executor executor = entry.getValue();
+ ExecuteWatchdog watchdog = executor.getWatchdog();
+ if (watchdog != null) { // executors without a watchdog are left untouched
+ log.info("Terminating process: " + commandText);
+ watchdog.destroyProcess();
+ }
+ }
+ catch (Exception ignore) { // best-effort: a failure here does not stop the remaining cleanup and is not logged
+ }
+ }
+ for (ServerSocket serverSocket : serverSocketList) {
+ try {
+ log.info("Stopping socket server: " + serverSocket.getLocalSocketAddress());
+ serverSocket.close();
+ }
+ catch (IOException ignore) { // best-effort close; the error is not logged
+ }
+ }
+
+ try {
+ log.info("Deleting source checkout folder...");
+ FileUtils.deleteDirectory(new File(SOURCE_PATH));
+ }
+ catch (Exception ignore) { // best-effort delete; the error is not logged
+
+ }
+
+ this.instanceStatusEventReceiver.terminate(); // NOTE(review): both receivers are only assigned in setup(); a null here would throw and skip broker.stop() - confirm
+ this.topologyEventReceiver.terminate();
+
+ this.instanceActivated = false;
+ this.instanceStarted = false;
+ try {
+ broker.stop();
+ }
+ catch (Exception e) {
+ log.error("Error while stopping the broker service", e);
+ }
+ }
+
+
+ /**
+ * This method returns a collection of {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent}
+ * objects, each paired with the expected result, as parameters to the test
+ *
+ * @return three rows of {ArtifactUpdatedEvent, expected result}; every row currently expects true
+ */
+ @Parameterized.Parameters
+ public static Collection getArtifactUpdatedEventsAsParams() { // NOTE(review): raw Collection; Collection<Object[]> would be the type-safe form
+ ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent();
+
+ ArtifactUpdatedEvent privateRepoEvent = createTestArtifactUpdatedEvent();
+ privateRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/testrepo.git");
+ privateRepoEvent.setRepoUserName("testapache2211");
+ privateRepoEvent.setRepoPassword("RExPDGa4GkPJj4kJDzSROQ=="); // NOTE(review): credential literal in test source; presumably an encrypted value - confirm it is safe to publish
+
+ ArtifactUpdatedEvent privateRepoEvent2 = createTestArtifactUpdatedEvent(); // same repository, but with the user name embedded in the URL
+ privateRepoEvent2.setRepoURL("https://testapache2211@bitbucket.org/testapache2211/testrepo.git");
+ privateRepoEvent2.setRepoUserName("testapache2211");
+ privateRepoEvent2.setRepoPassword("iF7qT+BKKPE3PGV1TeDsJA=="); // NOTE(review): differs from the literal used for privateRepoEvent - confirm which is intended
+
+ return Arrays.asList(new Object[][]{
+ {publicRepoEvent, true},
+ {privateRepoEvent, true},
+ {privateRepoEvent2, true}
+ });
+
+// return Arrays.asList(new Object[][]{
+// {publicRepoEvent, true}
+// });
+
+ }
+
+ /**
+ * Creates an {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} object with a public
+ * repository URL, using CLUSTER_ID and TENANT_ID as cluster id and tenant id
+ *
+ * @return a new event instance on every call; callers may overwrite the repository URL and credentials
+ */
+ private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() {
+ ArtifactUpdatedEvent publicRepoEvent = new ArtifactUpdatedEvent();
+ publicRepoEvent.setClusterId(CLUSTER_ID);
+ publicRepoEvent.setTenantId(TENANT_ID);
+ publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git");
+ return publicRepoEvent;
+ }
+
+ @Test(timeout = TIMEOUT) // TIMEOUT is 120000 (milliseconds for JUnit's timeout attribute)
+ public void testPythonCartridgeAgent() {
+ Thread communicatorThread = new Thread(new Runnable() { // reacts to agent log lines by publishing the events the agent waits for
+ @Override
+ public void run() {
+ List<String> outputLines = new ArrayList<String>(); // every line seen so far; getNewLines appends to it
+ while (!outputStream.isClosed()) { // poll the stream returned by executeCommand in setup() until it is closed
+ List<String> newLines = getNewLines(outputLines, outputStream.toString());
+ if (newLines.size() > 0) {
+ for (String line : newLines) {
+ if (line.contains("Subscribed to 'topology/#'")) { // log line that triggers the topology and member initialized events
+ sleep(1000);
+ // Send complete topology event
+ log.info("Publishing complete topology event...");
+ Topology topology = createTestTopology();
+ CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
+ publishEvent(completeTopologyEvent);
+ log.info("Complete topology event published");
+
+ sleep(3000);
+ // Publish member initialized event
+ log.info("Publishing member initialized event...");
+ MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent(
+ SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID,
+ PARTITION_ID
+ );
+ publishEvent(memberInitializedEvent);
+ log.info("Member initialized event published");
+
+ // Simulate server socket
+ startServerSocket(8080);
+ }
+ if (line.contains("Artifact repository found")) {
+ // Send artifact updated event
+ publishEvent(artifactUpdatedEvent);
+ }
+
+ if (line.contains("Exception in thread") || line.contains("ERROR")) { // NOTE(review): error lines are only logged; the throw below is disabled
+ //throw new RuntimeException(line);
+ }
+ log.info(line);
+ }
+ }
+ sleep(100);
+ }
+ }
+ });
+
+ communicatorThread.start();
+
+ while (!instanceActivated) { // NOTE(review): non-volatile field set from the listener registered in setup(), presumably on another thread - confirm visibility
+ // wait until the instance activated event is received.
+ sleep(2000);
+ }
+
+ assertTrue("Instance started event was not received", instanceStarted);
+ assertTrue("Instance activated event was not received", instanceActivated == this.expectedResult); // loop above exits only when instanceActivated is true, so this can fail only if expectedResult is false
+ }
+
+ /**
+ * Publish a messaging event on the topic that MessagingUtil.getMessageTopicName resolves for it
+ *
+ * @param event event to publish
+ */
+ private void publishEvent(Event event) {
+ String topicName = MessagingUtil.getMessageTopicName(event);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(topicName);
+ eventPublisher.publish(event);
+ }
+
+ /**
+ * Start a server socket on a new thread and wait there for a single incoming connection
+ *
+ * @param port local port to listen on
+ */
+ private void startServerSocket(final int port) {
+ Thread socketThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ServerSocket serverSocket = new ServerSocket(port);
+ serverSocketList.add(serverSocket); // tracked so tearDown() can close it; NOTE(review): plain ArrayList modified from this thread - confirm this is safe
+ log.info("Server socket started on port: " + port);
+ serverSocket.accept(); // blocks until one client connects; the returned socket is discarded
+ }
+ catch (IOException e) {
+ String message = "Could not start server socket: [port] " + port;
+ log.error(message, e);
+ throw new RuntimeException(message, e); // raised on the socket thread, not on the caller's thread
+ }
+ }
+ });
+ socketThread.start();
+ }
+
+    /**
+     * Builds the topology used by the test: one single-tenant service containing
+     * one cluster, which in turn contains one member in {@code MemberStatus.Created}
+     * state.
+     *
+     * @return topology holding the test service, cluster and member
+     */
+    private Topology createTestTopology() {
+        Properties memberProperties = new Properties();
+        memberProperties.setProperty("prop1", "value1");
+
+        Topology testTopology = new Topology();
+
+        Service testService = new Service(SERVICE_NAME, ServiceType.SingleTenant);
+        testTopology.addService(testService);
+
+        Cluster testCluster = new Cluster(
+                testService.getServiceName(),
+                CLUSTER_ID,
+                DEPLOYMENT_POLICY_NAME,
+                AUTOSCALING_POLICY_NAME,
+                APP_ID);
+        testService.addCluster(testCluster);
+
+        Member testMember = new Member(
+                testService.getServiceName(),
+                testCluster.getClusterId(),
+                MEMBER_ID,
+                CLUSTER_INSTANCE_ID,
+                NETWORK_PARTITION_ID,
+                PARTITION_ID,
+                LoadBalancingIPType.Private,
+                System.currentTimeMillis());
+        testMember.setDefaultPrivateIP("10.0.0.1");
+        testMember.setDefaultPublicIP("20.0.0.1");
+        testMember.setProperties(memberProperties);
+        testMember.setStatus(MemberStatus.Created);
+        // The member is attached only after it has been fully configured.
+        testCluster.addMember(testMember);
+
+        return testTopology;
+    }
+
+    /**
+     * Returns the lines of {@code output} that have not been seen before.
+     * <p>
+     * {@code output} is split on {@code NEW_LINE}. Every resulting line that is not
+     * already contained in {@code currentOutputLines} is appended to that list (the
+     * list is modified in place) and also returned. Lines are compared by text, so
+     * a line whose text equals an earlier line is not reported again.
+     *
+     * @param currentOutputLines lines seen so far; updated with the new lines
+     * @param output             complete output captured so far; may be blank or null
+     * @return the newly seen lines in order of appearance; empty if {@code output} is blank
+     */
+    private List<String> getNewLines(List<String> currentOutputLines, String output) {
+        List<String> newLines = new ArrayList<String>();
+        if (!StringUtils.isNotBlank(output)) {
+            return newLines;
+        }
+
+        // String.split never returns null, so the result can be iterated directly.
+        for (String line : output.split(NEW_LINE)) {
+            if (!currentOutputLines.contains(line)) {
+                currentOutputLines.add(line);
+                newLines.add(line);
+            }
+        }
+        return newLines;
+    }
+
+    /**
+     * Pauses the current thread for the given duration.
+     * <p>
+     * An interruption ends the pause early; it is not reported to the caller and
+     * the thread's interrupt flag is not restored.
+     *
+     * @param time duration in milliseconds
+     */
+    private void sleep(long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException ignored) {
+            // Deliberately dropped: the pause simply ends early.
+        }
+    }
+
+    /**
+     * Copies the python cartridge agent sources into a working folder next to the
+     * test resources, adds the sample {@code agent.conf}, {@code logging.ini} and
+     * {@code payload} from the test resources, and marks the bash extension scripts
+     * as executable.
+     *
+     * @return path of the prepared agent folder
+     * @throws RuntimeException if any copy step fails or the extensions folder
+     *                          cannot be listed
+     */
+    private String setupPythonAgent() {
+        try {
+            log.info("Setting up python cartridge agent...");
+            String resourcesPath = getResourcesFolderPath();
+
+            String srcAgentPath = resourcesPath + "/../../src/main/python/cartridge.agent/cartridge.agent";
+            String destAgentPath =
+                    resourcesPath + File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME +
+                            "/cartridge.agent";
+            FileUtils.copyDirectory(new File(srcAgentPath), new File(destAgentPath));
+
+            String srcAgentConfPath = resourcesPath + "/agent.conf";
+            String destAgentConfPath = destAgentPath + "/agent.conf";
+            FileUtils.copyFile(new File(srcAgentConfPath), new File(destAgentConfPath));
+
+            String srcLoggingIniPath = resourcesPath + "/logging.ini";
+            String destLoggingIniPath = destAgentPath + "/logging.ini";
+            FileUtils.copyFile(new File(srcLoggingIniPath), new File(destLoggingIniPath));
+
+            String srcPayloadPath = resourcesPath + "/payload";
+            String destPayloadPath = destAgentPath + "/payload";
+            FileUtils.copyDirectory(new File(srcPayloadPath), new File(destPayloadPath));
+
+            log.info("Changing extension scripts permissions");
+            File extensionsPath = new File(destAgentPath + "/extensions/bash");
+            File[] extensions = extensionsPath.listFiles();
+            if (extensions == null) {
+                // listFiles() returns null when the path is not a directory or cannot be
+                // read; fail with the offending path instead of a bare NullPointerException.
+                throw new IOException(
+                        "Extensions directory not found or not readable: " + extensionsPath.getAbsolutePath());
+            }
+            for (File extension : extensions) {
+                extension.setExecutable(true);
+            }
+
+            log.info("Python cartridge agent setup completed");
+
+            return destAgentPath;
+        } catch (Exception e) {
+            String message = "Could not copy cartridge agent distribution";
+            log.error(message, e);
+            throw new RuntimeException(message, e);
+        }
+    }
+
+    /**
+     * Launches a shell command and returns the stream that collects its output.
+     * <p>
+     * The command runs with the python agent folder (under the test resources
+     * parent folder) as working directory and is guarded by a watchdog configured
+     * with {@code TIMEOUT}. Completion and failure are only logged, through the
+     * result handler passed to the executor. The executor is stored in
+     * {@code executorList} under the command text.
+     *
+     * @param commandText command line to run
+     * @return stream the process output is pumped into
+     * @throws RuntimeException if the command cannot be parsed or started
+     */
+    private ByteArrayOutputStreamLocal executeCommand(final String commandText) {
+        final ByteArrayOutputStreamLocal outputStream = new ByteArrayOutputStreamLocal();
+        try {
+            CommandLine parsedCommand = CommandLine.parse(commandText);
+
+            DefaultExecutor executor = new DefaultExecutor();
+            File workingDirectory = new File(
+                    getResourcesFolderPath() + File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME);
+            executor.setWorkingDirectory(workingDirectory);
+            executor.setStreamHandler(new PumpStreamHandler(outputStream));
+            executor.setWatchdog(new ExecuteWatchdog(TIMEOUT));
+
+            ExecuteResultHandler resultHandler = new ExecuteResultHandler() {
+                @Override
+                public void onProcessComplete(int i) {
+                    log.info(commandText + " process completed");
+                }
+
+                @Override
+                public void onProcessFailed(ExecuteException e) {
+                    log.error(commandText + " process failed", e);
+                }
+            };
+            executor.execute(parsedCommand, resultHandler);
+
+            // Registered only after the process was started successfully.
+            executorList.put(commandText, executor);
+            return outputStream;
+        } catch (Exception e) {
+            // Whatever output was captured so far is used as the log message.
+            log.error(outputStream.toString(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns the path of the root folder from which this test class's resources
+     * are loaded, without a trailing slash.
+     * <p>
+     * The value is the path component of the URL returned by
+     * {@code Class.getResource("/")}, so it always uses {@code '/'} as separator.
+     *
+     * @return resources root folder path without trailing {@code '/'}
+     */
+    private static String getResourcesFolderPath() {
+        // Resource names and URL paths always use '/', independent of File.separator;
+        // looking up "\" on platforms with a different separator finds nothing.
+        String path = PythonCartridgeAgentTest.class.getResource("/").getPath();
+        return StringUtils.removeEnd(path, "/");
+    }
+
+    /**
+     * A {@link ByteArrayOutputStream} that remembers whether {@link #close()} has
+     * been called and exposes that through {@link #isClosed()}; the base class
+     * offers no such query.
+     */
+    private class ByteArrayOutputStreamLocal extends ByteArrayOutputStream {
+        // NOTE(review): plain (non-volatile) flag; if isClosed() is polled from a
+        // different thread than the one calling close(), visibility is not
+        // guaranteed - confirm against the callers.
+        private boolean streamClosed;
+
+        /**
+         * @return {@code true} once {@link #close()} has completed
+         */
+        public boolean isClosed() {
+            return streamClosed;
+        }
+
+        /**
+         * Closes the stream and records that it has been closed.
+         */
+        @Override
+        public void close() throws IOException {
+            super.close();
+            streamClosed = true;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java
index eeab1bb..abc02e9 100644
--- a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java
+++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV40Utils.java
@@ -69,8 +69,7 @@ public class StratosApiV40Utils {
if (cloudControllerServiceClient != null) {
- Cartridge cartridgeConfig = ObjectConverter.convertCartridgeBeanToStubCartridgeConfig
- (cartridgeDefinitionBean, null, -1);
+ Cartridge cartridgeConfig = ObjectConverter.convertCartridgeBeanToStubCartridgeConfig(cartridgeDefinitionBean);
if (cartridgeConfig == null) {
throw new RestAPIException("Populated CartridgeConfig instance is null, cartridge deployment aborted");
@@ -157,7 +156,7 @@ public class StratosApiV40Utils {
if (autoscalerServiceClient != null) {
org.apache.stratos.autoscaler.stub.autoscale.policy.AutoscalePolicy autoscalePolicy = ObjectConverter.
- convertToCCAutoscalerPojo(autoscalePolicyBean,null,-1234);
+ convertToCCAutoscalerPojo(autoscalePolicyBean);
try {
autoscalerServiceClient
@@ -372,7 +371,7 @@ public class StratosApiV40Utils {
for (String cartridgeType : availableCartridges) {
Cartridge cartridgeInfo = null;
try {
- cartridgeInfo = CloudControllerServiceClient.getInstance().getCartridgeByTenant(cartridgeType,-1234);
+ cartridgeInfo = CloudControllerServiceClient.getInstance().getCartridge(cartridgeType);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Error when calling getCartridgeInfo for " + cartridgeType + ", Error: "