You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 13:59:08 UTC
svn commit: r1632281 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
JobManagerConfiguration.java JobManagerImpl.java TestLogger.java
Utility.java topics/TopicManager.java topology/CheckTopologyTask.java
Author: cziegeler
Date: Thu Oct 16 11:59:07 2014
New Revision: 1632281
URL: http://svn.apache.org/r1632281
Log:
SLING-4048 : Avoid keeping jobs in memory. Fix bridged events (WiP)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java Thu Oct 16 11:59:07 2014
@@ -256,8 +256,7 @@ public class JobManagerConfiguration {
final String topic,
final String jobId,
final Map<String, Object> jobProperties) {
- final boolean isBridged = (jobProperties != null ? jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) : false);
- final String topicName = (isBridged ? JobImpl.PROPERTY_BRIDGED_EVENT : topic.replace('/', '.'));
+ final String topicName = topic.replace('/', '.');
final StringBuilder sb = new StringBuilder();
if ( targetId != null ) {
sb.append(this.assignedJobsPath);
@@ -340,7 +339,7 @@ public class JobManagerConfiguration {
* @return The complete storage path
*/
public String getStoragePath(final JobImpl finishedJob, final boolean isSuccess) {
- final String topicName = (finishedJob.isBridgedEvent() ? JobImpl.PROPERTY_BRIDGED_EVENT : finishedJob.getTopic().replace('/', '.'));
+ final String topicName = finishedJob.getTopic().replace('/', '.');
final StringBuilder sb = new StringBuilder();
if ( isSuccess ) {
sb.append(this.storedSuccessfulJobsPath);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 11:59:07 2014
@@ -976,7 +976,7 @@ public class JobManagerImpl
// create path and resource
properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
if ( logger.isDebugEnabled() ) {
- logger.debug("Storing new job {} at {}", properties, path);
+ logger.debug("Storing new job {} at {}", Utility.toString(jobTopic, jobName, properties), path);
}
ResourceHelper.getOrCreateResource(resolver,
path,
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java Thu Oct 16 11:59:07 2014
@@ -5,7 +5,7 @@ import org.slf4j.Marker;
public class TestLogger implements Logger {
- private final boolean DEBUG = false;
+ private final boolean DEBUG = true;
private final Logger logger;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Thu Oct 16 11:59:07 2014
@@ -148,8 +148,8 @@ public abstract class Utility {
}
}
}
-
}
+
/**
* Improved toString method for a job.
* This method prints out the job topic and all of the properties.
@@ -175,18 +175,21 @@ public abstract class Utility {
* This method prints out the job topic and all of the properties.
*/
public static String toString(final Job job) {
- final StringBuilder sb = new StringBuilder("Sling Job ");
- sb.append("[topic=");
- sb.append(job.getTopic());
- sb.append(", id=");
- sb.append(job.getId());
- if ( job.getName() != null ) {
- sb.append(", name=");
- sb.append(job.getName());
+ if ( job != null ) {
+ final StringBuilder sb = new StringBuilder("Sling Job ");
+ sb.append("[topic=");
+ sb.append(job.getTopic());
+ sb.append(", id=");
+ sb.append(job.getId());
+ if ( job.getName() != null ) {
+ sb.append(", name=");
+ sb.append(job.getName());
+ }
+ appendProperties(sb, ((JobImpl)job).getProperties());
+ sb.append("]");
+ return sb.toString();
}
- appendProperties(sb, ((JobImpl)job).getProperties());
- sb.append("]");
- return sb.toString();
+ return "<null>";
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java Thu Oct 16 11:59:07 2014
@@ -19,9 +19,7 @@
package org.apache.sling.event.impl.jobs.topics;
import java.util.Collections;
-import java.util.Dictionary;
import java.util.HashMap;
-import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -32,8 +30,9 @@ import java.util.concurrent.atomic.Atomi
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
-import org.apache.sling.api.SlingConstants;
+import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
@@ -43,6 +42,7 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobManagerImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.TestLogger;
+import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.queues.QueueManager;
@@ -51,9 +51,8 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.topology.TopologyHandler;
import org.apache.sling.event.impl.support.BatchResourceRemover;
import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.NotificationConstants;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
@@ -65,7 +64,9 @@ import org.slf4j.LoggerFactory;
*
* TODO - Check syncing of take/update/stop. This might not be 100% correct yet.
*/
-@Component
+@Component(immediate=true)
+@Service(value=EventHandler.class)
+@Property(name=EventConstants.EVENT_TOPIC, value=NotificationConstants.TOPIC_JOB_ADDED)
public class TopicManager implements EventHandler, TopologyAware {
/** Logger. */
@@ -92,9 +93,6 @@ public class TopicManager implements Eve
/** A set of all topics. Access needs synchronization. */
private final Set<String> topics = new TreeSet<String>();
- /** Service registration for the event handler. */
- private volatile ServiceRegistration eventHandlerRegistration;
-
/** Marker if a new topic has been added. */
private final AtomicBoolean topicsChanged = new AtomicBoolean(false);
@@ -105,32 +103,18 @@ public class TopicManager implements Eve
/**
* Activate this component.
- * Register an event handler.
*/
@Activate
protected void activate(final BundleContext bundleContext) {
- final Dictionary<String, Object> properties = new Hashtable<String, Object>();
- properties.put(Constants.SERVICE_DESCRIPTION, "Apache Sling Job Topic Manager Event Handler");
- properties.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
- properties.put(EventConstants.EVENT_TOPIC, SlingConstants.TOPIC_RESOURCE_ADDED);
- properties.put(EventConstants.EVENT_FILTER,
- "(" + SlingConstants.PROPERTY_PATH + "=" +
- this.configuration.getLocalJobsPath() + "/*)");
- this.eventHandlerRegistration = bundleContext.registerService(EventHandler.class.getName(), this, properties);
this.topologyHandler.addListener(this);
}
/**
* Deactivate this component.
- * Unregister the event handler.
*/
@Deactivate
protected void deactivate() {
this.topologyHandler.removeListener(this);
- if ( this.eventHandlerRegistration != null ) {
- this.eventHandlerRegistration.unregister();
- this.eventHandlerRegistration = null;
- }
}
/**
@@ -165,16 +149,8 @@ public class TopicManager implements Eve
*/
@Override
public void handleEvent(final Event event) {
- final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
- if ( this.configuration.isLocalJob(path) ) {
- final int topicStart = this.configuration.getLocalJobsPath().length() + 1;
- final int topicEnd = path.indexOf("/", topicStart);
- final String topic;
- if ( topicEnd == -1 ) {
- topic = path.substring(topicStart).replace('.', '/');
- } else {
- topic = path.substring(topicStart, topicEnd).replace('.', '/');
- }
+ final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+ if ( topic != null ) {
boolean changed = false;
synchronized ( topics ) {
final int len = topics.size();
@@ -272,7 +248,9 @@ public class TopicManager implements Eve
} finally {
this.queueLocks.remove(queueName);
}
- logger.debug("Took new job for {} : {}", queueName, result);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Took new job for {} : {}", queueName, Utility.toString(result));
+ }
return (result != null ? new JobHandler( result, (JobManagerImpl)this.jobManager) : null);
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java Thu Oct 16 11:59:07 2014
@@ -18,6 +18,7 @@
*/
package org.apache.sling.event.impl.jobs.topology;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -117,6 +118,9 @@ public class CheckTopologyTask {
}
}
+ /** Properties to include bridge job consumers for the quick test. */
+ private static final Map<String, Object> BRIDGED_JOB = Collections.singletonMap(JobImpl.PROPERTY_BRIDGED_EVENT, (Object)Boolean.TRUE);
+
/**
* Try to assign all jobs from the jobs root.
* The jobs are stored by topic
@@ -136,15 +140,8 @@ public class CheckTopologyTask {
final String topicName = topicResource.getName().replace('.', '/');
logger.debug("Found topic {}", topicName);
- final String checkTopic;
- if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) {
- checkTopic = "/";
- } else {
- checkTopic = topicName;
- }
-
// first check if there is an instance for these topics
- final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
+ final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, BRIDGED_JOB);
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
final QueueInfo info = this.queueConfigManager.getQueueInfo(topicName);
logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);