You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/11/06 10:24:24 UTC
svn commit: r1637057 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/config/
test/java/org/apache/sling/event/impl/jobs/config/
test/java/org/apache/sling/event/it/
Author: cziegeler
Date: Thu Nov 6 09:24:24 2014
New Revision: 1637057
URL: http://svn.apache.org/r1637057
Log:
SLING-4133 : Allow job consumers to register for a topic and all subtopics
Added:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java (with props)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java?rev=1637057&r1=1637056&r2=1637057&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java Thu Nov 6 09:24:24 2014
@@ -188,6 +188,26 @@ public class TopologyCapabilities {
}
/**
+ * Add instances to the list if not already included
+ */
+ private void addAll(final List<InstanceDescription> potentialTargets, final List<InstanceDescription> newTargets) {
+ if ( newTargets != null ) {
+ for(final InstanceDescription desc : newTargets) {
+ boolean found = false;
+ for(final InstanceDescription existingDesc : potentialTargets) {
+ if ( desc.getSlingId().equals(existingDesc.getSlingId()) ) {
+ found = true;
+ break;
+ }
+ }
+ if ( !found ) {
+ potentialTargets.add(desc);
+ }
+ }
+ }
+ }
+
+ /**
* Return the potential targets (Sling IDs) sorted by ID
* @return A list of instance descriptions. The list might be empty.
*/
@@ -196,18 +216,21 @@ public class TopologyCapabilities {
final List<InstanceDescription> potentialTargets = new ArrayList<InstanceDescription>();
// first: topic targets - directly handling the topic
- final List<InstanceDescription> topicTargets = this.instanceCapabilities.get(jobTopic);
- if ( topicTargets != null ) {
- potentialTargets.addAll(topicTargets);
- }
+ addAll(potentialTargets, this.instanceCapabilities.get(jobTopic));
+
// second: category targets - handling the topic category
- final int pos = jobTopic.lastIndexOf('/');
+ int pos = jobTopic.lastIndexOf('/');
if ( pos > 0 ) {
final String category = jobTopic.substring(0, pos + 1).concat("*");
- final List<InstanceDescription> categoryTargets = this.instanceCapabilities.get(category);
- if ( categoryTargets != null ) {
- potentialTargets.addAll(categoryTargets);
- }
+ addAll(potentialTargets, this.instanceCapabilities.get(category));
+
+ // search deep consumers (since 1.2 of the consumer package)
+ do {
+ final String subCategory = jobTopic.substring(0, pos + 1).concat("**");
+ addAll(potentialTargets, this.instanceCapabilities.get(subCategory));
+
+ pos = jobTopic.lastIndexOf('/', pos - 1);
+ } while ( pos > 0 );
}
// third: bridged consumers
final List<InstanceDescription> bridgedTargets = (jobProperties != null && jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) ? this.instanceCapabilities.get("/") : null);
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java?rev=1637057&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java Thu Nov 6 09:24:24 2014
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyView;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TopologyCapabilitiesTest {
+
+ private TopologyCapabilities caps;
+
+ @Before
+ public void setup() {
+ // local cluster view
+ final ClusterView cv = Mockito.mock(ClusterView.class);
+ Mockito.when(cv.getId()).thenReturn("cluster");
+
+ // local description
+ final InstanceDescription local = Mockito.mock(InstanceDescription.class);
+ Mockito.when(local.isLeader()).thenReturn(true);
+ Mockito.when(local.getSlingId()).thenReturn("local");
+ Mockito.when(local.getProperty(TopologyCapabilities.PROPERTY_TOPICS)).thenReturn("foo,bar/*,a/**,d/1/2,d/1/*,d/**");
+ Mockito.when(local.getClusterView()).thenReturn(cv);
+
+ // topology view
+ final TopologyView tv = Mockito.mock(TopologyView.class);
+ Mockito.when(tv.getInstances()).thenReturn(Collections.singleton(local));
+ Mockito.when(tv.getLocalInstance()).thenReturn(local);
+
+ final JobManagerConfiguration config = Mockito.mock(JobManagerConfiguration.class);
+
+ caps = new TopologyCapabilities(tv, config);
+ }
+
+ @Test public void testMatching() {
+ assertEquals(1, caps.getPotentialTargets("foo", null).size());
+ assertEquals(0, caps.getPotentialTargets("foo/a", null).size());
+ assertEquals(0, caps.getPotentialTargets("bar", null).size());
+ assertEquals(1, caps.getPotentialTargets("bar/foo", null).size());
+ assertEquals(0, caps.getPotentialTargets("bar/foo/a", null).size());
+ assertEquals(1, caps.getPotentialTargets("a/b", null).size());
+ assertEquals(1, caps.getPotentialTargets("a/b(c", null).size());
+ assertEquals(0, caps.getPotentialTargets("x", null).size());
+ assertEquals(0, caps.getPotentialTargets("x/y", null).size());
+ assertEquals(1, caps.getPotentialTargets("d/1/2", null).size());
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java?rev=1637057&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java Thu Nov 6 09:24:24 2014
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class TopicMatchingTest extends AbstractJobHandlingTest {
+
+ public static final String TOPIC = "sling/test/a";
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+ }
+
+ @Override
+ @After
+ public void cleanup() {
+ super.cleanup();
+ }
+
+ /**
+ * Test simple pattern matching
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testSimpleMatching() throws Exception {
+ final AtomicInteger finishedCount = new AtomicInteger();
+
+ final ServiceRegistration reg = this.registerJobExecutor("sling/test/*",
+ new JobExecutor() {
+
+ @Override
+ public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+ return context.result().succeeded();
+ }
+ });
+ final ServiceRegistration eventHandler = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ new EventHandler() {
+
+ @Override
+ public void handleEvent(final Event event) {
+ finishedCount.incrementAndGet();
+ }
+ });
+
+ try {
+ this.getJobManager().addJob(TOPIC, null);
+ while ( finishedCount.get() == 0 ) {
+ this.sleep(10);
+ }
+ } finally {
+ reg.unregister();
+ eventHandler.unregister();
+ }
+ }
+
+ /**
+ * Test deep pattern matching
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testDeepMatching() throws Exception {
+ final AtomicInteger finishedCount = new AtomicInteger();
+
+ final ServiceRegistration reg = this.registerJobExecutor("sling/**",
+ new JobExecutor() {
+
+ @Override
+ public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+ return context.result().succeeded();
+ }
+ });
+ final ServiceRegistration eventHandler = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ new EventHandler() {
+
+ @Override
+ public void handleEvent(final Event event) {
+ finishedCount.incrementAndGet();
+ }
+ });
+
+ try {
+ this.getJobManager().addJob(TOPIC, null);
+ while ( finishedCount.get() == 0 ) {
+ this.sleep(10);
+ }
+ } finally {
+ reg.unregister();
+ eventHandler.unregister();
+ }
+ }
+
+ /**
+ * Test ordering of matchers
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testOrdering() throws Exception {
+ final AtomicInteger count1 = new AtomicInteger();
+ final AtomicInteger count2 = new AtomicInteger();
+ final AtomicInteger count3 = new AtomicInteger();
+
+ final ServiceRegistration reg1 = this.registerJobExecutor("sling/**",
+ new JobExecutor() {
+
+ @Override
+ public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+ count1.incrementAndGet();
+ return context.result().succeeded();
+ }
+ });
+ final ServiceRegistration reg2 = this.registerJobExecutor("sling/test/*",
+ new JobExecutor() {
+
+ @Override
+ public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+ count2.incrementAndGet();
+ return context.result().succeeded();
+ }
+ });
+ final ServiceRegistration reg3 = this.registerJobExecutor(TOPIC,
+ new JobExecutor() {
+
+ @Override
+ public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+ count3.incrementAndGet();
+ return context.result().succeeded();
+ }
+ });
+
+ // first test, all three registered, reg3 should get the precedence
+ this.getJobManager().addJob(TOPIC, null);
+ while ( count3.get() != 1 ) {
+ this.sleep(10);
+ }
+
+ // second test, unregister reg3, now it should be reg2
+ reg3.unregister();
+ this.getJobManager().addJob(TOPIC, null);
+ while ( count2.get() != 1 ) {
+ this.sleep(10);
+ }
+
+ // third test, unregister reg2, reg1 is now the only one
+ reg2.unregister();
+ this.getJobManager().addJob(TOPIC, null);
+ while ( count1.get() != 1 ) {
+ this.sleep(10);
+ }
+ reg1.unregister();
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url