You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2016/05/24 13:55:53 UTC

svn commit: r1745362 - in /sling/trunk/contrib/extensions/distribution/core: ./ src/main/java/org/apache/sling/distribution/agent/impl/ src/main/java/org/apache/sling/distribution/queue/impl/simple/ src/test/java/org/apache/sling/distribution/queue/impl/simple/ src/test/resources/

Author: tommaso
Date: Tue May 24 13:55:53 2016
New Revision: 1745362

URL: http://svn.apache.org/viewvc?rev=1745362&view=rev
Log:
SLING-5748 - checkpointing memory queue made optional, updated forward agent factory

Added:
    sling/trunk/contrib/extensions/distribution/core/src/test/resources/
    sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint
Modified:
    sling/trunk/contrib/extensions/distribution/core/pom.xml
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java

Modified: sling/trunk/contrib/extensions/distribution/core/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/pom.xml?rev=1745362&r1=1745361&r2=1745362&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/pom.xml (original)
+++ sling/trunk/contrib/extensions/distribution/core/pom.xml Tue May 24 13:55:53 2016
@@ -81,6 +81,15 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-javadoc-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/*-checkpoint</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java?rev=1745362&r1=1745361&r2=1745362&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java Tue May 24 13:55:53 2016
@@ -51,7 +51,6 @@ import org.apache.sling.distribution.que
 import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.PriorityQueueDispatchingStrategy;
-import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueue;
 import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
 import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProvider;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
@@ -77,7 +76,7 @@ import org.slf4j.LoggerFactory;
 @Reference(name = "triggers", referenceInterface = DistributionTrigger.class,
         policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
         bind = "bindDistributionTrigger", unbind = "unbindDistributionTrigger")
-@Property(name="webconsole.configurationFactory.nameHint", value="Agent name: {name}")
+@Property(name = "webconsole.configurationFactory.nameHint", value = "Agent name: {name}")
 public class ForwardDistributionAgentFactory extends AbstractDistributionAgentFactory {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -166,7 +165,8 @@ public class ForwardDistributionAgentFac
 
     @Property(options = {
             @PropertyOption(name = JobHandlingDistributionQueueProvider.TYPE, value = "Sling Jobs"),
-            @PropertyOption(name = SimpleDistributionQueueProvider.TYPE, value = "In-memory")},
+            @PropertyOption(name = SimpleDistributionQueueProvider.TYPE, value = "In-memory"),
+            @PropertyOption(name = SimpleDistributionQueueProvider.TYPE_CHECKPOINT, value = "In-file")},
             value = "jobs",
             label = "Queue provider", description = "The queue provider implementation."
     )
@@ -231,10 +231,11 @@ public class ForwardDistributionAgentFac
         DistributionQueueProvider queueProvider;
         String queueProviderName = PropertiesUtil.toString(config.get(QUEUE_PROVIDER), JobHandlingDistributionQueueProvider.TYPE);
         if (JobHandlingDistributionQueueProvider.TYPE.equals(queueProviderName)) {
-           queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
-        }
-        else {
-            queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName);
+            queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+        } else if (SimpleDistributionQueueProvider.TYPE.equals(queueProviderName)) {
+            queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, false);
+        } else {
+            queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, true);
         }
 
         DistributionQueueDispatchingStrategy exportQueueStrategy;

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java?rev=1745362&r1=1745361&r2=1745362&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java Tue May 24 13:55:53 2016
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 public class SimpleDistributionQueueProvider implements DistributionQueueProvider {
 
     public static final String TYPE = "simple";
+    public static final String TYPE_CHECKPOINT = "simple-checkpoint";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -57,23 +58,27 @@ public class SimpleDistributionQueueProv
     private final Scheduler scheduler;
 
     private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
+    private final boolean checkpoint;
     private File checkpointDirectory;
 
-    public SimpleDistributionQueueProvider(Scheduler scheduler, String name) {
+    public SimpleDistributionQueueProvider(Scheduler scheduler, String name, boolean checkpoint) {
+        this.checkpoint = checkpoint;
         if (name == null || scheduler == null) {
             throw new IllegalArgumentException("all arguments are required");
         }
 
-        this.checkpointDirectory = new File(name + "-simple-queues-checkpoints");
-        log.info("creating checkpoint directory {}", checkpointDirectory.getAbsoluteFile());
-        if (checkpointDirectory.exists() && !checkpointDirectory.isDirectory()) {
-            assert checkpointDirectory.delete();
-        }
-        boolean created = false;
-        if (!checkpointDirectory.exists()) {
-            created = checkpointDirectory.mkdir();
+        if (checkpoint) {
+            this.checkpointDirectory = new File(name + "-simple-queues-checkpoints");
+            log.info("creating checkpoint directory {}", checkpointDirectory.getAbsoluteFile());
+            if (checkpointDirectory.exists() && !checkpointDirectory.isDirectory()) {
+                assert checkpointDirectory.delete();
+            }
+            boolean created = false;
+            if (!checkpointDirectory.exists()) {
+                created = checkpointDirectory.mkdir();
+            }
+            log.info("checkpoint directory created: {}, exists {}", created, checkpointDirectory.isDirectory() && checkpointDirectory.exists());
         }
-        log.info("checkpoint directory created: {}, exists {}", created, checkpointDirectory.isDirectory() && checkpointDirectory.exists());
 
         this.scheduler = scheduler;
         this.name = name;
@@ -99,43 +104,53 @@ public class SimpleDistributionQueueProv
 
     public void enableQueueProcessing(@Nonnull DistributionQueueProcessor queueProcessor, String... queueNames) {
 
-        // recover from checkpoints
-        log.debug("recovering from checkpoints if needed");
-        for (final String queueName : queueNames) {
-            log.debug("recovering for queue {}", queueName);
-            DistributionQueue queue = getQueue(queueName);
-            FilenameFilter filenameFilter = new FilenameFilter() {
-                @Override
-                public boolean accept(File file, String name) {
-                    return name.equals(queueName + "-checkpoint");
-                }
-            };
-            for (File qf : checkpointDirectory.listFiles(filenameFilter)) {
-                log.info("recovering from checkpoint {}", qf);
-                try {
-                    LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
-                    while (lineIterator.hasNext()) {
-                        String s = lineIterator.nextLine();
-                        String[] split = s.split(" ");
-                        String id = split[0];
-                        String infoString = split[1];
-                        Map<String, Object> info = new HashMap<String, Object>();
-                        JSONTokener jsonTokener = new JSONTokener(infoString);
-                        JSONObject jsonObject = new JSONObject(jsonTokener);
-                        Iterator<String> keys = jsonObject.keys();
-                        while (keys.hasNext()) {
-                            String key = keys.next();
-                            info.put(key, jsonObject.get(key));
+        if (checkpoint) {
+            // recover from checkpoints
+            log.debug("recovering from checkpoints if needed");
+            for (final String queueName : queueNames) {
+                log.debug("recovering for queue {}", queueName);
+                DistributionQueue queue = getQueue(queueName);
+                FilenameFilter filenameFilter = new FilenameFilter() {
+                    @Override
+                    public boolean accept(File file, String name) {
+                        return name.equals(queueName + "-checkpoint");
+                    }
+                };
+                for (File qf : checkpointDirectory.listFiles(filenameFilter)) {
+                    log.info("recovering from checkpoint {}", qf);
+                    try {
+                        LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
+                        while (lineIterator.hasNext()) {
+                            String s = lineIterator.nextLine();
+                            String[] split = s.split(" ");
+                            String id = split[0];
+                            String infoString = split[1];
+                            Map<String, Object> info = new HashMap<String, Object>();
+                            JSONTokener jsonTokener = new JSONTokener(infoString);
+                            JSONObject jsonObject = new JSONObject(jsonTokener);
+                            Iterator<String> keys = jsonObject.keys();
+                            while (keys.hasNext()) {
+                                String key = keys.next();
+                                info.put(key, jsonObject.get(key));
+                            }
+                            queue.add(new DistributionQueueItem(id, info));
                         }
-                        queue.add(new DistributionQueueItem(id, info));
+                        log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
+                    } catch (FileNotFoundException e) {
+                        log.warn("could not read checkpoint file {}", qf.getAbsolutePath());
+                    } catch (JSONException e) {
+                        log.warn("could not parse info from checkpoint file {}", qf.getAbsolutePath());
                     }
-                    log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
-                } catch (FileNotFoundException e) {
-                    log.warn("could not read checkpoint file {}", qf.getAbsolutePath());
-                } catch (JSONException e) {
-                    log.warn("could not parse info from checkpoint file {}", qf.getAbsolutePath());
                 }
             }
+
+            // enable checkpointing
+            for (String queueName : queueNames) {
+                ScheduleOptions options = scheduler.NOW(-1, 15)
+                        .canRunConcurrently(false)
+                        .name(getJobName(queueName + "-checkpoint"));
+                scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
+            }
         }
 
         // enable processing
@@ -146,27 +161,24 @@ public class SimpleDistributionQueueProv
             scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
         }
 
-        // enable checkpointing
-        for (String queueName : queueNames) {
-            ScheduleOptions options = scheduler.NOW(-1, 15)
-                    .canRunConcurrently(false)
-                    .name(getJobName(queueName + "-checkpoint"));
-            scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
-        }
     }
 
     public void disableQueueProcessing() {
         for (DistributionQueue queue : getQueues()) {
             String queueName = queue.getName();
+            // disable queue processing
             if (scheduler.unschedule(getJobName(queueName))) {
                 log.debug("queue processing on {} stopped", queue);
             } else {
                 log.warn("could not disable queue processing on {}", queue);
             }
-            if (scheduler.unschedule(getJobName(queueName) + "-checkpoint")) {
-                log.debug("checkpoint on {} stopped", queue);
-            } else {
-                log.warn("could not disable checkpoint on {}", queue);
+            if (checkpoint) {
+                // disable checkpointing
+                if (scheduler.unschedule(getJobName(queueName) + "-checkpoint")) {
+                    log.debug("checkpoint on {} stopped", queue);
+                } else {
+                    log.warn("could not disable checkpoint on {}", queue);
+                }
             }
         }
     }

Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java?rev=1745362&r1=1745361&r2=1745362&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java Tue May 24 13:55:53 2016
@@ -18,12 +18,16 @@
  */
 package org.apache.sling.distribution.queue.impl.simple;
 
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.queue.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueProcessor;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -36,7 +40,14 @@ public class SimpleDistributionQueueProv
 
     @Test
     public void testGetOrCreateQueue() throws Exception {
-        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(mock(Scheduler.class), "agentName");
+        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(mock(Scheduler.class), "agentName", false);
+        DistributionQueue queue = simpledistributionQueueProvider.getQueue("default");
+        assertNotNull(queue);
+    }
+
+    @Test
+    public void testGetOrCreateQueueWithCheckpointing() throws Exception {
+        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(mock(Scheduler.class), "agentName", true);
         DistributionQueue queue = simpledistributionQueueProvider.getQueue("default");
         assertNotNull(queue);
     }
@@ -48,7 +59,39 @@ public class SimpleDistributionQueueProv
         when(scheduler.NOW(-1, 1)).thenReturn(options);
         when(options.canRunConcurrently(false)).thenReturn(options);
         when(options.name(any(String.class))).thenReturn(options);
-        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent");
+        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", false);
+        DistributionQueueProcessor processor = mock(DistributionQueueProcessor.class);
+        simpledistributionQueueProvider.enableQueueProcessing(processor);
+    }
+
+    @Test
+    public void testEnableQueueProcessingWithCheckpointRecovery() throws Exception {
+        File checkpointDirectory = new File("dummy-agent-simple-queues-checkpoints");
+        File file = new File(getClass().getResource("/dummy-agent-checkpoint").getFile());
+        FileUtils.copyFileToDirectory(file, checkpointDirectory);
+
+        Scheduler scheduler = mock(Scheduler.class);
+        ScheduleOptions options = mock(ScheduleOptions.class);
+        when(scheduler.NOW(-1, 1)).thenReturn(options);
+        when(scheduler.NOW(-1, 15)).thenReturn(options);
+        when(options.canRunConcurrently(false)).thenReturn(options);
+        when(options.name(any(String.class))).thenReturn(options);
+        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", true);
+        DistributionQueueProcessor processor = mock(DistributionQueueProcessor.class);
+        simpledistributionQueueProvider.enableQueueProcessing(processor, "dummy-agent");
+        DistributionQueue queue = simpledistributionQueueProvider.getQueue("dummy-agent");
+        assertNotNull(queue);
+        assertEquals(1, queue.getStatus().getItemsCount());
+    }
+
+    @Test
+    public void testEnableQueueProcessingWithCheckpointing() throws Exception {
+        Scheduler scheduler = mock(Scheduler.class);
+        ScheduleOptions options = mock(ScheduleOptions.class);
+        when(scheduler.NOW(-1, 1)).thenReturn(options);
+        when(options.canRunConcurrently(false)).thenReturn(options);
+        when(options.name(any(String.class))).thenReturn(options);
+        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", true);
         DistributionQueueProcessor processor = mock(DistributionQueueProcessor.class);
         simpledistributionQueueProvider.enableQueueProcessing(processor);
     }
@@ -60,7 +103,18 @@ public class SimpleDistributionQueueProv
         when(scheduler.NOW(-1, 10)).thenReturn(options);
         when(options.canRunConcurrently(false)).thenReturn(options);
         when(options.name(any(String.class))).thenReturn(options);
-        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent");
+        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", false);
+        simpledistributionQueueProvider.disableQueueProcessing();
+    }
+
+    @Test
+    public void testDisableQueueProcessingWithCheckpointing() throws Exception {
+        Scheduler scheduler = mock(Scheduler.class);
+        ScheduleOptions options = mock(ScheduleOptions.class);
+        when(scheduler.NOW(-1, 10)).thenReturn(options);
+        when(options.canRunConcurrently(false)).thenReturn(options);
+        when(options.name(any(String.class))).thenReturn(options);
+        SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", true);
         simpledistributionQueueProvider.disableQueueProcessing();
     }
 }
\ No newline at end of file

Added: sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint?rev=1745362&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint Tue May 24 13:55:53 2016
@@ -0,0 +1 @@
+/var/sling/distribution/packages/default/data/dstrpck-1464090256589-70a3fc84-9568-4aeb-ba1d-ffc5affc4332 {"internal.request.startTime":"1464090250095","request.type":"ADD","request.deepPaths":"[Ljava.lang.String;@600b03df","internal.request.id":"DSTRQ1","request.paths":"[Ljava.lang.String;@4c16cdae","internal.request.user":"admin","package.type":"default"}UTF-8
\ No newline at end of file