You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/15 08:53:30 UTC
svn commit: r1022833 - in /sling/branches/eventing-3.0/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/config/
main/java/org/apache/sling/event/impl/jobs/console/
main/java/org/apache/sling/event/impl/jobs/qu...
Author: cziegeler
Date: Fri Oct 15 06:53:29 2010
New Revision: 1022833
URL: http://svn.apache.org/viewvc?rev=1022833&view=rev
Log:
Allow topic matching with trailing slash (like event admin topic configs)
Remove all starts a background thread for actual removing
Create only three level hierarchy for jobs
Add a method to retrieve queue state info in a generic way
Modified:
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Fri Oct 15 06:53:29 2010
@@ -128,22 +128,26 @@ public class Utility {
// we create an md from the job id - we use the first 6 bytes to
// create sub directories
final String md5 = md5(jobId);
- sb.append(md5.substring(0, 2));
+ sb.append(md5.charAt(0));
+ sb.append(md5.charAt(1));
+ sb.append(md5.charAt(2));
sb.append('/');
- sb.append(md5.substring(2, 4));
- sb.append('/');
- sb.append(md5.substring(4, 6));
+ sb.append(md5.charAt(3));
+ sb.append(md5.charAt(4));
+ sb.append(md5.charAt(5));
sb.append('/');
sb.append(filter(jobId));
} else {
// create a path from the uuid - we use the first 6 bytes to
// create sub directories
final String uuid = UUID.randomUUID().toString();
- sb.append(uuid.substring(0, 2));
- sb.append('/');
- sb.append(uuid.substring(2, 4));
+ sb.append(uuid.charAt(0));
+ sb.append(uuid.charAt(1));
+ sb.append(uuid.charAt(2));
sb.append('/');
- sb.append(uuid.substring(5, 7));
+ sb.append(uuid.charAt(3));
+ sb.append(uuid.charAt(5));
+ sb.append(uuid.charAt(6));
sb.append("/Job_");
sb.append(uuid.substring(8, 17));
}
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Fri Oct 15 06:53:29 2010
@@ -158,9 +158,9 @@ public class InternalQueueConfiguration
}
if ( value != null && value.length() > 0 ) {
if ( value.endsWith(".") ) {
- newMatchers[i] = new PackageMatcher(value.substring(0, value.length() - 1));
+ newMatchers[i] = new PackageMatcher(value);
} else if ( value.endsWith("*") ) {
- newMatchers[i] = new SubPackageMatcher(value.substring(0, value.length() - 1));
+ newMatchers[i] = new SubPackageMatcher(value);
} else {
newMatchers[i] = new ClassMatcher(value);
}
@@ -404,37 +404,71 @@ public class InternalQueueConfiguration
", isValid=" + this.isValid() + "}";
}
+ /**
+ * Internal interface for topic matching
+ */
private static interface Matcher {
+ /** Check if the topic matches and return the variable part - null if not matching. */
String match(String topic);
}
+
+ /** Package matcher - the topic must be in the same package. */
private static final class PackageMatcher implements Matcher {
+
private final String packageName;
public PackageMatcher(final String name) {
- this.packageName = name;
+ // remove last char and maybe a trailing slash
+ int lastPos = name.length() - 1;
+ if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) {
+ lastPos--;
+ }
+ this.packageName = name.substring(0, lastPos);
}
+
+ /**
+ * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
+ */
public String match(final String topic) {
final int pos = topic.lastIndexOf('/');
return pos > -1 && topic.substring(0, pos).equals(packageName) ? topic.substring(pos + 1) : null;
}
}
+
+ /** Sub package matcher - the topic must be in the same package or a sub package. */
private static final class SubPackageMatcher implements Matcher {
private final String packageName;
public SubPackageMatcher(final String name) {
- this.packageName = name + '/';
+ // remove last char and maybe a trailing slash
+ int lastPos = name.length() - 1;
+ if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) {
+ this.packageName = name.substring(0, lastPos);
+ } else {
+ this.packageName = name.substring(0, lastPos) + '/';
+ }
}
+
+ /**
+ * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
+ */
public String match(final String topic) {
final int pos = topic.lastIndexOf('/');
return pos > -1 && topic.substring(0, pos + 1).startsWith(this.packageName) ? topic.substring(this.packageName.length()) : null;
}
}
+
+ /** The topic must match exactly. */
private static final class ClassMatcher implements Matcher {
private final String className;
public ClassMatcher(final String name) {
this.className = name;
}
+
+ /**
+ * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
+ */
public String match(String topic) {
return this.className.equals(topic) ? "" : null;
}
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Fri Oct 15 06:53:29 2010
@@ -177,7 +177,7 @@ public class WebConsolePlugin extends Ht
pw.printf("<tr><td>Processed Jobs</td><td>%s</td><td colspan='2'> </td></tr>", s.getNumberOfProcessedJobs());
pw.printf("<tr><td>Average Processing Time</td><td>%s</td><td colspan='2'> </td></tr>", formatTime(s.getAverageProcessingTime()));
pw.printf("<tr><td>Average Waiting Time</td><td>%s</td><td colspan='2'> </td></tr>", formatTime(s.getAverageWaitingTime()));
- pw.printf("<tr><td>Status Info</td><td colspan='3'>%s</td></tr>", escape(q.getStatusInfo()));
+ pw.printf("<tr><td>Status Info</td><td colspan='3'>%s</td></tr>", escape(q.getStateInfo()));
pw.println("</tbody></table>");
pw.println("<br/>");
}
@@ -357,7 +357,7 @@ public class WebConsolePlugin extends Ht
pw.printf("Processed Jobs : %s%n", s.getNumberOfProcessedJobs());
pw.printf("Average Processing Time : %s%n", formatTime(s.getAverageProcessingTime()));
pw.printf("Average Waiting Time : %s%n", formatTime(s.getAverageWaitingTime()));
- pw.printf("Status Info : %s%n", q.getStatusInfo());
+ pw.printf("Status Info : %s%n", q.getStateInfo());
pw.println("Configuration");
pw.printf("Type : %s%n", formatType(c.getType()));
pw.printf("Topics : %s%n", formatArrayAsText(c.getTopics()));
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Fri Oct 15 06:53:29 2010
@@ -19,6 +19,7 @@
package org.apache.sling.event.impl.jobs.queues;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Iterator;
@@ -105,7 +106,10 @@ public abstract class AbstractJobQueue
this.environment = environment;
}
- public String getStatusInfo() {
+ /**
+ * @see org.apache.sling.event.jobs.Queue#getStateInfo()
+ */
+ public String getStateInfo() {
return "isWaiting=" + this.isWaiting + ", markedForCleanUp=" + this.markedForCleanUp + ", suspendedSince=" + this.suspendedSince.longValue();
}
@@ -504,16 +508,6 @@ public abstract class AbstractJobQueue
this.queueName = name;
}
- protected abstract JobEvent start(final JobEvent event);
-
- protected abstract void put(final JobEvent event);
-
- protected abstract JobEvent take();
-
- protected abstract boolean isEmpty();
-
- protected abstract void notifyFinished(final JobEvent rescheduleInfo);
-
/**
* Reschedule a job.
*/
@@ -556,16 +550,27 @@ public abstract class AbstractJobQueue
/**
* @see org.apache.sling.event.jobs.Queue#removeAll()
*/
- public void removeAll() {
+ public synchronized void removeAll() {
+ // we suspend the queue
final boolean wasSuspended = this.isSuspended();
this.suspend();
- while ( !this.isEmpty() ) {
- final JobEvent event = this.take();
- if ( event != null ) {
- event.remove();
- }
- }
+ // we copy all events and remove them in the background
+ final Collection<JobEvent> events = this.removeAllJobs();
this.clearQueued();
+ final Thread t = new Thread(new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ for(final JobEvent job : events) {
+ job.remove();
+ }
+ }
+ }, "Queue RemoveAll Thread for " + this.queueName);
+ t.setDaemon(true);
+ t.start();
+ // start queue again
if ( !wasSuspended ) {
this.resume();
}
@@ -577,5 +582,37 @@ public abstract class AbstractJobQueue
public void clear() {
this.clearQueued();
}
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#getState(java.lang.String)
+ */
+ public Object getState(final String key) {
+ // not supported for now
+ return null;
+ }
+
+ /**
+ * Put another job into the queue.
+ */
+ protected abstract void put(final JobEvent event);
+
+ /**
+ * Get another job from the queue.
+ */
+ protected abstract JobEvent take();
+
+ /**
+ * Is the queue empty?
+ */
+ protected abstract boolean isEmpty();
+
+ /**
+ * Remove all events from the queue and return them.
+ */
+ protected abstract Collection<JobEvent> removeAllJobs();
+
+ protected abstract JobEvent start(final JobEvent event);
+
+ protected abstract void notifyFinished(final JobEvent rescheduleInfo);
}
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java Fri Oct 15 06:53:29 2010
@@ -45,8 +45,8 @@ public abstract class AbstractParallelJo
}
@Override
- public String getStatusInfo() {
- return super.getStatusInfo() + ", jobCount=" + this.jobCount;
+ public String getStateInfo() {
+ return super.getStateInfo() + ", jobCount=" + this.jobCount;
}
@Override
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Fri Oct 15 06:53:29 2010
@@ -18,6 +18,9 @@
*/
package org.apache.sling.event.impl.jobs.queues;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -38,7 +41,7 @@ public final class OrderedJobQueue exten
private JobEvent jobEvent;
/** Marker indicating that this queue is currently sleeping. */
- private volatile boolean isSleeping = false;
+ private volatile long isSleepingUntil = -1;
/** The sleeping thread. */
private volatile Thread sleepingThread;
@@ -53,8 +56,8 @@ public final class OrderedJobQueue exten
}
@Override
- public String getStatusInfo() {
- return super.getStatusInfo() + ", isSleeping=" + this.isSleeping;
+ public String getStateInfo() {
+ return super.getStateInfo() + ", isSleepingUntil=" + this.isSleepingUntil;
}
@Override
@@ -68,21 +71,19 @@ public final class OrderedJobQueue exten
return rescheduleInfo;
}
- private void setSleeping(boolean flag) {
- this.isSleeping = flag;
- if ( !flag ) {
- this.sleepingThread = null;
- }
+ private void setNotSleeping() {
+ this.isSleepingUntil = -1;
+ this.sleepingThread = null;
}
- private void setSleeping(Thread sleepingThread) {
+ private void setSleeping(final Thread sleepingThread, final long delay) {
this.sleepingThread = sleepingThread;
- this.setSleeping(true);
+ this.isSleepingUntil = System.currentTimeMillis() + delay;
}
@Override
public void resume() {
- if ( this.isSleeping ) {
+ if ( this.isSleepingUntil == -1 ) {
final Thread thread = this.sleepingThread;
if ( thread != null ) {
thread.interrupt();
@@ -158,14 +159,14 @@ public final class OrderedJobQueue exten
delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
}
if ( delay > 0 ) {
- setSleeping(Thread.currentThread());
+ this.setSleeping(Thread.currentThread(), delay);
try {
this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
Thread.sleep(delay);
} catch (InterruptedException e) {
this.ignoreException(e);
} finally {
- setSleeping(false);
+ this.setNotSleeping();
}
}
return info;
@@ -180,9 +181,24 @@ public final class OrderedJobQueue exten
}
@Override
- public void removeAll() {
+ public synchronized void removeAll() {
this.jobEvent = null;
super.removeAll();
}
+
+ @Override
+ protected Collection<JobEvent> removeAllJobs() {
+ final List<JobEvent> events = new ArrayList<JobEvent>(this.queue);
+ this.queue.clear();
+ return events;
+ }
+
+ @Override
+ public Object getState(final String key) {
+ if ( "isSleepingUntil".equals(key) ) {
+ return this.isSleepingUntil;
+ }
+ return super.getState(key);
+ }
}
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Fri Oct 15 06:53:29 2010
@@ -18,6 +18,9 @@
*/
package org.apache.sling.event.impl.jobs.queues;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -75,5 +78,12 @@ public final class ParallelJobQueue exte
this.queue.clear();
super.clear();
}
+
+ @Override
+ protected Collection<JobEvent> removeAllJobs() {
+ final List<JobEvent> events = new ArrayList<JobEvent>(this.queue);
+ this.queue.clear();
+ return events;
+ }
}
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Fri Oct 15 06:53:29 2010
@@ -19,6 +19,7 @@
package org.apache.sling.event.impl.jobs.queues;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -59,8 +60,8 @@ public final class TopicRoundRobinJobQue
}
@Override
- public String getStatusInfo() {
- return super.getStatusInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext;
+ public String getStateInfo() {
+ return super.getStateInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext;
}
@Override
@@ -143,5 +144,19 @@ public final class TopicRoundRobinJobQue
}
super.clear();
}
+
+ @Override
+ protected Collection<JobEvent> removeAllJobs() {
+ final List<JobEvent> events = new ArrayList<JobEvent>();
+ synchronized ( this.topicMap ) {
+ for(final List<JobEvent> l : this.topicMap.values() ) {
+ events.addAll(l);
+ }
+ this.eventCount = 0;
+ this.topics.clear();
+ this.topicMap.clear();
+ }
+ return events;
+ }
}
Modified: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java (original)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java Fri Oct 15 06:53:29 2010
@@ -36,11 +36,6 @@ public interface Queue {
Statistics getStatistics();
/**
- * Return some information about the current status of the queue.
- */
- String getStatusInfo();
-
- /**
* Get the corresponding configuration.
*/
QueueConfiguration getConfiguration();
@@ -83,4 +78,17 @@ public interface Queue {
* all outstanding jobs (but no notifications are send).
*/
void removeAll();
+
+ /**
+ * Return some information about the current state of the queue. This
+ * method is meant to see the internal state of the queue for debugging
+ * or monitoring purposes.
+ */
+ String getStateInfo();
+
+ /**
+ * For monitoring purposes and possible extensions from the different
+ * queue types. This method allows to query state information.
+ */
+ Object getState(final String key);
}
Modified: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java?rev=1022833&r1=1022832&r2=1022833&view=diff
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java (original)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java Fri Oct 15 06:53:29 2010
@@ -77,7 +77,7 @@ public class InternalQueueConfigurationT
assertFalse(c.match(getJobEvent("t/x")));
}
- @org.junit.Test public void testTopicMatchersStart() {
+ @org.junit.Test public void testTopicMatchersStar() {
final Map<String, Object> p = new HashMap<String, Object>();
p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a*"});
p.put(ConfigurationConstants.PROP_NAME, "test");
@@ -122,6 +122,51 @@ public class InternalQueueConfigurationT
assertEquals("test-queue-d", d.queueName);
}
+ @org.junit.Test public void testTopicMatchersDotAndSlash() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/."});
+ p.put(ConfigurationConstants.PROP_NAME, "test");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ assertTrue(c.match(getJobEvent("a/b")));
+ assertTrue(c.match(getJobEvent("a/c")));
+ assertFalse(c.match(getJobEvent("a")));
+ assertFalse(c.match(getJobEvent("a/b/c")));
+ assertFalse(c.match(getJobEvent("t")));
+ assertFalse(c.match(getJobEvent("t/x")));
+ }
+
+ @org.junit.Test public void testTopicMatchersStarAndSlash() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/*"});
+ p.put(ConfigurationConstants.PROP_NAME, "test");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ assertTrue(c.match(getJobEvent("a/b")));
+ assertTrue(c.match(getJobEvent("a/c")));
+ assertFalse(c.match(getJobEvent("a")));
+ assertTrue(c.match(getJobEvent("a/b/c")));
+ assertFalse(c.match(getJobEvent("t")));
+ assertFalse(c.match(getJobEvent("t/x")));
+ }
+
+ @org.junit.Test public void testTopicMatcherAndReplacementAndSlash() {
+ final Map<String, Object> p = new HashMap<String, Object>();
+ p.put(ConfigurationConstants.PROP_TOPICS, new String[] {"a/."});
+ p.put(ConfigurationConstants.PROP_NAME, "test-queue-{0}");
+
+ InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(p);
+ assertTrue(c.isValid());
+ final JobEvent b = getJobEvent("a/b");
+ assertTrue(c.match(b));
+ assertEquals("test-queue-b", b.queueName);
+ final JobEvent d = getJobEvent("a/d");
+ assertTrue(c.match(d));
+ assertEquals("test-queue-d", d.queueName);
+ }
+
@org.junit.Test public void testNoTopicMatchers() {
final Map<String, Object> p = new HashMap<String, Object>();
p.put(ConfigurationConstants.PROP_NAME, "test");