You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by br...@apache.org on 2013/08/23 15:11:19 UTC

svn commit: r1516836 - in /ace/trunk/org.apache.ace.agent: src/org/apache/ace/agent/ src/org/apache/ace/agent/impl/ test/org/apache/ace/agent/impl/

Author: bramk
Date: Fri Aug 23 13:11:19 2013
New Revision: 1516836

URL: http://svn.apache.org/r1516836
Log:
ACE-347 Added download first strategy to default installer

Modified:
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/HandlerBase.java
    ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java?rev=1516836&r1=1516835&r2=1516836&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java Fri Aug 23 13:11:19 2013
@@ -55,7 +55,7 @@ public interface DownloadHandle {
      * reason.
      * 
      */
-    interface CompletedListener {
+    interface ResultListener {
         /**
          * Called when a download terminates.
          * 
@@ -78,7 +78,7 @@ public interface DownloadHandle {
      * @param listener The completion listener.
      * @return this
      */
-    DownloadHandle setCompletionListener(CompletedListener listener);
+    DownloadHandle setCompletionListener(ResultListener listener);
 
     /**
      * Starts the download.

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java?rev=1516836&r1=1516835&r2=1516836&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java Fri Aug 23 13:11:19 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.ace.agent.impl;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Set;
@@ -31,6 +32,9 @@ import org.apache.ace.agent.AgentControl
 import org.apache.ace.agent.AgentUpdateHandler;
 import org.apache.ace.agent.ConfigurationHandler;
 import org.apache.ace.agent.DeploymentHandler;
+import org.apache.ace.agent.DownloadHandle;
+import org.apache.ace.agent.DownloadResult;
+import org.apache.ace.agent.DownloadState;
 import org.apache.ace.agent.FeedbackChannel;
 import org.apache.ace.agent.RetryAfterException;
 import org.osgi.framework.Version;
@@ -66,7 +70,7 @@ public class DefaultController implement
      * flags and potentially restarting a Deployment Package download. A value of 1 or less disables the retry behavior.
      */
     public static final String CONFIG_KEY_UPDATERETRIES = CONFIG_KEY_BASE + ".updateRetries";
-    public static final long CONFIG_DEFAULT_UPDATERETRIES = 1;
+    public static final long CONFIG_DEFAULT_UPDATERETRIES = 2;
 
     /**
      * UpdateStreaming flag; When set Deployment Packages are installed directly from the download stream reducing
@@ -92,7 +96,8 @@ public class DefaultController implement
     public static final boolean CONFIG_DEFAULT_FIXPACKAGES = true;
 
     private volatile AgentContext m_agentContext;
-    private volatile ScheduledFuture<?> m_future;
+    private volatile ScheduledFuture<?> m_scheduledFuture;
+    private volatile UpdateInstaller m_updateInstaller;
 
     @Override
     public void start(AgentContext agentContext) throws Exception {
@@ -113,6 +118,9 @@ public class DefaultController implement
 
     public void stop() {
         unscheduleRun();
+        if (m_updateInstaller != null) {
+            m_updateInstaller.reset();
+        }
     }
 
     public void run() {
@@ -122,8 +130,8 @@ public class DefaultController implement
         long interval = configurationHandler.getLong(CONFIG_KEY_SYNCINTERVAL, CONFIG_DEFAULT_SYNCINTERVAL);
         try {
             runSafeFeedback();
-            runSafeAgent();
-            runSafeUpdate();
+            runSafeAgentUpdate();
+            runSafeDeploymentUpdate();
         }
         catch (RetryAfterException e) {
             interval = e.getSeconds();
@@ -139,76 +147,303 @@ public class DefaultController implement
         m_agentContext.logDebug(COMPONENT_IDENTIFIER, "Sync completed. Rescheduled in %d seconds", interval);
     }
 
-    private void runSafeAgent() throws RetryAfterException, IOException {
-        AgentUpdateHandler deploymentHandler = m_agentContext.getAgentUpdateHandler();
+    private void runSafeFeedback() throws RetryAfterException, IOException {
+        AgentControl agentControl = m_agentContext.getAgentControl();
+
+        m_agentContext.logDebug(COMPONENT_IDENTIFIER, "Synchronizing feedback channels");
+        Set<String> channelNames = agentControl.getFeedbackHandler().getChannelNames();
+        for (String channelName : channelNames) {
+            FeedbackChannel channel = agentControl.getFeedbackHandler().getChannel(channelName);
+            if (channel != null) {
+                channel.sendFeedback();
+            }
+        }
+    }
+
+    private void runSafeAgentUpdate() throws RetryAfterException, IOException {
+        AgentUpdateHandler updateHandler = m_agentContext.getAgentUpdateHandler();
 
         m_agentContext.logDebug(COMPONENT_IDENTIFIER, "Checking for agent update");
-        Version current = deploymentHandler.getInstalledVersion();
-        SortedSet<Version> available = deploymentHandler.getAvailableVersions();
+        Version current = updateHandler.getInstalledVersion();
+        SortedSet<Version> available = updateHandler.getAvailableVersions();
         Version highest = Version.emptyVersion;
         if (available != null && !available.isEmpty()) {
             highest = available.last();
         }
         if (highest.compareTo(current) > 0) {
             m_agentContext.logInfo(COMPONENT_IDENTIFIER, "Installing agent update %s => %s", current, highest);
-            InputStream inputStream = deploymentHandler.getInputStream(highest);
-            deploymentHandler.install(inputStream);
+            InputStream inputStream = updateHandler.getInputStream(highest);
+            updateHandler.install(inputStream);
         }
         else {
             m_agentContext.logDebug(COMPONENT_IDENTIFIER, "No agent update available for version %s", current);
         }
     }
 
-    private void runSafeUpdate() throws RetryAfterException, IOException {
-        ConfigurationHandler configurationHandler = m_agentContext.getConfigurationHandler();
-        DeploymentHandler deploymentHandler = m_agentContext.getDeploymentHandler();
+    private void runSafeDeploymentUpdate() throws RetryAfterException, IOException {
 
         m_agentContext.logDebug(COMPONENT_IDENTIFIER, "Checking for deployment update");
+        DeploymentHandler deploymentHandler = m_agentContext.getDeploymentHandler();
         Version current = deploymentHandler.getInstalledVersion();
         SortedSet<Version> available = deploymentHandler.getAvailableVersions();
         Version highest = Version.emptyVersion;
         if (available != null && !available.isEmpty()) {
             highest = available.last();
         }
-        if (highest.compareTo(current) > 0) {
-            m_agentContext.logInfo(COMPONENT_IDENTIFIER, "Installing deployment update %s => %s", current, highest);
+        if (highest.compareTo(current) < 1) {
+            m_agentContext.logDebug(COMPONENT_IDENTIFIER, "No deployment update available for version %s", current);
+            return;
+        }
 
-            // FIXME handle downloads
-            // boolean streaming = configurationHandler.getBoolean(CONFIG_KEY_STREAMING_UPDATES,
-            // CONFIG_DEFAULT_UPDATE_STREAMING);
-            boolean fixPackage = configurationHandler.getBoolean(CONFIG_KEY_FIXPACKAGES, CONFIG_DEFAULT_FIXPACKAGES);
-            InputStream inputStream = deploymentHandler.getInputStream(highest, fixPackage);
-            try {
-                deploymentHandler.deployPackage(inputStream);
+        ConfigurationHandler configurationHandler = m_agentContext.getConfigurationHandler();
+        boolean fixPackage = configurationHandler.getBoolean(CONFIG_KEY_FIXPACKAGES, CONFIG_DEFAULT_FIXPACKAGES);
+        boolean updateStreaming = configurationHandler.getBoolean(CONFIG_KEY_UPDATESTREAMING, CONFIG_DEFAULT_UPDATESTREAMING);
+        long maxRetries = configurationHandler.getLong(CONFIG_KEY_UPDATERETRIES, CONFIG_DEFAULT_UPDATERETRIES);
+
+        getUpdateInstaller(updateStreaming).installUpdate(current, highest, fixPackage, maxRetries);
+    }
+
+    private UpdateInstaller getUpdateInstaller(boolean streaming) {
+        if (streaming) {
+            if (m_updateInstaller == null) {
+                m_updateInstaller = new StreamingUpdateInstaller(this);
             }
-            finally {
-                inputStream.close();
+            else if (!(m_updateInstaller instanceof StreamingUpdateInstaller)) {
+                m_updateInstaller.reset();
+                m_updateInstaller = new StreamingUpdateInstaller(this);
             }
         }
         else {
-            m_agentContext.logDebug(COMPONENT_IDENTIFIER, "No deployment update available for version %s", current);
+            if (m_updateInstaller == null) {
+                m_updateInstaller = new DownloadUpdateInstaller(this);
+            }
+            if (!(m_updateInstaller instanceof DownloadUpdateInstaller)) {
+                m_updateInstaller.reset();
+                m_updateInstaller = new DownloadUpdateInstaller(this);
+            }
         }
+        return m_updateInstaller;
     }
 
-    private void runSafeFeedback() throws RetryAfterException, IOException {
-        AgentControl agentControl = m_agentContext.getAgentControl();
+    private void scheduleRun(long seconds) {
+        unscheduleRun();
+        m_scheduledFuture = m_agentContext.getExecutorService().schedule(this, seconds, TimeUnit.SECONDS);
+    }
 
-        m_agentContext.logDebug(COMPONENT_IDENTIFIER, "Synchronizing feedback channels");
-        Set<String> channelNames = agentControl.getFeedbackHandler().getChannelNames();
-        for (String channelName : channelNames) {
-            FeedbackChannel channel = agentControl.getFeedbackHandler().getChannel(channelName);
-            if (channel != null) {
-                channel.sendFeedback();
+    private void unscheduleRun() {
+        if (m_scheduledFuture != null)
+            m_scheduledFuture.cancel(true);
+    }
+
+    private AgentContext getAgentContext() {
+        return m_agentContext;
+    }
+
+    /**
+     * Base class for internal installer strategies. This implementation handles max update retry constraints and
+     * delegates the rest to concrete implementations.
+     */
+    abstract static class UpdateInstaller {
+        // m_lastVersion/m_failureCount record how often the most recently attempted version has failed.
+        private final DefaultController m_controller;
+        private Version m_lastVersion = null;
+        private int m_failureCount = 0;
+
+        public UpdateInstaller(DefaultController controller) {
+            m_controller = controller;
+        }
+
+        protected final DefaultController getController() {
+            return m_controller;
+        }
+        // Attempts the update unless this same toVersion has already failed maxRetries times.
+        public final void installUpdate(Version fromVersion, Version toVersion, boolean fixPackage, long maxRetries) throws RetryAfterException {
+            if (m_lastVersion != null && toVersion.equals(m_lastVersion)) {
+                if (m_failureCount >= maxRetries) {
+                    getController().getAgentContext().logInfo(COMPONENT_IDENTIFIER,
+                        "Ignoring deployment update %s => %s because max retries reached %d", fromVersion, toVersion, maxRetries);
+                    return;
+                }
+            }
+            else {
+                m_lastVersion = toVersion;
+                m_failureCount = 0;
+            }
+            try {
+                doInstallUpdate(fromVersion, toVersion, fixPackage);
+            }
+            catch (RetryAfterException e) {
+                // The server is busy. Re-throw so the controller can abort the sync and reschedule.
+                throw (e);
+
+            }
+            catch (IOException e) {
+                // Just increment the failure count and assume the concrete implementation logged.
+                m_failureCount++;
             }
         }
+        // Forgets the retry bookkeeping, then lets the concrete installer drop its own state.
+        public final void reset() {
+            m_lastVersion = null;
+            m_failureCount = 0;
+            doReset();
+        }
+        // Performs the actual install; installUpdate counts an IOException thrown here as one failed attempt.
+        protected abstract void doInstallUpdate(Version from, Version to, boolean fix) throws RetryAfterException, IOException;
+        // Hook invoked by reset() for implementation-specific cleanup.
+        protected abstract void doReset();
     }
 
-    private void scheduleRun(long seconds) {
-        m_future = m_agentContext.getExecutorService().schedule(this, seconds, TimeUnit.SECONDS);
+    /**
+     * UpdateInstaller that provides streaming deployment package install. The install is blocking.
+     */
+    static class StreamingUpdateInstaller extends UpdateInstaller {
+
+        public StreamingUpdateInstaller(DefaultController controller) {
+            super(controller);
+        }
+
+        @Override
+        public void doInstallUpdate(Version from, Version to, boolean fix) throws RetryAfterException, IOException {
+            // Opens the package stream for 'to' and hands it directly to the deployment handler.
+            getController().getAgentContext().logInfo(COMPONENT_IDENTIFIER,
+                "Installing streaming deployment update %s => %s", from, to);
+            // inputStream stays null when opening fails, which tells the finally block there is nothing to close.
+            DeploymentHandler deploymentHandler = getController().getAgentContext().getDeploymentHandler();
+            InputStream inputStream = null;
+            try {
+                inputStream = deploymentHandler.getInputStream(to, fix);
+                deploymentHandler.deployPackage(inputStream);
+                return;
+            }
+            catch (IOException e) {
+                getController().getAgentContext().logWarning(COMPONENT_IDENTIFIER,
+                    "IOException opening/streaming package inputstream", e);
+                throw e;
+            }
+            finally {
+                if (inputStream != null)
+                    try {
+                        inputStream.close();
+                    }
+                    catch (Exception e) {
+                        getController().getAgentContext().logWarning(COMPONENT_IDENTIFIER,
+                            "Exception while closing streaming package inputstream", e);
+                    }
+            }
+        }
+        // Nothing to release on reset: this installer declares no fields of its own.
+        @Override
+        protected void doReset() {
+        }
     }
 
-    private void unscheduleRun() {
-        if (m_future != null)
-            m_future.cancel(true);
+    /**
+     * UpdateInstaller that provides download deployment package install. The install is non-blocking. Upon download
+     * completion this installer will reschedule the controller.
+     */
+    static class DownloadUpdateInstaller extends UpdateInstaller implements DownloadHandle.ProgressListener, DownloadHandle.ResultListener {
+
+        // active download state; only ever dropped as a whole through clearDownloadState()
+        private volatile DownloadHandle m_downloadHandle;
+        private volatile DownloadResult m_downloadResult = null;
+        private volatile Version m_downloadVersion;
+        private volatile long m_downloadLength = 0;
+        private volatile long m_downloadProgress = 0;
+
+        public DownloadUpdateInstaller(DefaultController controller) {
+            super(controller);
+        }
+
+        @Override
+        public void doInstallUpdate(Version fromVersion, Version toVersion, boolean fixPackage) throws RetryAfterException, IOException {
+
+            AgentContext agentContext = getController().getAgentContext();
+            DeploymentHandler deploymentHandler = agentContext.getDeploymentHandler();
+
+            if (m_downloadHandle != null && !m_downloadVersion.equals(toVersion)) {
+                agentContext.logInfo(COMPONENT_IDENTIFIER,
+                    "Cancelling deployment package download for %s because a newer version is available", m_downloadVersion);
+                // Drop the handle together with any result recorded for it, so it cannot be taken for the new download's.
+                clearDownloadState();
+            }
+
+            if (m_downloadHandle == null) {
+                agentContext.logInfo(COMPONENT_IDENTIFIER, "Starting deployment package download %s => %s", fromVersion, toVersion);
+                m_downloadVersion = toVersion;
+                m_downloadHandle = deploymentHandler.getDownloadHandle(toVersion, fixPackage)
+                    .setProgressListener(this).setCompletionListener(this).start();
+            }
+            else if (m_downloadResult == null) {
+                agentContext.logInfo(COMPONENT_IDENTIFIER,
+                    "Deployment package download for %s is in progress %d / %d", toVersion, m_downloadProgress, m_downloadLength);
+            }
+            else if (m_downloadResult.getState() == DownloadState.FAILED) {
+                agentContext.logWarning(COMPONENT_IDENTIFIER,
+                    "Deployment package download for %s is FAILED. Clearing for retry", toVersion);
+                // Clear the FAILED result as well; left behind, it would make the next run discard the retry download.
+                clearDownloadState();
+                throw new IOException("Download failed");
+            }
+            else if (m_downloadResult.getState() == DownloadState.STOPPED) {
+                agentContext.logWarning(COMPONENT_IDENTIFIER,
+                    "Deployment package download for %s is STOPPED. Trying to resume", toVersion);
+                m_downloadResult = null;
+                m_downloadHandle.start();
+            }
+            else if (m_downloadResult.getState() == DownloadState.SUCCESSFUL) {
+                agentContext.logInfo(COMPONENT_IDENTIFIER,
+                    "Installing downloaded deployment update %s => %s", fromVersion, toVersion);
+                try {
+                    InputStream inputStream = new FileInputStream(m_downloadResult.getFile());
+                    try {
+                        deploymentHandler.deployPackage(inputStream);
+                    }
+                    finally {
+                        inputStream.close();
+                    }
+                }
+                finally {
+                    // Discard only after the stream is closed, and never keep a consumed result around.
+                    clearDownloadState();
+                }
+            }
+        }
+
+        @Override
+        public void doReset() {
+            if (m_downloadHandle != null) {
+                getController().getAgentContext().logInfo(COMPONENT_IDENTIFIER,
+                    "Cancelling deployment package download for version %s because of reset", m_downloadVersion);
+            }
+            clearDownloadState();
+        }
+
+        @Override
+        public void progress(long contentLength, long progress) {
+            m_downloadLength = contentLength;
+            m_downloadProgress = progress;
+        }
+
+        @Override
+        public void completed(DownloadResult result) {
+            // Record the outcome and wake the controller so the next run acts on it.
+            m_downloadResult = result;
+            getController().getAgentContext().logInfo(COMPONENT_IDENTIFIER,
+                "Deployment package completed for version %s. Rescheduling the controller to run in %d seconds", m_downloadVersion, 5);
+            getController().scheduleRun(5);
+        }
+
+        // Discards the active handle (if any) and forgets everything recorded about that download.
+        private void clearDownloadState() {
+            if (m_downloadHandle != null) {
+                m_downloadHandle.discard();
+            }
+            m_downloadHandle = null;
+            m_downloadResult = null;
+            m_downloadVersion = null;
+            m_downloadLength = 0;
+            m_downloadProgress = 0;
+        }
+    }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java?rev=1516836&r1=1516835&r2=1516836&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java Fri Aug 23 13:11:19 2013
@@ -48,7 +48,7 @@ class DownloadHandleImpl implements Down
     private volatile File m_file;
 
     private volatile ProgressListener m_progressListener;
-    private volatile CompletedListener m_completionListener;
+    private volatile ResultListener m_completionListener;
 
     private volatile DownloadResult m_downloadResult;
 
@@ -69,7 +69,7 @@ class DownloadHandleImpl implements Down
     }
 
     @Override
-    public DownloadHandle setCompletionListener(CompletedListener listener) {
+    public DownloadHandle setCompletionListener(ResultListener listener) {
         m_completionListener = listener;
         return this;
     }
@@ -183,7 +183,7 @@ class DownloadHandleImpl implements Down
             }
     }
 
-    private static void callCompletionListener(CompletedListener listener, DownloadResult result) {
+    private static void callCompletionListener(ResultListener listener, DownloadResult result) {
         if (listener != null && result != null)
             try {
                 listener.completed(result);

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/HandlerBase.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/HandlerBase.java?rev=1516836&r1=1516835&r2=1516836&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/HandlerBase.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/HandlerBase.java Fri Aug 23 13:11:19 2013
@@ -33,11 +33,13 @@ public abstract class HandlerBase implem
     @Override
     public final void start(AgentContext agentContext) throws Exception {
         m_agentContext = agentContext;
+        m_agentContext.logDebug(m_componentIdentifier, "Starting");
         onStart();
     }
 
     @Override
     public final void stop() throws Exception {
+        m_agentContext.logDebug(m_componentIdentifier, "Stopping");
         m_agentContext = null;
         onStop();
     }

Modified: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java?rev=1516836&r1=1516835&r2=1516836&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java (original)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java Fri Aug 23 13:11:19 2013
@@ -47,7 +47,7 @@ import javax.servlet.http.HttpServletRes
 
 import org.apache.ace.agent.AgentContext;
 import org.apache.ace.agent.DownloadHandle;
-import org.apache.ace.agent.DownloadHandle.CompletedListener;
+import org.apache.ace.agent.DownloadHandle.ResultListener;
 import org.apache.ace.agent.DownloadHandle.ProgressListener;
 import org.apache.ace.agent.DownloadHandler;
 import org.apache.ace.agent.DownloadResult;
@@ -139,7 +139,7 @@ public class DownloadHandlerTest extends
         final CountDownLatch latch = new CountDownLatch(1);
         final List<DownloadResult> holder = new ArrayList<DownloadResult>();
         final DownloadHandle handle = m_downloadHandler.getHandle(m_200url)
-            .setCompletionListener(new CompletedListener() {
+            .setCompletionListener(new ResultListener() {
                 @Override
                 public void completed(DownloadResult result) {
                     holder.add(result);