You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/12/29 17:19:07 UTC
svn commit: r1722178 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/config/
main/java/org/apache/sling/event/impl/jobs/tasks/
test/java/org/apache/sling/event/impl/jobs/config/
Author: cziegeler
Date: Tue Dec 29 16:19:05 2015
New Revision: 1722178
URL: http://svn.apache.org/viewvc?rev=1722178&view=rev
Log:
SLING-5406 : Optimize configuration change handling
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Tue Dec 29 16:19:05 2015
@@ -176,8 +176,6 @@ public class JobManagerConfiguration {
/** The topology capabilities. */
private volatile TopologyCapabilities topologyCapabilities;
- private final AtomicBoolean firstTopologyEvent = new AtomicBoolean(true);
-
/**
* Activate this component.
* @param props Configuration properties
@@ -239,6 +237,10 @@ public class JobManagerConfiguration {
this.stopProcessing();
}
+ /**
+ * Is this component still active?
+ * @return Active?
+ */
public boolean isActive() {
return this.active.get();
}
@@ -452,11 +454,8 @@ public class JobManagerConfiguration {
* Start processing
* @param eventType The event type
* @param newCaps The new capabilities
- * @param isConfigChange If a configuration change occured.
*/
- private void startProcessing(final Type eventType, final TopologyCapabilities newCaps,
- final boolean isConfigChange,
- final boolean runMaintenanceTasks) {
+ private void startProcessing(final Type eventType, final TopologyCapabilities newCaps) {
logger.debug("Starting job processing...");
// create new capabilities and update view
this.topologyCapabilities = newCaps;
@@ -468,48 +467,48 @@ public class JobManagerConfiguration {
final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask(this);
rt.run();
- }
- final CheckTopologyTask mt = new CheckTopologyTask(this);
- if ( runMaintenanceTasks ) {
- // we run the checker task twice, now and shortly after the topology has changed.
- mt.fullRun(!isConfigChange, isConfigChange);
- }
+ final CheckTopologyTask mt = new CheckTopologyTask(this);
+ mt.fullRun();
- if ( eventType == Type.TOPOLOGY_INIT ) {
notifiyListeners();
} else {
// and run checker again in some seconds (if leader)
// notify listeners afterwards
final Scheduler local = this.scheduler;
if ( local != null ) {
- local.schedule(new Runnable() {
+ final Runnable r = new Runnable() {
@Override
public void run() {
- if ( newCaps == topologyCapabilities ) {
- if ( runMaintenanceTasks ) {
- if ( newCaps.isLeader() && newCaps.isActive() ) {
- mt.assignUnassignedJobs();
- }
- }
+ if ( newCaps == topologyCapabilities && newCaps.isActive()) {
// start listeners
- if ( newCaps.isActive() ) {
- synchronized ( listeners ) {
- notifiyListeners();
- }
+ notifiyListeners();
+ if ( newCaps.isLeader() && newCaps.isActive() ) {
+ final CheckTopologyTask mt = new CheckTopologyTask(JobManagerConfiguration.this);
+ mt.fullRun();
}
}
}
- }, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000)));
+ };
+ if ( !local.schedule(r, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000))) ) {
+ // if for whatever reason scheduling doesn't work, let's run now
+ r.run();
+ }
}
}
logger.debug("Job processing started");
}
+ /**
+ * Notify all listeners
+ */
private void notifiyListeners() {
- for(final ConfigurationChangeListener l : this.listeners) {
- l.configurationChanged(this.topologyCapabilities != null);
+ synchronized ( this.listeners ) {
+ final TopologyCapabilities caps = this.topologyCapabilities;
+ for(final ConfigurationChangeListener l : this.listeners) {
+ l.configurationChanged(caps != null);
+ }
}
}
@@ -521,46 +520,31 @@ public class JobManagerConfiguration {
public void handleTopologyEvent(final TopologyEvent event) {
this.logger.debug("Received topology event {}", event);
- // queue configuration changed?
- if ( event == null ) {
- final TopologyCapabilities caps = this.topologyCapabilities;
- if ( caps != null && this.isActive() ) {
- this.startProcessing(Type.PROPERTIES_CHANGED, caps, true, true);
- }
- return;
- }
-
- boolean runMaintenanceTasks = true;
// check if there is a change of properties which doesn't affect us
// but we need to use the new view !
+ boolean stopProcessing = true;
if ( event.getType() == Type.PROPERTIES_CHANGED ) {
final Map<String, String> newAllInstances = TopologyCapabilities.getAllInstancesMap(event.getNewView());
if ( this.topologyCapabilities != null && this.topologyCapabilities.isSame(newAllInstances) ) {
- logger.debug("No changes in capabilities - restarting without maintenance tasks");
- runMaintenanceTasks = false;
+ logger.debug("No changes in capabilities - updating topology capabilities with new view");
+ stopProcessing = false;
}
}
- TopologyEvent.Type eventType = event.getType();
- if( this.firstTopologyEvent.compareAndSet(true, false) ) {
- if ( eventType == Type.TOPOLOGY_CHANGED ) {
- eventType = Type.TOPOLOGY_INIT;
- }
- }
- synchronized ( this.listeners ) {
+ final TopologyEvent.Type eventType = event.getType();
- if ( eventType == Type.TOPOLOGY_CHANGING ) {
- this.stopProcessing();
+ if ( eventType == Type.TOPOLOGY_CHANGING ) {
+ this.stopProcessing();
- } else if ( eventType == Type.TOPOLOGY_INIT
- || event.getType() == Type.TOPOLOGY_CHANGED
- || event.getType() == Type.PROPERTIES_CHANGED ) {
+ } else if ( eventType == Type.TOPOLOGY_INIT
+ || event.getType() == Type.TOPOLOGY_CHANGED
+ || event.getType() == Type.PROPERTIES_CHANGED ) {
+ if ( stopProcessing ) {
this.stopProcessing();
-
- this.startProcessing(eventType, new TopologyCapabilities(event.getNewView(), this), false, runMaintenanceTasks);
}
+ this.startProcessing(eventType, new TopologyCapabilities(event.getNewView(), this));
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Tue Dec 29 16:19:05 2015
@@ -53,9 +53,6 @@ public class QueueConfigurationManager {
@Reference
private MainQueueConfiguration mainQueueConfiguration;
- /** Listener - this is the job manager configuration component. */
- private volatile TopologyHandler changeListener;
-
/**
* Add a new queue configuration.
* @param config A new queue configuration.
@@ -69,7 +66,7 @@ public class QueueConfigurationManager {
/**
* Remove a queue configuration.
- * @param config The queue configuraiton.
+ * @param config The queue configuration.
*/
protected void unbindConfig(final InternalQueueConfiguration config) {
synchronized ( configurations ) {
@@ -80,7 +77,7 @@ public class QueueConfigurationManager {
/**
* Update a queue configuration.
- * @param config The queue configuraiton.
+ * @param config The queue configuration.
*/
protected void updateConfig(final InternalQueueConfiguration config) {
// InternalQueueConfiguration does not implement modified atm,
@@ -100,7 +97,6 @@ public class QueueConfigurationManager {
Collections.sort(configurations);
orderedConfigs = configurations.toArray(new InternalQueueConfiguration[configurations.size()]);
}
- this.updateListener();
}
/**
@@ -170,29 +166,4 @@ public class QueueConfigurationManager {
return result;
}
-
- /**
- * Add a config listener.
- * @param listener
- */
- public void addListener(final TopologyHandler listener) {
- this.changeListener = listener;
- }
-
- /**
- * Remove the config listener.
- */
- public void removeListener() {
- this.changeListener = null;
- }
-
- /**
- * Update the listener.
- */
- private void updateListener() {
- final TopologyHandler l = this.changeListener;
- if ( l != null ) {
- l.queueConfigurationChanged();
- }
- }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyHandler.java Tue Dec 29 16:19:05 2015
@@ -55,7 +55,6 @@ public class TopologyHandler implements
@Activate
protected void activate() {
this.isActive.set(true);
- this.configuration.getQueueConfigurationManager().addListener(this);
final Thread thread = new Thread(this, "Apache Sling Job Topology Listener Thread");
thread.setDaemon(true);
@@ -64,7 +63,6 @@ public class TopologyHandler implements
@Deactivate
protected void deactivate() {
- this.configuration.getQueueConfigurationManager().removeListener();
this.isActive.set(false);
this.queue.clear();
try {
@@ -87,20 +85,6 @@ public class TopologyHandler implements
}
}
- /**
- * This method is invoked by the queue configuration manager
- * whenever the queue configuration changes.
- */
- public void queueConfigurationChanged() {
- final QueueItem item = new QueueItem();
- try {
- this.queue.put(item);
- } catch ( final InterruptedException ie) {
- logger.warn("Thread got interrupted.", ie);
- Thread.currentThread().interrupt();
- }
- }
-
@Override
public void run() {
while ( isActive.get() ) {
@@ -112,7 +96,7 @@ public class TopologyHandler implements
Thread.currentThread().interrupt();
isActive.set(false);
}
- if ( isActive.get() && item != null ) {
+ if ( isActive.get() && item != null && item.event != null ) {
final JobManagerConfiguration config = this.configuration;
if ( config != null ) {
config.handleTopologyEvent(item.event);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Tue Dec 29 16:19:05 2015
@@ -318,16 +318,13 @@ public class CheckTopologyTask {
/**
* One maintenance run
*/
- public void fullRun(final boolean topologyChanged,
- final boolean configChanged) {
+ public void fullRun() {
if ( this.caps != null ) {
- // if topology changed, reschedule assigned jobs for stopped instances
- if ( topologyChanged ) {
- this.reassignJobsFromStoppedInstances();
- }
+ this.reassignJobsFromStoppedInstances();
+
// check for all topics
this.reassignStaleJobs();
-
+
// try to assign unassigned jobs
this.assignUnassignedJobs();
}
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java?rev=1722178&r1=1722177&r2=1722178&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java Tue Dec 29 16:19:05 2015
@@ -76,8 +76,8 @@ public class JobManagerConfigurationTest
}
public void await() throws Exception {
- if ( !latch.await(5000, TimeUnit.MILLISECONDS) ) {
- throw new Exception("No configuration event within 5 seconds.");
+ if ( !latch.await(8000, TimeUnit.MILLISECONDS) ) {
+ throw new Exception("No configuration event within 8 seconds.");
}
}
@@ -108,6 +108,7 @@ public class JobManagerConfigurationTest
((Runnable)job).run();
}
}, 3000);
+ return true;
}
return false;
}
@@ -224,7 +225,7 @@ public class JobManagerConfigurationTest
assertTrue(ccl.events.get(0));
// change view, followed by change props
- ccl.init(3);
+ ccl.init(2);
final TopologyView view2 = createView();
Mockito.when(initView.isCurrent()).thenReturn(false);
final TopologyEvent change1 = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, initView, view2);
@@ -236,14 +237,13 @@ public class JobManagerConfigurationTest
config.handleTopologyEvent(change2);
ccl.await();
- assertEquals(3, ccl.events.size());
+ assertEquals(2, ccl.events.size());
assertFalse(ccl.events.get(0));
- assertFalse(ccl.events.get(1));
- assertTrue(ccl.events.get(2));
+ assertTrue(ccl.events.get(1));
// we wait another 4 secs to see if there is no another event
Thread.sleep(4000);
- assertEquals(3, ccl.events.size());
+ assertEquals(2, ccl.events.size());
}
}