You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/01/05 11:53:01 UTC

svn commit: r895980 - in /sling/trunk/bundles/extensions/event/src/main: java/org/apache/sling/event/impl/JobEventHandler.java resources/OSGI-INF/metatype/metatype.properties

Author: cziegeler
Date: Tue Jan  5 10:52:53 2010
New Revision: 895980

URL: http://svn.apache.org/viewvc?rev=895980&view=rev
Log:
SLING-1002 : Reduce memory consumption and improve startup behaviour of the job handler

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=895980&r1=895979&r2=895980&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Tue Jan  5 10:52:53 2010
@@ -163,9 +163,42 @@
     /** Number of jobs to load from the repository on startup in one go. */
     private long maxLoadJobs;
 
+    /** @scr.property valueRef="DEFAULT_MAXIMUM_LOAD_JOBS" */
+    private static final String CONFIG_PROPERTY_MAX_LOAD_JOBS = "max.load.jobs";
+
+    /** Default maximum load jobs. */
+    private static final long DEFAULT_MAXIMUM_LOAD_JOBS = 1000;
+
     /** Threshold - if the queue is lower than this threshold the repository is checked for events. */
     private long loadThreshold;
 
+    /** @scr.property valueRef="DEFAULT_LOAD_THRESHOLD" */
+    private static final String CONFIG_PROPERTY_LOAD_THREASHOLD = "load.threshold";
+
+    /** Default load threshold. */
+    private static final long DEFAULT_LOAD_THRESHOLD = 400;
+
+    /** Number of seconds the background loader waits after startup before loading events from the repository. */
+    private long backgroundLoadDelay;
+
+    /** @scr.property valueRef="DEFAULT_BACKGROUND_LOAD_DELAY" */
+    private static final String CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY = "load.delay";
+
+    /** Default background load delay. */
+    private static final long DEFAULT_BACKGROUND_LOAD_DELAY = 30;
+
+    /** Number of seconds the background loader waits between loads from the repository. */
+    private long backgroundCheckDelay;
+
+    /** @scr.property valueRef="DEFAULT_BACKGROUND_CHECK_DELAY" */
+    private static final String CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY = "load.checkdelay";
+
+    /** Default background check delay. */
+    private static final long DEFAULT_BACKGROUND_CHECK_DELAY = 240;
+
+    /** Time when this service has been started. */
+    private long startTime;
+
     /**
      * Activate this component.
      * @param context
@@ -180,9 +213,14 @@
         this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
         this.waitForAckMs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK) * 1000;
         this.maximumParallelJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS), DEFAULT_MAXIMUM_PARALLEL_JOBS);
+        this.maxLoadJobs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_MAX_LOAD_JOBS), DEFAULT_MAXIMUM_LOAD_JOBS);
+        this.loadThreshold = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_LOAD_THREASHOLD), DEFAULT_LOAD_THRESHOLD);
+        this.backgroundLoadDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), DEFAULT_BACKGROUND_LOAD_DELAY);
+        this.backgroundCheckDelay = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY), DEFAULT_BACKGROUND_CHECK_DELAY);
         this.componentContext = context;
         super.activate(context);
         JOB_THREAD_POOL = this.threadPool;
+        this.startTime = System.currentTimeMillis();
         // start background thread which loads jobs from the repository
         this.threadPool.execute(new Runnable() {
             public void run() {
@@ -256,25 +294,27 @@
     private void loadJobsInTheBackground() {
         // give the system some time to start
         try {
-            Thread.sleep(1000 * 30); // 30 secs
+            Thread.sleep(1000 * this.backgroundLoadDelay); // default is 30 seconds
         } catch (InterruptedException e) {
             this.ignoreException(e);
         }
         // are we still running?
         if ( this.running ) {
+            logger.debug("Starting background loading.");
             long loadSince = -1;
             do {
                 loadSince = this.loadJobs(loadSince);
                 if ( this.running && loadSince > -1 ) {
                     do {
                         try {
-                            Thread.sleep(1000 * 240);
+                            Thread.sleep(1000 * this.backgroundCheckDelay); // default is 240 seconds
                         } catch (InterruptedException e) {
                             this.ignoreException(e);
                         }
                     } while ( this.running && this.queue.size() > this.loadThreshold );
                 }
             } while (this.running && loadSince > -1);
+            logger.debug("Finished background loading.");
         }
     }
 
@@ -1118,24 +1158,35 @@
         final long maxLoad = (since == -1 ? this.maxLoadJobs : this.maxLoadJobs - this.queue.size());
         // sanity check
         if ( maxLoad > 0 ) {
+            logger.debug("Loading from repository since {} and max {}", since, maxLoad);
             try {
                 final QueryManager qManager = this.backgroundSession.getWorkspace().getQueryManager();
                 final StringBuilder buffer = new StringBuilder("/jcr:root");
                 buffer.append(this.repositoryPath);
                 buffer.append("//element(*, ");
                 buffer.append(this.getEventNodeType());
+                buffer.append(") [not(@");
+                buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
                 buffer.append(")");
                 if ( since != -1 ) {
                     final Calendar beforeDate = Calendar.getInstance();
                     beforeDate.setTimeInMillis(since);
                     final String dateString = ISO8601.format(beforeDate);
-                    buffer.append("[@");
+                    buffer.append(" and @");
                     buffer.append(EventHelper.NODE_PROPERTY_CREATED);
                     buffer.append(" >= xs:dateTime('");
                     buffer.append(dateString);
-                    buffer.append("')]");
+                    buffer.append("')");
                 }
-                buffer.append(" order by @");
+                final Calendar startDate = Calendar.getInstance();
+                startDate.setTimeInMillis(this.startTime);
+                final String dateString = ISO8601.format(startDate);
+                buffer.append(" and @");
+                buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+                buffer.append(" < xs:dateTime('");
+                buffer.append(dateString);
+                buffer.append("')");
+                buffer.append("] order by @");
                 buffer.append(EventHelper.NODE_PROPERTY_CREATED);
                 buffer.append(" ascending");
                 final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
@@ -1173,6 +1224,7 @@
                 if ( !result.hasNext() ) {
                     eventCreated = -1;
                 }
+                logger.debug("Loaded {} jobs and new since {}", count, eventCreated);
             } catch (RepositoryException re) {
                 this.logger.error("Exception during initial loading of stored jobs.", re);
             }

Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=895980&r1=895979&r2=895980&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Tue Jan  5 10:52:53 2010
@@ -74,6 +74,21 @@
 max.parallel.jobs.description = The maximum number of parallel jobs started for the main \
  queue.
 
+max.load.jobs.name = Max Load Jobs
+max.load.jobs.description = The maximum amount of jobs being loaded from the repository on startup. \
+ Default is 1000 jobs.
+
+load.threshold.name = Load Threshold
+load.threshold.description = If the queue is lower than this threshold the repository is checked \
+ for events. The default value is 400. This works together with the maximum load jobs.
+
+load.delay.name = Background Load Delay
+load.delay.description = The background loader waits this time of seconds after startup before \
+ loading events from the repository. Default value is 30 seconds.
+
+load.checkdelay.name = Background Check Delay
+load.checkdelay.description = The background loader sleeps this time of seconds before \
+ checking the repository for jobs. Default value is 240 seconds.
 
 #
 # Event Pool