You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/12/29 17:19:07 UTC

svn commit: r1722178 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/config/ main/java/org/apache/sling/event/impl/jobs/tasks/ test/java/org/apache/sling/event/impl/jobs/config/

Author: cziegeler
Date: Tue Dec 29 16:19:05 2015
New Revision: 1722178

URL: http://svn.apache.org/viewvc?rev=1722178&view=rev
Log:
SLING-5406 : Optimize configuration change handling

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Tue Dec 29 16:19:05 2015
@@ -176,8 +176,6 @@ public class JobManagerConfiguration {
     /** The topology capabilities. */
     private volatile TopologyCapabilities topologyCapabilities;
 
-    private final AtomicBoolean firstTopologyEvent = new AtomicBoolean(true);
-
     /**
      * Activate this component.
      * @param props Configuration properties
@@ -239,6 +237,10 @@ public class JobManagerConfiguration {
         this.stopProcessing();
     }
 
+    /**
+     * Is this component still active?
+     * @return Active?
+     */
     public boolean isActive() {
         return this.active.get();
     }
@@ -452,11 +454,8 @@ public class JobManagerConfiguration {
      * Start processing
      * @param eventType The event type
      * @param newCaps The new capabilities
-     * @param isConfigChange If a configuration change occured.
      */
-    private void startProcessing(final Type eventType, final TopologyCapabilities newCaps,
-            final boolean isConfigChange,
-            final boolean runMaintenanceTasks) {
+    private void startProcessing(final Type eventType, final TopologyCapabilities newCaps) {
         logger.debug("Starting job processing...");
         // create new capabilities and update view
         this.topologyCapabilities = newCaps;
@@ -468,48 +467,48 @@ public class JobManagerConfiguration {
 
             final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask(this);
             rt.run();
-        }
 
-        final CheckTopologyTask mt = new CheckTopologyTask(this);
-        if ( runMaintenanceTasks ) {
-            // we run the checker task twice, now and shortly after the topology has changed.
-            mt.fullRun(!isConfigChange, isConfigChange);
-        }
+            final CheckTopologyTask mt = new CheckTopologyTask(this);
+            mt.fullRun();
 
-        if ( eventType == Type.TOPOLOGY_INIT ) {
             notifiyListeners();
         } else {
             // and run checker again in some seconds (if leader)
             // notify listeners afterwards
             final Scheduler local = this.scheduler;
             if ( local != null ) {
-                local.schedule(new Runnable() {
+                final Runnable r = new Runnable() {
 
                     @Override
                     public void run() {
-                        if ( newCaps == topologyCapabilities ) {
-                            if ( runMaintenanceTasks ) {
-                                if ( newCaps.isLeader() && newCaps.isActive() ) {
-                                    mt.assignUnassignedJobs();
-                                }
-                            }
+                        if ( newCaps == topologyCapabilities && newCaps.isActive()) {
                             // start listeners
-                            if ( newCaps.isActive() ) {
-                                synchronized ( listeners ) {
-                                    notifiyListeners();
-                                }
+                            notifiyListeners();
+                            if ( newCaps.isLeader() && newCaps.isActive() ) {
+                                final CheckTopologyTask mt = new CheckTopologyTask(JobManagerConfiguration.this);
+                                mt.fullRun();
                             }
                         }
                     }
-                }, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000)));
+                };
+                if ( !local.schedule(r, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000))) ) {
+                    // if for whatever reason scheduling doesn't work, let's run now
+                    r.run();
+                }
             }
         }
         logger.debug("Job processing started");
     }
 
+    /**
+     * Notify all listeners
+     */
     private void notifiyListeners() {
-        for(final ConfigurationChangeListener l : this.listeners) {
-            l.configurationChanged(this.topologyCapabilities != null);
+        synchronized ( this.listeners ) {
+            final TopologyCapabilities caps = this.topologyCapabilities;
+            for(final ConfigurationChangeListener l : this.listeners) {
+                l.configurationChanged(caps != null);
+            }
         }
     }
 
@@ -521,46 +520,31 @@ public class JobManagerConfiguration {
     public void handleTopologyEvent(final TopologyEvent event) {
         this.logger.debug("Received topology event {}", event);
 
-        // queue configuration changed?
-        if ( event == null ) {
-            final TopologyCapabilities caps = this.topologyCapabilities;
-            if ( caps != null && this.isActive() ) {
-                this.startProcessing(Type.PROPERTIES_CHANGED, caps, true, true);
-            }
-            return;
-        }
-
-        boolean runMaintenanceTasks = true;
         // check if there is a change of properties which doesn't affect us
         // but we need to use the new view !
+        boolean stopProcessing = true;
         if ( event.getType() == Type.PROPERTIES_CHANGED ) {
             final Map<String, String> newAllInstances = TopologyCapabilities.getAllInstancesMap(event.getNewView());
             if ( this.topologyCapabilities != null && this.topologyCapabilities.isSame(newAllInstances) ) {
-                logger.debug("No changes in capabilities - restarting without maintenance tasks");
-                runMaintenanceTasks = false;
+                logger.debug("No changes in capabilities - updating topology capabilities with new view");
+                stopProcessing = false;
             }
         }
 
-        TopologyEvent.Type eventType = event.getType();
-        if( this.firstTopologyEvent.compareAndSet(true, false) ) {
-            if ( eventType == Type.TOPOLOGY_CHANGED ) {
-                eventType = Type.TOPOLOGY_INIT;
-            }
-        }
-        synchronized ( this.listeners ) {
+        final TopologyEvent.Type eventType = event.getType();
 
-            if ( eventType == Type.TOPOLOGY_CHANGING ) {
-               this.stopProcessing();
+        if ( eventType == Type.TOPOLOGY_CHANGING ) {
+           this.stopProcessing();
 
-            } else if ( eventType == Type.TOPOLOGY_INIT
-                || event.getType() == Type.TOPOLOGY_CHANGED
-                || event.getType() == Type.PROPERTIES_CHANGED ) {
+        } else if ( eventType == Type.TOPOLOGY_INIT
+            || event.getType() == Type.TOPOLOGY_CHANGED
+            || event.getType() == Type.PROPERTIES_CHANGED ) {
 
+            if ( stopProcessing ) {
                 this.stopProcessing();
-
-                this.startProcessing(eventType, new TopologyCapabilities(event.getNewView(), this), false, runMaintenanceTasks);
             }
 
+            this.startProcessing(eventType, new TopologyCapabilities(event.getNewView(), this));
         }
     }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Tue Dec 29 16:19:05 2015
@@ -53,9 +53,6 @@ public class QueueConfigurationManager {
     @Reference
     private MainQueueConfiguration mainQueueConfiguration;
 
-    /** Listener - this is the job manager configuration component. */
-    private volatile TopologyHandler changeListener;
-
     /**
      * Add a new queue configuration.
      * @param config A new queue configuration.
@@ -69,7 +66,7 @@ public class QueueConfigurationManager {
 
     /**
      * Remove a queue configuration.
-     * @param config The queue configuraiton.
+     * @param config The queue configuration.
      */
     protected void unbindConfig(final InternalQueueConfiguration config) {
         synchronized ( configurations ) {
@@ -80,7 +77,7 @@ public class QueueConfigurationManager {
 
     /**
      * Update a queue configuration.
-     * @param config The queue configuraiton.
+     * @param config The queue configuration.
      */
     protected void updateConfig(final InternalQueueConfiguration config) {
         // InternalQueueConfiguration does not implement modified atm,
@@ -100,7 +97,6 @@ public class QueueConfigurationManager {
             Collections.sort(configurations);
             orderedConfigs = configurations.toArray(new InternalQueueConfiguration[configurations.size()]);
         }
-        this.updateListener();
     }
 
     /**
@@ -170,29 +166,4 @@ public class QueueConfigurationManager {
 
         return result;
     }
-
-    /**
-     * Add a config listener.
-     * @param listener
-     */
-    public void addListener(final TopologyHandler listener) {
-        this.changeListener = listener;
-    }
-
-    /**
-     * Remove the config listener.
-     */
-    public void removeListener() {
-        this.changeListener = null;
-    }
-
-    /**
-     * Update the listener.
-     */
-    private void updateListener() {
-        final TopologyHandler l = this.changeListener;
-        if ( l != null ) {
-            l.queueConfigurationChanged();
-        }
-    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java Tue Dec 29 16:19:05 2015
@@ -55,7 +55,6 @@ public class TopologyHandler implements
     @Activate
     protected void activate() {
         this.isActive.set(true);
-        this.configuration.getQueueConfigurationManager().addListener(this);
         final Thread thread = new Thread(this, "Apache Sling Job Topology Listener Thread");
         thread.setDaemon(true);
 
@@ -64,7 +63,6 @@ public class TopologyHandler implements
 
     @Deactivate
     protected void deactivate() {
-        this.configuration.getQueueConfigurationManager().removeListener();
         this.isActive.set(false);
         this.queue.clear();
         try {
@@ -87,20 +85,6 @@ public class TopologyHandler implements
         }
     }
 
-    /**
-     * This method is invoked by the queue configuration manager
-     * whenever the queue configuration changes.
-     */
-    public void queueConfigurationChanged() {
-        final QueueItem item = new QueueItem();
-        try {
-            this.queue.put(item);
-        } catch ( final InterruptedException ie) {
-            logger.warn("Thread got interrupted.", ie);
-            Thread.currentThread().interrupt();
-        }
-    }
-
     @Override
     public void run() {
         while ( isActive.get() ) {
@@ -112,7 +96,7 @@ public class TopologyHandler implements
                 Thread.currentThread().interrupt();
                 isActive.set(false);
             }
-            if ( isActive.get() && item != null ) {
+            if ( isActive.get() && item != null && item.event != null ) {
                 final JobManagerConfiguration config = this.configuration;
                 if ( config != null ) {
                     config.handleTopologyEvent(item.event);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Tue Dec 29 16:19:05 2015
@@ -318,16 +318,13 @@ public class CheckTopologyTask {
     /**
      * One maintenance run
      */
-    public void fullRun(final boolean topologyChanged,
-                        final boolean configChanged) {
+    public void fullRun() {
         if ( this.caps != null ) {
-            // if topology changed, reschedule assigned jobs for stopped instances
-            if ( topologyChanged ) {
-                this.reassignJobsFromStoppedInstances();
-            }
+            this.reassignJobsFromStoppedInstances();
+
             // check for all topics
             this.reassignStaleJobs();
-    
+
             // try to assign unassigned jobs
             this.assignUnassignedJobs();
         }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java Tue Dec 29 16:19:05 2015
@@ -76,8 +76,8 @@ public class JobManagerConfigurationTest
         }
 
         public void await() throws Exception {
-            if ( !latch.await(5000, TimeUnit.MILLISECONDS) ) {
-                throw new Exception("No configuration event within 5 seconds.");
+            if ( !latch.await(8000, TimeUnit.MILLISECONDS) ) {
+                throw new Exception("No configuration event within 8 seconds.");
             }
         }
 
@@ -108,6 +108,7 @@ public class JobManagerConfigurationTest
                             ((Runnable)job).run();
                         }
                     }, 3000);
+                    return true;
                 }
                 return false;
             }
@@ -224,7 +225,7 @@ public class JobManagerConfigurationTest
         assertTrue(ccl.events.get(0));
 
         // change view, followed by change props
-        ccl.init(3);
+        ccl.init(2);
         final TopologyView view2 = createView();
         Mockito.when(initView.isCurrent()).thenReturn(false);
         final TopologyEvent change1 = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, initView, view2);
@@ -236,14 +237,13 @@ public class JobManagerConfigurationTest
         config.handleTopologyEvent(change2);
 
         ccl.await();
-        assertEquals(3, ccl.events.size());
+        assertEquals(2, ccl.events.size());
         assertFalse(ccl.events.get(0));
-        assertFalse(ccl.events.get(1));
-        assertTrue(ccl.events.get(2));
+        assertTrue(ccl.events.get(1));
 
         // we wait another 4 secs to see if there is no another event
         Thread.sleep(4000);
-        assertEquals(3, ccl.events.size());
+        assertEquals(2, ccl.events.size());
 
     }
 }