You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 13:59:08 UTC

svn commit: r1632281 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: JobManagerConfiguration.java JobManagerImpl.java TestLogger.java Utility.java topics/TopicManager.java topology/CheckTopologyTask.java

Author: cziegeler
Date: Thu Oct 16 11:59:07 2014
New Revision: 1632281

URL: http://svn.apache.org/r1632281
Log:
SLING-4048 : Avoid keeping jobs in memory. Fix bridged events (WiP)

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java Thu Oct 16 11:59:07 2014
@@ -256,8 +256,7 @@ public class JobManagerConfiguration {
             final String topic,
             final String jobId,
             final Map<String, Object> jobProperties) {
-        final boolean isBridged = (jobProperties != null ? jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) : false);
-        final String topicName = (isBridged ? JobImpl.PROPERTY_BRIDGED_EVENT : topic.replace('/', '.'));
+        final String topicName = topic.replace('/', '.');
         final StringBuilder sb = new StringBuilder();
         if ( targetId != null ) {
             sb.append(this.assignedJobsPath);
@@ -340,7 +339,7 @@ public class JobManagerConfiguration {
      * @return The complete storage path
      */
     public String getStoragePath(final JobImpl finishedJob, final boolean isSuccess) {
-        final String topicName = (finishedJob.isBridgedEvent() ? JobImpl.PROPERTY_BRIDGED_EVENT : finishedJob.getTopic().replace('/', '.'));
+        final String topicName = finishedJob.getTopic().replace('/', '.');
         final StringBuilder sb = new StringBuilder();
         if ( isSuccess ) {
             sb.append(this.storedSuccessfulJobsPath);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 11:59:07 2014
@@ -976,7 +976,7 @@ public class JobManagerImpl
         // create path and resource
         properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
         if ( logger.isDebugEnabled() ) {
-            logger.debug("Storing new job {} at {}", properties, path);
+            logger.debug("Storing new job {} at {}", Utility.toString(jobTopic, jobName, properties), path);
         }
         ResourceHelper.getOrCreateResource(resolver,
                 path,

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java Thu Oct 16 11:59:07 2014
@@ -5,7 +5,7 @@ import org.slf4j.Marker;
 
 public class TestLogger implements Logger {
 
-    private final boolean DEBUG = false;
+    private final boolean DEBUG = true;
 
     private final Logger logger;
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Thu Oct 16 11:59:07 2014
@@ -148,8 +148,8 @@ public abstract class Utility {
                 }
             }
         }
-
     }
+
     /**
      * Improved toString method for a job.
      * This method prints out the job topic and all of the properties.
@@ -175,18 +175,21 @@ public abstract class Utility {
      * This method prints out the job topic and all of the properties.
      */
     public static String toString(final Job job) {
-        final StringBuilder sb = new StringBuilder("Sling Job ");
-        sb.append("[topic=");
-        sb.append(job.getTopic());
-        sb.append(", id=");
-        sb.append(job.getId());
-        if ( job.getName() != null ) {
-            sb.append(", name=");
-            sb.append(job.getName());
+        if ( job != null ) {
+            final StringBuilder sb = new StringBuilder("Sling Job ");
+            sb.append("[topic=");
+            sb.append(job.getTopic());
+            sb.append(", id=");
+            sb.append(job.getId());
+            if ( job.getName() != null ) {
+                sb.append(", name=");
+                sb.append(job.getName());
+            }
+            appendProperties(sb, ((JobImpl)job).getProperties());
+            sb.append("]");
+            return sb.toString();
         }
-        appendProperties(sb, ((JobImpl)job).getProperties());
-        sb.append("]");
-        return sb.toString();
+        return "<null>";
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java Thu Oct 16 11:59:07 2014
@@ -19,9 +19,7 @@
 package org.apache.sling.event.impl.jobs.topics;
 
 import java.util.Collections;
-import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -32,8 +30,9 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
-import org.apache.sling.api.SlingConstants;
+import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -43,6 +42,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobManagerImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.TestLogger;
+import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.jobs.queues.QueueManager;
@@ -51,9 +51,8 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.topology.TopologyHandler;
 import org.apache.sling.event.impl.support.BatchResourceRemover;
 import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.NotificationConstants;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventConstants;
 import org.osgi.service.event.EventHandler;
@@ -65,7 +64,9 @@ import org.slf4j.LoggerFactory;
  *
  * TODO - Check syncing of take/update/stop. This might not be 100% correct yet.
  */
-@Component
+@Component(immediate=true)
+@Service(value=EventHandler.class)
+@Property(name=EventConstants.EVENT_TOPIC, value=NotificationConstants.TOPIC_JOB_ADDED)
 public class TopicManager implements EventHandler, TopologyAware {
 
     /** Logger. */
@@ -92,9 +93,6 @@ public class TopicManager implements Eve
     /** A set of all topics. Access needs synchronization. */
     private final Set<String> topics = new TreeSet<String>();
 
-    /** Service registration for the event handler. */
-    private volatile ServiceRegistration eventHandlerRegistration;
-
     /** Marker if a new topic has been added. */
     private final AtomicBoolean topicsChanged = new AtomicBoolean(false);
 
@@ -105,32 +103,18 @@ public class TopicManager implements Eve
 
     /**
      * Activate this component.
-     * Register an event handler.
      */
     @Activate
     protected void activate(final BundleContext bundleContext) {
-        final Dictionary<String, Object> properties = new Hashtable<String, Object>();
-        properties.put(Constants.SERVICE_DESCRIPTION, "Apache Sling Job Topic Manager Event Handler");
-        properties.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
-        properties.put(EventConstants.EVENT_TOPIC, SlingConstants.TOPIC_RESOURCE_ADDED);
-        properties.put(EventConstants.EVENT_FILTER,
-                "(" + SlingConstants.PROPERTY_PATH + "=" +
-                      this.configuration.getLocalJobsPath() + "/*)");
-        this.eventHandlerRegistration = bundleContext.registerService(EventHandler.class.getName(), this, properties);
         this.topologyHandler.addListener(this);
     }
 
     /**
      * Deactivate this component.
-     * Unregister the event handler.
      */
     @Deactivate
     protected void deactivate() {
         this.topologyHandler.removeListener(this);
-        if ( this.eventHandlerRegistration != null ) {
-            this.eventHandlerRegistration.unregister();
-            this.eventHandlerRegistration = null;
-        }
     }
 
     /**
@@ -165,16 +149,8 @@ public class TopicManager implements Eve
      */
     @Override
     public void handleEvent(final Event event) {
-        final String path = (String)event.getProperty(SlingConstants.PROPERTY_PATH);
-        if ( this.configuration.isLocalJob(path) ) {
-            final int topicStart = this.configuration.getLocalJobsPath().length() + 1;
-            final int topicEnd = path.indexOf("/", topicStart);
-            final String topic;
-            if ( topicEnd == -1 ) {
-                topic = path.substring(topicStart).replace('.', '/');
-            } else {
-                topic = path.substring(topicStart, topicEnd).replace('.', '/');
-            }
+        final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+        if ( topic != null ) {
             boolean changed = false;
             synchronized ( topics ) {
                 final int len = topics.size();
@@ -272,7 +248,9 @@ public class TopicManager implements Eve
         } finally {
             this.queueLocks.remove(queueName);
         }
-        logger.debug("Took new job for {} : {}", queueName, result);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Took new job for {} : {}", queueName, Utility.toString(result));
+        }
         return (result != null ? new JobHandler( result, (JobManagerImpl)this.jobManager) : null);
     }
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java?rev=1632281&r1=1632280&r2=1632281&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java Thu Oct 16 11:59:07 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.event.impl.jobs.topology;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -117,6 +118,9 @@ public class CheckTopologyTask {
         }
     }
 
+    /** Properties to include bridge job consumers for the quick test. */
+    private static final Map<String, Object> BRIDGED_JOB = Collections.singletonMap(JobImpl.PROPERTY_BRIDGED_EVENT, (Object)Boolean.TRUE);
+
     /**
      * Try to assign all jobs from the jobs root.
      * The jobs are stored by topic
@@ -136,15 +140,8 @@ public class CheckTopologyTask {
             final String topicName = topicResource.getName().replace('.', '/');
             logger.debug("Found topic {}", topicName);
 
-            final String checkTopic;
-            if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) {
-                checkTopic = "/";
-            } else {
-                checkTopic = topicName;
-            }
-
             // first check if there is an instance for these topics
-            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
+            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, BRIDGED_JOB);
             if ( potentialTargets != null && potentialTargets.size() > 0 ) {
                 final QueueInfo info = this.queueConfigManager.getQueueInfo(topicName);
                 logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);