You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/19 17:44:51 UTC

svn commit: r1024285 - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/impl/jobs/jcr/ src/test/java/org/apache/sling/event/impl/jobs/

Author: cziegeler
Date: Tue Oct 19 15:44:50 2010
New Revision: 1024285

URL: http://svn.apache.org/viewvc?rev=1024285&view=rev
Log:
Ignore emma file
Add new tests for drop and ignore queue
Fix drop queue handling

Added:
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/   (props changed)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java

Propchange: sling/trunk/bundles/extensions/event/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Tue Oct 19 15:44:50 2010
@@ -12,3 +12,5 @@ derby.log
 .classpath
 .externalToolBuilders
 maven-eclipse.xml
+
+coverage.em

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1024285&r1=1024284&r2=1024285&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Tue Oct 19 15:44:50 2010
@@ -510,10 +510,6 @@ public class PersistenceHandler implemen
                 Event event = null;
                 try {
                     event = this.readEvent(eventNode, false);
-                    if ( shouldProcess ) {
-                         this.process(event);
-                    }
-
                 } catch (ClassNotFoundException cnfe) {
                     // store path for lazy loading
                     synchronized ( unloadedJobSet ) {
@@ -527,6 +523,7 @@ public class PersistenceHandler implemen
                 if ( event == null ) {
                     try {
                         event = this.readEvent(eventNode, true);
+                        shouldProcess = false;
                     } catch (ClassNotFoundException cnfe) {
                         // this can't occur
                     } catch (RepositoryException re) {
@@ -535,9 +532,11 @@ public class PersistenceHandler implemen
                 }
                 if ( event != null ) {
                     ((DefaultJobManager)this.jobManager).notifyAddJob(new JCRJobEvent(event, this));
+                    if ( shouldProcess ) {
+                        this.process(event);
+                    }
                 }
                 return shouldProcess && event != null;
-
             }
             // if the node is finished, this is usually an unlock event
             ((DefaultJobManager)this.jobManager).notifyRemoveJob(nodePath.substring(this.repositoryPath.length() + 1));

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java?rev=1024285&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java Tue Oct 19 15:44:50 2010
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.SimpleEventAdmin;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JMock.class)
+public class DropQueueTest extends AbstractJobEventHandlerTest {
+
+    private static final String QUEUE_NAME = "orderedtest";
+    private static final String TOPIC = "sling/test";
+    private static int NUM_JOBS = 10;
+
+    protected Mockery context;
+
+    private InternalQueueConfiguration mainConfiguration;
+
+    public DropQueueTest() {
+        this.context = new JUnit4Mockery();
+    }
+
+    @Override
+    protected Mockery getMockery() {
+        return this.context;
+    }
+
+    @Override
+    protected Hashtable<String, Object> getComponentConfig() {
+        final Hashtable<String, Object> config =  super.getComponentConfig();
+        config.put("cleanup.period", 1); // set clean up to 1 minute
+        config.put("load.delay", 1); // load delay to 1 sec
+        return config;
+    }
+
+    private void createConfiguration(final QueueConfiguration.Type type) {
+        // create a new dictionary with the missing info and do some sanity puts
+        final Map<String, Object> queueProps = new HashMap<String, Object>();
+        queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC);
+        queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        queueProps.put(ConfigurationConstants.PROP_TYPE, type);
+        queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, new Integer(3));
+
+        this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
+    }
+
+    @Override
+    protected QueueConfigurationManager createQueueConfigManager() {
+        this.createConfiguration(QueueConfiguration.Type.DROP);
+        return new QueueConfigurationManager() {
+
+            @Override
+            public InternalQueueConfiguration[] getConfigurations() {
+                return new InternalQueueConfiguration[] {mainConfiguration};
+            }
+        };
+    }
+
+    /**
+     * Helper method to create a job event.
+     */
+    private Event getJobEvent() {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC);
+        return new Event(JobUtil.TOPIC_JOB, props);
+    }
+
+    @org.junit.Test public void testDroppingQueue() throws Exception {
+        final PersistenceHandler jeh = this.handler;
+
+        // set new event admin
+        final AtomicInteger count = new AtomicInteger(0);
+        setEventAdmin(new SimpleEventAdmin(new String[] {TOPIC },
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            JobUtil.processJob(event, new JobProcessor() {
+
+                                public boolean process(Event job) {
+                                    count.incrementAndGet();
+                                    return true;
+                                }
+                            });
+                        }
+                    }}));
+        // we start "some" jobs:
+        for(int i = 0; i < NUM_JOBS; i++ ) {
+            jeh.handleEvent(getJobEvent());
+        }
+        // we wait a little bit
+        Thread.sleep(400);
+        // no jobs queued, none processed and none available
+        assertEquals(0, this.jobManager.getStatistics().getNumberOfQueuedJobs());
+        assertEquals(0, count.get());
+        assertEquals(0, this.jobManager.queryJobs(JobManager.QueryType.ALL, TOPIC).getSize());
+
+        // let's see if restarting helps
+        this.createConfiguration(QueueConfiguration.Type.UNORDERED);
+        this.jobManager.restart();
+        // we wait a little bit
+        Thread.sleep(400);
+        // no jobs queued, none processed and none available
+        assertEquals(0, this.jobManager.getStatistics().getNumberOfQueuedJobs());
+        assertEquals(0, count.get());
+        assertEquals(0, this.jobManager.queryJobs(JobManager.QueryType.ALL, TOPIC).getSize());
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java?rev=1024285&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java Tue Oct 19 15:44:50 2010
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.impl.SimpleEventAdmin;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobProcessor;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(JMock.class)
+public class IgnoreQueueTest extends AbstractJobEventHandlerTest {
+
+    private static final String QUEUE_NAME = "orderedtest";
+    private static final String TOPIC = "sling/test";
+    private static int NUM_JOBS = 10;
+
+    protected Mockery context;
+
+    private InternalQueueConfiguration mainConfiguration;
+
+    public IgnoreQueueTest() {
+        this.context = new JUnit4Mockery();
+    }
+
+    @Override
+    protected Mockery getMockery() {
+        return this.context;
+    }
+
+    @Override
+    protected Hashtable<String, Object> getComponentConfig() {
+        final Hashtable<String, Object> config =  super.getComponentConfig();
+        config.put("cleanup.period", 1); // set clean up to 1 minute
+        config.put("load.delay", 1); // load delay to 1 sec
+        return config;
+    }
+
+    private void createConfiguration(final QueueConfiguration.Type type) {
+        // create a new dictionary with the missing info and do some sanity puts
+        final Map<String, Object> queueProps = new HashMap<String, Object>();
+        queueProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC);
+        queueProps.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+        queueProps.put(ConfigurationConstants.PROP_TYPE, type);
+        queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, new Integer(3));
+
+        this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
+    }
+
+    @Override
+    protected QueueConfigurationManager createQueueConfigManager() {
+        this.createConfiguration(QueueConfiguration.Type.IGNORE);
+        return new QueueConfigurationManager() {
+
+            @Override
+            public InternalQueueConfiguration[] getConfigurations() {
+                return new InternalQueueConfiguration[] {mainConfiguration};
+            }
+        };
+    }
+
+    /**
+     * Helper method to create a job event.
+     */
+    private Event getJobEvent() {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(JobUtil.PROPERTY_JOB_TOPIC, TOPIC);
+        return new Event(JobUtil.TOPIC_JOB, props);
+    }
+
+    @org.junit.Test public void testDroppingQueue() throws Exception {
+        final PersistenceHandler jeh = this.handler;
+
+        // set new event admin
+        final AtomicInteger count = new AtomicInteger(0);
+        setEventAdmin(new SimpleEventAdmin(new String[] {TOPIC },
+                new EventHandler[] {
+                    new EventHandler() {
+                        public void handleEvent(final Event event) {
+                            JobUtil.processJob(event, new JobProcessor() {
+
+                                public boolean process(Event job) {
+                                    count.incrementAndGet();
+                                    return true;
+                                }
+                            });
+                        }
+                    }}));
+        // we start "some" jobs:
+        for(int i = 0; i < NUM_JOBS; i++ ) {
+            jeh.handleEvent(getJobEvent());
+        }
+        // we wait a little bit
+        Thread.sleep(400);
+        // no jobs queued, none processed but available
+        assertEquals(0, this.jobManager.getStatistics().getNumberOfQueuedJobs());
+        assertEquals(0, this.jobManager.getStatistics().getNumberOfProcessedJobs());
+        assertEquals(0, count.get());
+        assertEquals(NUM_JOBS, this.jobManager.queryJobs(JobManager.QueryType.ALL, TOPIC).getSize());
+
+        // let's see if restarting helps
+        this.createConfiguration(QueueConfiguration.Type.UNORDERED);
+        this.jobManager.restart();
+        // we wait
+        while ( count.get() < NUM_JOBS ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+        // no jobs queued, but processed and not available
+        assertEquals(0, this.jobManager.getStatistics().getNumberOfQueuedJobs());
+        assertEquals(NUM_JOBS, this.jobManager.getStatistics().getNumberOfProcessedJobs());
+        assertEquals(NUM_JOBS, count.get());
+        assertEquals(0, this.jobManager.queryJobs(JobManager.QueryType.ALL, TOPIC).getSize());
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain