You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/03/10 10:43:18 UTC

[07/20] git commit: implementing an extension to start message broker listeners

implementing an extension to start message broker listeners


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/0adbfef4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/0adbfef4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/0adbfef4

Branch: refs/heads/master
Commit: 0adbfef456c5b9ed33332078ade83b3c52000520
Parents: f66fc7c
Author: Nirmal Fernando <ni...@apache.org>
Authored: Mon Mar 10 14:59:23 2014 +0530
Committer: Nirmal Fernando <ni...@apache.org>
Committed: Mon Mar 10 14:59:23 2014 +0530

----------------------------------------------------------------------
 .../impl/StartListenersExtensionExecutor.java   | 228 +++++++++++++++++++
 1 file changed, 228 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/0adbfef4/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/executor/impl/StartListenersExtensionExecutor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/executor/impl/StartListenersExtensionExecutor.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/executor/impl/StartListenersExtensionExecutor.java
new file mode 100644
index 0000000..47f1151
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/executor/impl/StartListenersExtensionExecutor.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.executor.impl;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer.RepositoryInformation;
+import org.apache.stratos.cartridge.agent.artifact.deployment.synchronizer.git.impl.GitBasedArtifactRepository;
+import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
+import org.apache.stratos.cartridge.agent.event.publisher.CartridgeAgentEventPublisher;
+import org.apache.stratos.cartridge.agent.executor.ExtensionExecutor;
+import org.apache.stratos.cartridge.agent.util.CartridgeAgentConstants;
+import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils;
+import org.apache.stratos.cartridge.agent.util.ExtensionUtils;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
+import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
+import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
+import org.apache.stratos.messaging.listener.instance.notifier.ArtifactUpdateEventListener;
+import org.apache.stratos.messaging.listener.instance.notifier.InstanceCleanupClusterEventListener;
+import org.apache.stratos.messaging.listener.instance.notifier.InstanceCleanupMemberEventListener;
+import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageReceiver;
+
+/**
+ * Extension executor that starts the message broker event listeners which the
+ * Cartridge Agent needs.
+ * <p>
+ * It registers listeners for artifact update and instance cleanup (member and
+ * cluster) events, starts the instance notifier receiver thread and waits until
+ * that receiver reports it is subscribed.
+ */
+public class StartListenersExtensionExecutor extends ExtensionExecutor {
+
+    private static final Log log = LogFactory.getLog(StartListenersExtensionExecutor.class);
+
+    /** Artifact update interval used when none, or an invalid one, is configured (logged with an "s" suffix). */
+    private static final long DEFAULT_ARTIFACT_UPDATE_INTERVAL = 10;
+
+    /** Pause, in milliseconds, between checks of the receiver's subscription state. */
+    private static final long SUBSCRIPTION_POLL_INTERVAL_MS = 2000;
+
+    /** Creates an executor, passing this class's name to the parent constructor. */
+    public StartListenersExtensionExecutor() {
+        super(StartListenersExtensionExecutor.class.getName());
+    }
+
+    /**
+     * Creates an executor and hands the given file names to
+     * {@code setFileNamesToBeExecuted}.
+     *
+     * @param fileNames names of the files to be executed
+     */
+    public StartListenersExtensionExecutor(List<String> fileNames) {
+        super.setFileNamesToBeExecuted(fileNames);
+    }
+
+    /**
+     * Starts the instance notifier listeners and blocks until the receiver is
+     * subscribed. Returns early, without starting anything, when a mandatory
+     * system property is missing; also stops waiting when the calling thread is
+     * interrupted.
+     */
+    @Override
+    public void execute() {
+        // Checked in this order so the first missing property is the one reported.
+        if (!isRequiredPropertySet(CartridgeAgentConstants.JNDI_PROPERTIES_DIR)
+                || !isRequiredPropertySet(CartridgeAgentConstants.PARAM_FILE_PATH)) {
+            return;
+        }
+
+        // A missing extensions directory is only reported as a warning; execution continues.
+        String extensionsDir = System.getProperty(CartridgeAgentConstants.EXTENSIONS_DIR);
+        if (StringUtils.isBlank(extensionsDir) && log.isWarnEnabled()) {
+            log.warn(String.format("System property not found: %s", CartridgeAgentConstants.EXTENSIONS_DIR));
+        }
+
+        // Start instance notifier listener thread
+        if (log.isDebugEnabled()) {
+            log.debug("Starting instance notifier event message receiver thread");
+        }
+
+        InstanceNotifierMessageProcessorChain processorChain = new InstanceNotifierMessageProcessorChain();
+        processorChain.addEventListener(new ArtifactUpdateEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                onArtifactUpdateEvent((ArtifactUpdatedEvent) event);
+            }
+        });
+
+        processorChain.addEventListener(new InstanceCleanupMemberEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Clean up only when the event names this member; a missing local id never matches.
+                String memberIdInPayload = CartridgeAgentConfiguration.getInstance().getMemberId();
+                String memberIdInMessage = ((InstanceCleanupMemberEvent) event).getMemberId();
+                if (memberIdInPayload != null && memberIdInPayload.equals(memberIdInMessage)) {
+                    onInstanceCleanupEvent();
+                }
+            }
+        });
+
+        processorChain.addEventListener(new InstanceCleanupClusterEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Clean up only when the event names this cluster; a missing local id never matches.
+                String clusterIdInPayload = CartridgeAgentConfiguration.getInstance().getClusterId();
+                String clusterIdInMessage = ((InstanceCleanupClusterEvent) event).getClusterId();
+                if (clusterIdInPayload != null && clusterIdInPayload.equals(clusterIdInMessage)) {
+                    onInstanceCleanupEvent();
+                }
+            }
+        });
+
+        InstanceNotifierEventMessageDelegator messageDelegator = new InstanceNotifierEventMessageDelegator(processorChain);
+        InstanceNotifierEventMessageReceiver messageReceiver = new InstanceNotifierEventMessageReceiver(messageDelegator);
+        Thread messageReceiverThread = new Thread(messageReceiver);
+        messageReceiverThread.start();
+
+        // Wait until message receiver is subscribed to the topic to
+        // send the instance started event
+        while (!messageReceiver.isSubscribed()) {
+            try {
+                Thread.sleep(SUBSCRIPTION_POLL_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag and stop waiting; retrying the sleep
+                // with the flag set would only throw again immediately.
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted while waiting for the instance notifier subscription", e);
+                return;
+            }
+        }
+    }
+
+    /**
+     * Checks that a mandatory system property has a non-blank value and logs an
+     * error when it does not.
+     *
+     * @param propertyName name of the system property to check
+     * @return {@code true} when the property is set to a non-blank value
+     */
+    private boolean isRequiredPropertySet(String propertyName) {
+        if (StringUtils.isNotBlank(System.getProperty(propertyName))) {
+            return true;
+        }
+        if (log.isErrorEnabled()) {
+            log.error(String.format("System property not found: %s", propertyName));
+        }
+        return false;
+    }
+
+    /**
+     * Handles an artifact update event. When the event carries a repository URL
+     * and its cluster id equals the locally configured one, checks out the
+     * repository, runs the artifacts-updated extension and, if enabled through
+     * the system property, schedules the artifact sync task.
+     *
+     * @param event the received artifact updated event
+     */
+    private void onArtifactUpdateEvent(ArtifactUpdatedEvent event) {
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Artifact update event received: %s", event.toString()));
+        }
+
+        CartridgeAgentConfiguration configuration = CartridgeAgentConfiguration.getInstance();
+        String clusterIdInPayload = configuration.getClusterId();
+        String repoURL = event.getRepoURL();
+        if (StringUtils.isEmpty(repoURL) || clusterIdInPayload == null
+                || !clusterIdInPayload.equals(event.getClusterId())) {
+            // No repository to check out, or the event is for another cluster.
+            return;
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Executing git checkout");
+        }
+        RepositoryInformation repoInformation = new RepositoryInformation();
+        repoInformation.setRepoUsername(event.getRepoUserName());
+        // Decrypt only now that the event is known to be for this cluster.
+        repoInformation.setRepoPassword(CartridgeAgentUtils.decryptPassword(event.getRepoPassword()));
+        repoInformation.setRepoUrl(repoURL);
+        repoInformation.setRepoPath(configuration.getAppPath());
+        repoInformation.setTenantId(event.getTenantId());
+        repoInformation.setMultitenant(configuration.isMultitenant());
+        GitBasedArtifactRepository.getInstance().checkout(repoInformation);
+
+        ExtensionUtils.executeArtifactsUpdatedExtension();
+
+        // TODO I think we do not need to wait till git repo gets cloned, to send the
+        // instance activated event. - Nirmal
+
+        // Start the artifact update task
+        boolean artifactUpdateEnabled = Boolean.parseBoolean(System.getProperty(CartridgeAgentConstants.ENABLE_ARTIFACT_UPDATE));
+        if (!artifactUpdateEnabled) {
+            log.info("Artifact updating task disabled");
+            return;
+        }
+
+        long artifactUpdateInterval = resolveArtifactUpdateInterval();
+        log.info("Artifact updating task enabled, update interval: " + artifactUpdateInterval + "s");
+        GitBasedArtifactRepository.getInstance().scheduleSyncTask(repoInformation, artifactUpdateInterval);
+    }
+
+    /**
+     * Reads the artifact update interval from the system properties.
+     *
+     * @return the configured interval, or the default when the property is
+     *         unset, empty or not a valid number
+     */
+    private long resolveArtifactUpdateInterval() {
+        String artifactUpdateIntervalStr = System.getProperty(CartridgeAgentConstants.ARTIFACT_UPDATE_INTERVAL);
+        if (StringUtils.isEmpty(artifactUpdateIntervalStr)) {
+            return DEFAULT_ARTIFACT_UPDATE_INTERVAL;
+        }
+        try {
+            return Long.parseLong(artifactUpdateIntervalStr);
+        } catch (NumberFormatException e) {
+            log.error("Invalid artifact sync interval specified ", e);
+            return DEFAULT_ARTIFACT_UPDATE_INTERVAL;
+        }
+    }
+
+    /**
+     * Publishes the maintenance mode event, runs the cleanup extension and then
+     * publishes the ready-to-shutdown event, in that order.
+     */
+    private void onInstanceCleanupEvent() {
+        if (log.isInfoEnabled()) {
+            log.info("Executing cleaning up the data in the cartridge instance...");
+        }
+        // Sending event on the maintenance mode
+        CartridgeAgentEventPublisher.publishMaintenanceModeEvent();
+
+        // Cleaning up the cartridge instance's data
+        ExtensionUtils.executeCleanupExtension();
+        if (log.isInfoEnabled()) {
+            log.info("cleaning up finished in the cartridge instance...");
+            log.info("publishing ready to shutdown event...");
+        }
+        // Publishing the ready to shutdown event after performing the cleanup
+        CartridgeAgentEventPublisher.publishInstanceReadyToShutdownEvent();
+    }
+
+    /** Currently a no-op. */
+    @Override
+    public void cleanUp() {
+    }
+
+}