You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/15 15:42:10 UTC

svn commit: r1022914 - in /sling/branches/eventing-3.0/src: main/java/org/apache/sling/event/impl/ main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/config/ main/java/org/apache/sling/event/impl/jobs/jcr/ main/java/...

Author: cziegeler
Date: Fri Oct 15 13:42:09 2010
New Revision: 1022914

URL: http://svn.apache.org/viewvc?rev=1022914&view=rev
Log:
Minor improvements, code quality, add ordered job queue test

Added:
    sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java   (with props)
Modified:
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1022914&r1=1022913&r2=1022914&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Fri Oct 15 13:42:09 2010
@@ -42,7 +42,7 @@ import org.osgi.service.component.Compon
 public class EventingThreadPool implements ThreadPool {
 
     @Reference
-    protected ThreadPoolManager threadPoolManager;
+    private ThreadPoolManager threadPoolManager;
 
     /** The real thread pool used. */
     private org.apache.sling.commons.threads.ThreadPool threadPool;

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java?rev=1022914&r1=1022913&r2=1022914&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java Fri Oct 15 13:42:09 2010
@@ -25,11 +25,15 @@ public interface JobStatusNotifier {
     String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();
 
     class NotifierContext {
-        public final JobStatusNotifier notifier;
+        private final JobStatusNotifier notifier;
 
-        public NotifierContext(JobStatusNotifier n) {
+        public NotifierContext(final JobStatusNotifier n) {
             this.notifier = n;
         }
+
+        public JobStatusNotifier getJobStatusNotifier() {
+            return this.notifier;
+        }
     }
 
     /**

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1022914&r1=1022913&r2=1022914&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Fri Oct 15 13:42:09 2010
@@ -31,7 +31,7 @@ import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.event.EventConstants;
 
-public class Utility {
+public abstract class Utility {
 
     /** Allowed characters for a node name */
     private static final String ALLOWED_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+*#!¤$%&()=[]?";
@@ -76,7 +76,7 @@ public class Utility {
     /**
      * used for the md5
      */
-    public static final char[] hexTable = "0123456789abcdef".toCharArray();
+    private static final char[] HEX_TABLE = "0123456789abcdef".toCharArray();
 
     /**
      * Calculate an MD5 hash of the string given using 'utf-8' encoding.
@@ -111,8 +111,8 @@ public class Utility {
         StringBuilder res = new StringBuilder(digest.length * 2);
         for (int i = 0; i < digest.length; i++) {
             byte b = digest[i];
-            res.append(hexTable[(b >> 4) & 15]);
-            res.append(hexTable[b & 15]);
+            res.append(HEX_TABLE[(b >> 4) & 15]);
+            res.append(HEX_TABLE[b & 15]);
         }
         return res.toString();
     }

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1022914&r1=1022913&r2=1022914&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Fri Oct 15 13:42:09 2010
@@ -143,16 +143,16 @@ public class InternalQueueConfiguration
         } else {
             this.applicationIds = appIds;
         }
-        final String[] topics = OsgiUtil.toStringArray(params.get(ConfigurationConstants.PROP_TOPICS));
-        if ( topics == null
-             || topics.length == 0
-             || (topics.length == 1 && (topics[0] == null || topics[0].length() == 0))) {
+        final String[] topicsParam = OsgiUtil.toStringArray(params.get(ConfigurationConstants.PROP_TOPICS));
+        if ( topicsParam == null
+             || topicsParam.length == 0
+             || (topicsParam.length == 1 && (topicsParam[0] == null || topicsParam[0].length() == 0))) {
             matchers = null;
             this.topics = null;
         } else {
-            final Matcher[] newMatchers = new Matcher[topics.length];
-            for(int i=0; i < topics.length; i++) {
-                String value = topics[i];
+            final Matcher[] newMatchers = new Matcher[topicsParam.length];
+            for(int i=0; i < topicsParam.length; i++) {
+                String value = topicsParam[i];
                 if ( value != null ) {
                     value = value.trim();
                 }
@@ -167,7 +167,7 @@ public class InternalQueueConfiguration
                 }
             }
             matchers = newMatchers;
-            this.topics = topics;
+            this.topics = topicsParam;
         }
         this.serviceRanking = OsgiUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
         this.pid = (String)params.get(Constants.SERVICE_PID);
@@ -276,8 +276,7 @@ public class InternalQueueConfiguration
                 if ( m != null ) {
                     final String rep = m.match(topic);
                     if ( rep != null ) {
-                        final String name = this.name.replace("{0}", rep);
-                        event.queueName = name;
+                        event.queueName = this.name.replace("{0}", rep);
                         return true;
                     }
                 }

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1022914&r1=1022913&r2=1022914&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Fri Oct 15 13:42:09 2010
@@ -134,7 +134,7 @@ public class PersistenceHandler implemen
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     /** The repository path. */
-    protected String repositoryPath;
+    private String repositoryPath;
 
     /** Is the background task still running? */
     private volatile boolean running;
@@ -731,6 +731,8 @@ public class PersistenceHandler implemen
         }
         if ( eventProps.get(JobUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
             eventProps.put(JobUtil.PROPERTY_JOB_RETRY_COUNT, Integer.valueOf(eventProps.get(JobUtil.PROPERTY_JOB_RETRY_COUNT).toString()));
+        } else {
+            eventProps.put(JobUtil.PROPERTY_JOB_RETRY_COUNT, new Integer(0));
         }
         // add application id
         eventProps.put(EventUtil.PROPERTY_APPLICATION, eventNode.getProperty(JCRHelper.NODE_PROPERTY_APPLICATION).getString());

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1022914&r1=1022913&r2=1022914&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java Fri Oct 15 13:42:09 2010
@@ -182,7 +182,7 @@ public abstract class JobUtil {
     public static boolean acknowledgeJob(final Event job) {
         final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
         if ( ctx != null ) {
-            if ( !ctx.notifier.sendAcknowledge(job) ) {
+            if ( !ctx.getJobStatusNotifier().sendAcknowledge(job) ) {
                 // if we don't get an ack, someone else is already processing this job.
                 // we process but do not notify the job event handler.
                 LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
@@ -200,7 +200,7 @@ public abstract class JobUtil {
     public static void finishedJob(final Event job) {
         final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
         if ( ctx != null ) {
-            ctx.notifier.finishedJob(job, false);
+            ctx.getJobStatusNotifier().finishedJob(job, false);
         }
     }
 
@@ -212,7 +212,7 @@ public abstract class JobUtil {
     public static boolean rescheduleJob(final Event job) {
         final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
         if ( ctx != null ) {
-            return ctx.notifier.finishedJob(job, true);
+            return ctx.getJobStatusNotifier().finishedJob(job, true);
         }
         return false;
     }
@@ -227,7 +227,7 @@ public abstract class JobUtil {
         boolean notify = true;
         final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
         if ( ctx != null ) {
-            if ( !ctx.notifier.sendAcknowledge(job) ) {
+            if ( !ctx.getJobStatusNotifier().sendAcknowledge(job) ) {
                 // if we don't get an ack, someone else is already processing this job.
                 // we process but do not notify the job event handler.
                 LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);

Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java?rev=1022914&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java Fri Oct 15 13:42:09 2010
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.Barrier;
+import org.apache.sling.event.impl.SimpleEventAdmin;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler;
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JMock.class)
+public class OrderedJobQueueTest extends AbstractJobEventHandlerTest {
+
+    private static final String QUEUE_NAME = "orderedtest";
+    private static final String TOPIC = "sling/test";
+    private static int NUM_JOBS = 30;
+
+    protected Mockery context;
+
+    public OrderedJobQueueTest() {
+        this.context = new JUnit4Mockery();
+    }
+
+    @Override
+    protected Mockery getMockery() {
+        return this.context;
+    }
+
+    @Override
+    protected Hashtable<String, Object> getComponentConfig() {
+        final Hashtable<String, Object> config =  super.getComponentConfig();
+        config.put("cleanup.period", 1); // set clean up to 1 minute
+        config.put("load.delay", 1); // load delay to 1 sec
+        return config;
+    }
+
+    @Override
+    protected QueueConfigurationManager createQueueConfigManager() {
+        // create a new dictionary with the missing info and do some sanity puts
+        final Map<String, Object> queueProps = new HashMap<String, Object>();
+        queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC + "/*");
+        queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        queueProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED);
+
+        final InternalQueueConfiguration mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
+        return new QueueConfigurationManager() {
+
+            @Override
+            public InternalQueueConfiguration[] getConfigurations() {
+                return new InternalQueueConfiguration[] {mainConfiguration};
+            }
+        };
+    }
+
+    /**
+     * Helper method to create a job event.
+     */
+    private Event getJobEvent(final String subTopic, final String id) {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC + '/' + subTopic);
+        if ( id != null ) {
+            props.put(JobUtil.PROPERTY_JOB_NAME, id);
+        }
+        return new Event(JobUtil.TOPIC_JOB, props);
+    }
+
+    /**
+     * Helper method to create a job event without a job name.
+     */
+    private Event getJobEvent(final String subTopic) {
+        return this.getJobEvent(subTopic, null);
+    }
+
+    @org.junit.Test public void testOrderedQueue() throws Exception {
+        final PersistenceHandler jeh = this.handler;
+
+        // we first send one event to get the queue started
+        final Barrier cb = new Barrier(2);
+        setEventAdmin(new SimpleEventAdmin(new String[] {TOPIC + '*'},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(Event event) {
+                            JobUtil.acknowledgeJob(event);
+                            JobUtil.finishedJob(event);
+                            cb.block();
+                        }
+
+                    }
+                }));
+        jeh.handleEvent(getJobEvent("a"));
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+
+        // get the queue
+        final Queue q = this.jobManager.getQueue(QUEUE_NAME);
+        assertNotNull("Queue should exist!", q);
+        // suspend it
+        q.suspend();
+        // set new event admin
+        final AtomicInteger count = new AtomicInteger(0);
+        final AtomicInteger parallelCount = new AtomicInteger(0);
+        setEventAdmin(new SimpleEventAdmin(new String[] {TOPIC + '*',
+                JobUtil.TOPIC_JOB_FINISHED},
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            JobUtil.processJob(event, new JobProcessor() {
+
+                                public boolean process(Event job) {
+                                    if ( parallelCount.incrementAndGet() > 1 ) {
+                                        return false;
+                                    }
+                                    final String topic = (String)job.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+                                    if ( topic.endsWith("sub1") ) {
+                                        final int i = (Integer)job.getProperty(JobUtil.PROPERTY_JOB_RETRY_COUNT);
+                                        if ( i == 0 ) {
+                                            parallelCount.decrementAndGet();
+                                            return false;
+                                        }
+                                    }
+                                    try {
+                                        Thread.sleep(30);
+                                    } catch (InterruptedException ie) {
+                                        // ignore
+                                    }
+                                    parallelCount.decrementAndGet();
+                                    return true;
+                                }
+                            });
+                        }
+                    },
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            count.incrementAndGet();
+                        }
+                    }}));
+        // we start "some" jobs:
+        for(int i = 0; i < NUM_JOBS; i++ ) {
+            final String subTopic = "sub" + (i % 10);
+            jeh.handleEvent(getJobEvent(subTopic));
+        }
+        // start the queue
+        q.resume();
+        while ( count.get() < NUM_JOBS ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+        assertEquals("Finished count", NUM_JOBS, count.get());
+        // we started one event before the test, so add one
+        assertEquals("Finished count", NUM_JOBS + 1, this.jobManager.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Failed count", NUM_JOBS / 10, q.getStatistics().getNumberOfFailedJobs());
+        assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
+    }
+}

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain