You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/15 08:53:30 UTC

svn commit: r1022833 - in /sling/branches/eventing-3.0/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/config/ main/java/org/apache/sling/event/impl/jobs/console/ main/java/org/apache/sling/event/impl/jobs/qu...

Author: cziegeler
Date: Fri Oct 15 06:53:29 2010
New Revision: 1022833

URL: http://svn.apache.org/viewvc?rev=1022833&view=rev
Log:
Allow topic matching with trailing slash (like event admin topic configs)
Remove all starts a background thread for actual removing
Create only three level hierarchy for jobs
Add a method to retrieve queue state info in a generic way

Modified:
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
    sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
    sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Fri Oct 15 06:53:29 2010
@@ -128,22 +128,26 @@ public class Utility {
             // we create an md from the job id - we use the first 6 bytes to
             // create sub directories
             final String md5 = md5(jobId);
-            sb.append(md5.substring(0, 2));
+            sb.append(md5.charAt(0));
+            sb.append(md5.charAt(1));
+            sb.append(md5.charAt(2));
             sb.append('/');
-            sb.append(md5.substring(2, 4));
-            sb.append('/');
-            sb.append(md5.substring(4, 6));
+            sb.append(md5.charAt(3));
+            sb.append(md5.charAt(4));
+            sb.append(md5.charAt(5));
             sb.append('/');
             sb.append(filter(jobId));
         } else {
             // create a path from the uuid - we use the first 6 bytes to
             // create sub directories
             final String uuid = UUID.randomUUID().toString();
-            sb.append(uuid.substring(0, 2));
-            sb.append('/');
-            sb.append(uuid.substring(2, 4));
+            sb.append(uuid.charAt(0));
+            sb.append(uuid.charAt(1));
+            sb.append(uuid.charAt(2));
             sb.append('/');
-            sb.append(uuid.substring(5, 7));
+            sb.append(uuid.charAt(3));
+            sb.append(uuid.charAt(5));
+            sb.append(uuid.charAt(6));
             sb.append("/Job_");
             sb.append(uuid.substring(8, 17));
         }

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Fri Oct 15 06:53:29 2010
@@ -158,9 +158,9 @@ public class InternalQueueConfiguration
                 }
                 if ( value != null && value.length() > 0 ) {
                     if ( value.endsWith(".") ) {
-                        newMatchers[i] = new PackageMatcher(value.substring(0, value.length() - 1));
+                        newMatchers[i] = new PackageMatcher(value);
                     } else if ( value.endsWith("*") ) {
-                        newMatchers[i] = new SubPackageMatcher(value.substring(0, value.length() - 1));
+                        newMatchers[i] = new SubPackageMatcher(value);
                     } else {
                         newMatchers[i] = new ClassMatcher(value);
                     }
@@ -404,37 +404,71 @@ public class InternalQueueConfiguration
             ", isValid=" + this.isValid() + "}";
     }
 
+    /**
+     * Internal interface for topic matching
+     */
     private static interface Matcher {
+        /** Check if the topic matches and return the variable part - null if not matching. */
         String match(String topic);
     }
+
+    /** Package matcher - the topic must be in the same package. */
     private static final class PackageMatcher implements Matcher {
+
         private final String packageName;
 
         public PackageMatcher(final String name) {
-            this.packageName = name;
+            // remove last char and maybe a trailing slash
+            int lastPos = name.length() - 1;
+            if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) {
+                lastPos--;
+            }
+            this.packageName = name.substring(0, lastPos);
         }
+
+        /**
+         * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
+         */
         public String match(final String topic) {
             final int pos = topic.lastIndexOf('/');
             return pos > -1 && topic.substring(0, pos).equals(packageName) ? topic.substring(pos + 1) : null;
         }
     }
+
+    /** Sub package matcher - the topic must be in the same package or a sub package. */
     private static final class SubPackageMatcher implements Matcher {
         private final String packageName;
 
         public SubPackageMatcher(final String name) {
-            this.packageName = name + '/';
+            // remove last char and maybe a trailing slash
+            int lastPos = name.length() - 1;
+            if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) {
+                this.packageName = name.substring(0, lastPos);
+            } else {
+                this.packageName = name.substring(0, lastPos) + '/';
+            }
         }
+
+        /**
+         * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
+         */
         public String match(final String topic) {
             final int pos = topic.lastIndexOf('/');
             return pos > -1 && topic.substring(0, pos + 1).startsWith(this.packageName) ? topic.substring(this.packageName.length()) : null;
         }
     }
+
+    /** The topic must match exactly. */
     private static final class ClassMatcher implements Matcher {
         private final String className;
 
         public ClassMatcher(final String name) {
             this.className = name;
         }
+
+        /**
+         * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
+         */
         public String match(String topic) {
             return this.className.equals(topic) ? "" : null;
         }

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Fri Oct 15 06:53:29 2010
@@ -177,7 +177,7 @@ public class WebConsolePlugin extends Ht
             pw.printf("<tr><td>Processed Jobs</td><td>%s</td><td colspan='2'>&nbsp</td></tr>", s.getNumberOfProcessedJobs());
             pw.printf("<tr><td>Average Processing Time</td><td>%s</td><td colspan='2'>&nbsp</td></tr>", formatTime(s.getAverageProcessingTime()));
             pw.printf("<tr><td>Average Waiting Time</td><td>%s</td><td colspan='2'>&nbsp</td></tr>", formatTime(s.getAverageWaitingTime()));
-            pw.printf("<tr><td>Status Info</td><td colspan='3'>%s</td></tr>", escape(q.getStatusInfo()));
+            pw.printf("<tr><td>Status Info</td><td colspan='3'>%s</td></tr>", escape(q.getStateInfo()));
             pw.println("</tbody></table>");
             pw.println("<br/>");
         }
@@ -357,7 +357,7 @@ public class WebConsolePlugin extends Ht
             pw.printf("Processed Jobs : %s%n", s.getNumberOfProcessedJobs());
             pw.printf("Average Processing Time : %s%n", formatTime(s.getAverageProcessingTime()));
             pw.printf("Average Waiting Time : %s%n", formatTime(s.getAverageWaitingTime()));
-            pw.printf("Status Info : %s%n", q.getStatusInfo());
+            pw.printf("Status Info : %s%n", q.getStateInfo());
             pw.println("Configuration");
             pw.printf("Type : %s%n", formatType(c.getType()));
             pw.printf("Topics : %s%n", formatArrayAsText(c.getTopics()));

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri Oct 15 06:53:29 2010
@@ -19,6 +19,7 @@
 package org.apache.sling.event.impl.jobs.queues;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -105,7 +106,10 @@ public abstract class AbstractJobQueue
         this.environment = environment;
     }
 
-    public String getStatusInfo() {
+    /**
+     * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+     */
+    public String getStateInfo() {
         return "isWaiting=" + this.isWaiting + ", markedForCleanUp=" + this.markedForCleanUp + ", suspendedSince=" + this.suspendedSince.longValue();
     }
 
@@ -504,16 +508,6 @@ public abstract class AbstractJobQueue
         this.queueName = name;
     }
 
-    protected abstract JobEvent start(final JobEvent event);
-
-    protected abstract void put(final JobEvent event);
-
-    protected abstract JobEvent take();
-
-    protected abstract boolean isEmpty();
-
-    protected abstract void notifyFinished(final JobEvent rescheduleInfo);
-
     /**
      * Reschedule a job.
      */
@@ -556,16 +550,27 @@ public abstract class AbstractJobQueue
     /**
      * @see org.apache.sling.event.jobs.Queue#removeAll()
      */
-    public void removeAll() {
+    public synchronized void removeAll() {
+        // we suspend the queue
         final boolean wasSuspended = this.isSuspended();
         this.suspend();
-        while ( !this.isEmpty() ) {
-            final JobEvent event = this.take();
-            if ( event != null ) {
-                event.remove();
-            }
-        }
+        // we copy all events and remove them in the background
+        final Collection<JobEvent> events = this.removeAllJobs();
         this.clearQueued();
+        final Thread t = new Thread(new Runnable() {
+
+                /**
+                 * @see java.lang.Runnable#run()
+                 */
+                public void run() {
+                    for(final JobEvent job : events) {
+                        job.remove();
+                    }
+                }
+            }, "Queue RemoveAll Thread for " + this.queueName);
+        t.setDaemon(true);
+        t.start();
+        // start queue again
         if ( !wasSuspended ) {
             this.resume();
         }
@@ -577,5 +582,37 @@ public abstract class AbstractJobQueue
     public void clear() {
         this.clearQueued();
     }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#getState(java.lang.String)
+     */
+    public Object getState(final String key) {
+        // not supported for now
+        return null;
+    }
+
+    /**
+     * Put another job into the queue.
+     */
+    protected abstract void put(final JobEvent event);
+
+    /**
+     * Get another job from the queue.
+     */
+    protected abstract JobEvent take();
+
+    /**
+     * Is the queue empty?
+     */
+    protected abstract boolean isEmpty();
+
+    /**
+     * Remove all events from the queue and return them.
+     */
+    protected abstract Collection<JobEvent> removeAllJobs();
+
+    protected abstract JobEvent start(final JobEvent event);
+
+    protected abstract void notifyFinished(final JobEvent rescheduleInfo);
 }
 

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Fri Oct 15 06:53:29 2010
@@ -45,8 +45,8 @@ public abstract class AbstractParallelJo
     }
 
     @Override
-    public String getStatusInfo() {
-        return super.getStatusInfo() + ", jobCount=" + this.jobCount;
+    public String getStateInfo() {
+        return super.getStateInfo() + ", jobCount=" + this.jobCount;
     }
 
     @Override

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Fri Oct 15 06:53:29 2010
@@ -18,6 +18,9 @@
  */
 package org.apache.sling.event.impl.jobs.queues;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -38,7 +41,7 @@ public final class OrderedJobQueue exten
     private JobEvent jobEvent;
 
     /** Marker indicating that this queue is currently sleeping. */
-    private volatile boolean isSleeping = false;
+    private volatile long isSleepingUntil = -1;
 
     /** The sleeping thread. */
     private volatile Thread sleepingThread;
@@ -53,8 +56,8 @@ public final class OrderedJobQueue exten
     }
 
     @Override
-    public String getStatusInfo() {
-        return super.getStatusInfo() + ", isSleeping=" + this.isSleeping;
+    public String getStateInfo() {
+        return super.getStateInfo() + ", isSleepingUntil=" + this.isSleepingUntil;
     }
 
     @Override
@@ -68,21 +71,19 @@ public final class OrderedJobQueue exten
         return rescheduleInfo;
     }
 
-    private void setSleeping(boolean flag) {
-        this.isSleeping = flag;
-        if ( !flag ) {
-            this.sleepingThread = null;
-        }
+    private void setNotSleeping() {
+        this.isSleepingUntil = -1;
+        this.sleepingThread = null;
     }
 
-    private void setSleeping(Thread sleepingThread) {
+    private void setSleeping(final Thread sleepingThread, final long delay) {
         this.sleepingThread = sleepingThread;
-        this.setSleeping(true);
+        this.isSleepingUntil = System.currentTimeMillis() + delay;
     }
 
     @Override
     public void resume() {
-        if ( this.isSleeping ) {
+        if ( this.isSleepingUntil == -1 ) {
             final Thread thread = this.sleepingThread;
             if ( thread != null ) {
                 thread.interrupt();
@@ -158,14 +159,14 @@ public final class OrderedJobQueue exten
             delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
         }
         if ( delay > 0 ) {
-            setSleeping(Thread.currentThread());
+            this.setSleeping(Thread.currentThread(), delay);
             try {
                 this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
                 Thread.sleep(delay);
             } catch (InterruptedException e) {
                 this.ignoreException(e);
             } finally {
-                setSleeping(false);
+                this.setNotSleeping();
             }
         }
         return info;
@@ -180,9 +181,24 @@ public final class OrderedJobQueue exten
     }
 
     @Override
-    public void removeAll() {
+    public synchronized void removeAll() {
         this.jobEvent = null;
         super.removeAll();
     }
+
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>(this.queue);
+        this.queue.clear();
+        return events;
+    }
+
+    @Override
+    public Object getState(final String key) {
+        if ( "isSleepingUntil".equals(key) ) {
+            return this.isSleepingUntil;
+        }
+        return super.getState(key);
+    }
 }
 

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Fri Oct 15 06:53:29 2010
@@ -18,6 +18,9 @@
  */
 package org.apache.sling.event.impl.jobs.queues;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -75,5 +78,12 @@ public final class ParallelJobQueue exte
         this.queue.clear();
         super.clear();
     }
+
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>(this.queue);
+        this.queue.clear();
+        return events;
+    }
 }
 

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Fri Oct 15 06:53:29 2010
@@ -19,6 +19,7 @@
 package org.apache.sling.event.impl.jobs.queues;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -59,8 +60,8 @@ public final class TopicRoundRobinJobQue
     }
 
     @Override
-    public String getStatusInfo() {
-        return super.getStatusInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext;
+    public String getStateInfo() {
+        return super.getStateInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext;
     }
 
     @Override
@@ -143,5 +144,19 @@ public final class TopicRoundRobinJobQue
         }
         super.clear();
     }
+
+    @Override
+    protected Collection<JobEvent> removeAllJobs() {
+        final List<JobEvent> events = new ArrayList<JobEvent>();
+        synchronized ( this.topicMap ) {
+            for(final List<JobEvent> l : this.topicMap.values() ) {
+                events.addAll(l);
+            }
+            this.eventCount = 0;
+            this.topics.clear();
+            this.topicMap.clear();
+        }
+        return events;
+    }
 }
 

Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java Fri Oct 15 06:53:29 2010
@@ -36,11 +36,6 @@ public interface Queue {
     Statistics getStatistics();
 
     /**
-     * Return some information about the current status of the queue.
-     */
-    String getStatusInfo();
-
-    /**
      * Get the corresponding configuration.
      */
     QueueConfiguration getConfiguration();
@@ -83,4 +78,17 @@ public interface Queue {
      * all outstanding jobs (but no notifications are send).
      */
     void removeAll();
+
+    /**
+     * Return some information about the current state of the queue. This
+     * method is meant to see the internal state of the queue for debugging
+     * or monitoring purposes.
+     */
+    String getStateInfo();
+
+    /**
+     * For monitoring purposes and possible extensions from the different
+     * queue types. This method allows to query state information.
+     */
+    Object getState(final String key);
 }

Modified: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java (original)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java Fri Oct 15 06:53:29 2010
@@ -77,7 +77,7 @@ public class InternalQueueConfigurationT
         assertFalse(c.match(getJobEvent("t/x")));
     }
 
-    @org.junit.Test public void testTopicMatchersStart() {
+    @org.junit.Test public void testTopicMatchersStar() {
         final Map<String, Object> p = new HashMap<String, Object>();
         p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a*"});
         p.put(ConfigurationConstants.PROP_NAME, "test");
@@ -122,6 +122,51 @@ public class InternalQueueConfigurationT
         assertEquals("test-queue-d", d.queueName);
     }
 
+    @org.junit.Test public void testTopicMatchersDotAndSlash() {
+        final Map<String, Object> p = new HashMap<String, Object>();
+        p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/."});
+        p.put(ConfigurationConstants.PROP_NAME, "test");
+
+        InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+        assertTrue(c.isValid());
+        assertTrue(c.match(getJobEvent("a/b")));
+        assertTrue(c.match(getJobEvent("a/c")));
+        assertFalse(c.match(getJobEvent("a")));
+        assertFalse(c.match(getJobEvent("a/b/c")));
+        assertFalse(c.match(getJobEvent("t")));
+        assertFalse(c.match(getJobEvent("t/x")));
+    }
+
+    @org.junit.Test public void testTopicMatchersStarAndSlash() {
+        final Map<String, Object> p = new HashMap<String, Object>();
+        p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/*"});
+        p.put(ConfigurationConstants.PROP_NAME, "test");
+
+        InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+        assertTrue(c.isValid());
+        assertTrue(c.match(getJobEvent("a/b")));
+        assertTrue(c.match(getJobEvent("a/c")));
+        assertFalse(c.match(getJobEvent("a")));
+        assertTrue(c.match(getJobEvent("a/b/c")));
+        assertFalse(c.match(getJobEvent("t")));
+        assertFalse(c.match(getJobEvent("t/x")));
+    }
+
+    @org.junit.Test public void testTopicMatcherAndReplacementAndSlash() {
+        final Map<String, Object> p = new HashMap<String, Object>();
+        p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/."});
+        p.put(ConfigurationConstants.PROP_NAME, "test-queue-{0}");
+
+        InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+        assertTrue(c.isValid());
+        final JobEvent b = getJobEvent("a/b");
+        assertTrue(c.match(b));
+        assertEquals("test-queue-b", b.queueName);
+        final JobEvent d = getJobEvent("a/d");
+        assertTrue(c.match(d));
+        assertEquals("test-queue-d", d.queueName);
+    }
+
     @org.junit.Test public void testNoTopicMatchers() {
         final Map<String, Object> p = new HashMap<String, Object>();
         p.put(ConfigurationConstants.PROP_NAME, "test");