You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/11/06 10:24:24 UTC

svn commit: r1637057 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/config/ test/java/org/apache/sling/event/impl/jobs/config/ test/java/org/apache/sling/event/it/

Author: cziegeler
Date: Thu Nov  6 09:24:24 2014
New Revision: 1637057

URL: http://svn.apache.org/r1637057
Log:
SLING-4133 : Allow job consumers to register for a topic and all subtopics

Added:
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java?rev=1637057&r1=1637056&r2=1637057&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java Thu Nov  6 09:24:24 2014
@@ -188,6 +188,26 @@ public class TopologyCapabilities {
     }
 
     /**
+     * Append every new target whose Sling ID is not yet present in the potential targets
+     */
+    private void addAll(final List<InstanceDescription> potentialTargets, final List<InstanceDescription> newTargets) {
+        if ( newTargets == null ) {
+            // callers hand over the result of instanceCapabilities.get(..) unchecked
+            return;
+        }
+        for(final InstanceDescription candidate : newTargets) {
+            // instances are compared by Sling ID
+            boolean known = false;
+            for(int i = 0; !known && i < potentialTargets.size(); i++) {
+                known = candidate.getSlingId().equals(potentialTargets.get(i).getSlingId());
+            }
+            if ( !known ) {
+                potentialTargets.add(candidate);
+            }
+        }
+    }
+
+    /**
      * Return the potential targets (Sling IDs) sorted by ID
      * @return A list of instance descriptions. The list might be empty.
      */
@@ -196,18 +216,21 @@ public class TopologyCapabilities {
         final List<InstanceDescription> potentialTargets = new ArrayList<InstanceDescription>();
 
         // first: topic targets - directly handling the topic
-        final List<InstanceDescription> topicTargets = this.instanceCapabilities.get(jobTopic);
-        if ( topicTargets != null ) {
-            potentialTargets.addAll(topicTargets);
-        }
+        addAll(potentialTargets, this.instanceCapabilities.get(jobTopic));
+
         // second: category targets - handling the topic category
-        final int pos = jobTopic.lastIndexOf('/');
+        int pos = jobTopic.lastIndexOf('/');
         if ( pos > 0 ) {
             final String category = jobTopic.substring(0, pos + 1).concat("*");
-            final List<InstanceDescription> categoryTargets = this.instanceCapabilities.get(category);
-            if ( categoryTargets != null ) {
-                potentialTargets.addAll(categoryTargets);
-            }
+            addAll(potentialTargets, this.instanceCapabilities.get(category));
+
+            // search deep consumers (since 1.2 of the consumer package)
+            do {
+                final String subCategory = jobTopic.substring(0, pos + 1).concat("**");
+                addAll(potentialTargets, this.instanceCapabilities.get(subCategory));
+
+                pos = jobTopic.lastIndexOf('/', pos - 1);
+            } while ( pos > 0 );
         }
         // third: bridged consumers
         final List<InstanceDescription> bridgedTargets = (jobProperties != null && jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) ? this.instanceCapabilities.get("/") : null);

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java?rev=1637057&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java Thu Nov  6 09:24:24 2014
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyView;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TopologyCapabilitiesTest {
+
+    private TopologyCapabilities caps;
+
+    @Before
+    public void setup() {
+        // mocked cluster view the local instance belongs to
+        final ClusterView cv = Mockito.mock(ClusterView.class);
+        Mockito.when(cv.getId()).thenReturn("cluster");
+
+        // the only instance: leader, advertising exact, category ("/*") and deep ("/**") topics
+        final InstanceDescription local = Mockito.mock(InstanceDescription.class);
+        Mockito.when(local.isLeader()).thenReturn(true);
+        Mockito.when(local.getSlingId()).thenReturn("local");
+        Mockito.when(local.getProperty(TopologyCapabilities.PROPERTY_TOPICS)).thenReturn("foo,bar/*,a/**,d/1/2,d/1/*,d/**");
+        Mockito.when(local.getClusterView()).thenReturn(cv);
+
+        // topology view consisting of just the local instance
+        final TopologyView tv = Mockito.mock(TopologyView.class);
+        Mockito.when(tv.getInstances()).thenReturn(Collections.singleton(local));
+        Mockito.when(tv.getLocalInstance()).thenReturn(local);
+
+        final JobManagerConfiguration config = Mockito.mock(JobManagerConfiguration.class);
+
+        caps = new TopologyCapabilities(tv, config);
+    }
+
+    @Test public void testMatching() {
+        assertEquals(1, caps.getPotentialTargets("foo", null).size()); // exact topic
+        assertEquals(0, caps.getPotentialTargets("foo/a", null).size()); // exact registration does not cover subtopics
+        assertEquals(0, caps.getPotentialTargets("bar", null).size()); // "bar/*" does not cover "bar" itself
+        assertEquals(1, caps.getPotentialTargets("bar/foo", null).size()); // "bar/*" covers direct children
+        assertEquals(0, caps.getPotentialTargets("bar/foo/a", null).size()); // ... but no deeper levels
+        assertEquals(1, caps.getPotentialTargets("a/b", null).size()); // "a/**" covers direct children
+        assertEquals(1, caps.getPotentialTargets("a/b/c", null).size()); // ... and deeper levels
+        assertEquals(0, caps.getPotentialTargets("x", null).size()); // nothing registered for "x"
+        assertEquals(0, caps.getPotentialTargets("x/y", null).size()); // nothing registered below "x"
+        assertEquals(1, caps.getPotentialTargets("d/1/2", null).size()); // matched by "d/1/2", "d/1/*" and "d/**", listed once
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java?rev=1637057&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java Thu Nov  6 09:24:24 2014
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutionResult;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+/**
+ * Integration tests for the matching of job topics against job executor
+ * registrations: exact topic, category pattern and deep pattern.
+ */
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class TopicMatchingTest extends AbstractJobHandlingTest {
+
+    /** Topic of the jobs added by all tests. */
+    public static final String TOPIC = "sling/test/a";
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+    }
+
+    @Override
+    @After
+    public void cleanup() {
+        super.cleanup();
+    }
+
+    /**
+     * Test simple pattern matching: with only an executor for {@code sling/test/*}
+     * registered, a job added with the topic {@link #TOPIC} must finish.
+     */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testSimpleMatching() throws Exception {
+        final AtomicInteger finishedCount = new AtomicInteger();
+
+        final ServiceRegistration reg = this.registerJobExecutor("sling/test/*",
+                new JobExecutor() {
+
+                    @Override
+                    public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+                        return context.result().succeeded();
+                    }
+                });
+        final ServiceRegistration eventHandler = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(final Event event) {
+                        finishedCount.incrementAndGet();
+                    }
+                });
+
+        try {
+            this.getJobManager().addJob(TOPIC, null);
+            while ( finishedCount.get() == 0 ) {
+                this.sleep(10);
+            }
+        } finally {
+            reg.unregister();
+            eventHandler.unregister();
+        }
+    }
+
+    /**
+     * Test deep pattern matching: with only an executor for {@code sling/**}
+     * registered, a job added with the topic {@link #TOPIC} must finish.
+     */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testDeepMatching() throws Exception {
+        final AtomicInteger finishedCount = new AtomicInteger();
+
+        final ServiceRegistration reg = this.registerJobExecutor("sling/**",
+                new JobExecutor() {
+
+                    @Override
+                    public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+                        return context.result().succeeded();
+                    }
+                });
+        final ServiceRegistration eventHandler = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(final Event event) {
+                        finishedCount.incrementAndGet();
+                    }
+                });
+
+        try {
+            this.getJobManager().addJob(TOPIC, null);
+            while ( finishedCount.get() == 0 ) {
+                this.sleep(10);
+            }
+        } finally {
+            reg.unregister();
+            eventHandler.unregister();
+        }
+    }
+
+    /**
+     * Test ordering of matchers: the exact registration is expected to win over
+     * the category registration, which in turn wins over the deep registration.
+     */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testOrdering() throws Exception {
+        final AtomicInteger count1 = new AtomicInteger();
+        final AtomicInteger count2 = new AtomicInteger();
+        final AtomicInteger count3 = new AtomicInteger();
+
+        final ServiceRegistration reg1 = this.registerJobExecutor("sling/**",
+                new JobExecutor() {
+
+                    @Override
+                    public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+                        count1.incrementAndGet();
+                        return context.result().succeeded();
+                    }
+                });
+        final ServiceRegistration reg2 = this.registerJobExecutor("sling/test/*",
+                new JobExecutor() {
+
+                    @Override
+                    public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+                        count2.incrementAndGet();
+                        return context.result().succeeded();
+                    }
+                });
+        final ServiceRegistration reg3 = this.registerJobExecutor(TOPIC,
+                new JobExecutor() {
+
+                    @Override
+                    public JobExecutionResult process(final Job job, final JobExecutionContext context) {
+                        count3.incrementAndGet();
+                        return context.result().succeeded();
+                    }
+                });
+
+        // release each registration in a finally block so that a failing or timed out
+        // phase does not leave executors registered; each one is unregistered exactly once
+        try {
+            try {
+                try {
+                    // first test, all three registered, reg3 should get the precedence
+                    this.getJobManager().addJob(TOPIC, null);
+                    while ( count3.get() != 1 ) {
+                        this.sleep(10);
+                    }
+                } finally {
+                    reg3.unregister();
+                }
+
+                // second test, reg3 is unregistered, now it should be reg2
+                this.getJobManager().addJob(TOPIC, null);
+                while ( count2.get() != 1 ) {
+                    this.sleep(10);
+                }
+            } finally {
+                reg2.unregister();
+            }
+
+            // third test, reg2 is unregistered, reg1 is now the only one
+            this.getJobManager().addJob(TOPIC, null);
+            while ( count1.get() != 1 ) {
+                this.sleep(10);
+            }
+        } finally {
+            reg1.unregister();
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url