You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2016/05/24 13:55:53 UTC
svn commit: r1745362 - in /sling/trunk/contrib/extensions/distribution/core:
./ src/main/java/org/apache/sling/distribution/agent/impl/
src/main/java/org/apache/sling/distribution/queue/impl/simple/
src/test/java/org/apache/sling/distribution/queue/imp...
Author: tommaso
Date: Tue May 24 13:55:53 2016
New Revision: 1745362
URL: http://svn.apache.org/viewvc?rev=1745362&view=rev
Log:
SLING-5748 - checkpointing memory queue made optional, updated forward agent factory
Added:
sling/trunk/contrib/extensions/distribution/core/src/test/resources/
sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint
Modified:
sling/trunk/contrib/extensions/distribution/core/pom.xml
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java
Modified: sling/trunk/contrib/extensions/distribution/core/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/pom.xml?rev=1745362&r1=1745361&r2=1745362&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/pom.xml (original)
+++ sling/trunk/contrib/extensions/distribution/core/pom.xml Tue May 24 13:55:53 2016
@@ -81,6 +81,15 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
</plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/*-checkpoint</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
</plugins>
</build>
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java?rev=1745362&r1=1745361&r2=1745362&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java Tue May 24 13:55:53 2016
@@ -51,7 +51,6 @@ import org.apache.sling.distribution.que
import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.PriorityQueueDispatchingStrategy;
-import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueue;
import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProvider;
import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
@@ -77,7 +76,7 @@ import org.slf4j.LoggerFactory;
@Reference(name = "triggers", referenceInterface = DistributionTrigger.class,
policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
bind = "bindDistributionTrigger", unbind = "unbindDistributionTrigger")
-@Property(name="webconsole.configurationFactory.nameHint", value="Agent name: {name}")
+@Property(name = "webconsole.configurationFactory.nameHint", value = "Agent name: {name}")
public class ForwardDistributionAgentFactory extends AbstractDistributionAgentFactory {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -166,7 +165,8 @@ public class ForwardDistributionAgentFac
@Property(options = {
@PropertyOption(name = JobHandlingDistributionQueueProvider.TYPE, value = "Sling Jobs"),
- @PropertyOption(name = SimpleDistributionQueueProvider.TYPE, value = "In-memory")},
+ @PropertyOption(name = SimpleDistributionQueueProvider.TYPE, value = "In-memory"),
+ @PropertyOption(name = SimpleDistributionQueueProvider.TYPE_CHECKPOINT, value = "In-file")},
value = "jobs",
label = "Queue provider", description = "The queue provider implementation."
)
@@ -231,10 +231,11 @@ public class ForwardDistributionAgentFac
DistributionQueueProvider queueProvider;
String queueProviderName = PropertiesUtil.toString(config.get(QUEUE_PROVIDER), JobHandlingDistributionQueueProvider.TYPE);
if (JobHandlingDistributionQueueProvider.TYPE.equals(queueProviderName)) {
- queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
- }
- else {
- queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName);
+ queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+ } else if (SimpleDistributionQueueProvider.TYPE.equals(queueProviderName)) {
+ queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, false);
+ } else {
+ queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, true);
}
DistributionQueueDispatchingStrategy exportQueueStrategy;
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java?rev=1745362&r1=1745361&r2=1745362&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java Tue May 24 13:55:53 2016
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
public class SimpleDistributionQueueProvider implements DistributionQueueProvider {
public static final String TYPE = "simple";
+ public static final String TYPE_CHECKPOINT = "simple-checkpoint";
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -57,23 +58,27 @@ public class SimpleDistributionQueueProv
private final Scheduler scheduler;
private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
+ private final boolean checkpoint;
private File checkpointDirectory;
- public SimpleDistributionQueueProvider(Scheduler scheduler, String name) {
+ public SimpleDistributionQueueProvider(Scheduler scheduler, String name, boolean checkpoint) {
+ this.checkpoint = checkpoint;
if (name == null || scheduler == null) {
throw new IllegalArgumentException("all arguments are required");
}
- this.checkpointDirectory = new File(name + "-simple-queues-checkpoints");
- log.info("creating checkpoint directory {}", checkpointDirectory.getAbsoluteFile());
- if (checkpointDirectory.exists() && !checkpointDirectory.isDirectory()) {
- assert checkpointDirectory.delete();
- }
- boolean created = false;
- if (!checkpointDirectory.exists()) {
- created = checkpointDirectory.mkdir();
+ if (checkpoint) {
+ this.checkpointDirectory = new File(name + "-simple-queues-checkpoints");
+ log.info("creating checkpoint directory {}", checkpointDirectory.getAbsoluteFile());
+ if (checkpointDirectory.exists() && !checkpointDirectory.isDirectory()) {
+ assert checkpointDirectory.delete();
+ }
+ boolean created = false;
+ if (!checkpointDirectory.exists()) {
+ created = checkpointDirectory.mkdir();
+ }
+ log.info("checkpoint directory created: {}, exists {}", created, checkpointDirectory.isDirectory() && checkpointDirectory.exists());
}
- log.info("checkpoint directory created: {}, exists {}", created, checkpointDirectory.isDirectory() && checkpointDirectory.exists());
this.scheduler = scheduler;
this.name = name;
@@ -99,43 +104,53 @@ public class SimpleDistributionQueueProv
public void enableQueueProcessing(@Nonnull DistributionQueueProcessor queueProcessor, String... queueNames) {
- // recover from checkpoints
- log.debug("recovering from checkpoints if needed");
- for (final String queueName : queueNames) {
- log.debug("recovering for queue {}", queueName);
- DistributionQueue queue = getQueue(queueName);
- FilenameFilter filenameFilter = new FilenameFilter() {
- @Override
- public boolean accept(File file, String name) {
- return name.equals(queueName + "-checkpoint");
- }
- };
- for (File qf : checkpointDirectory.listFiles(filenameFilter)) {
- log.info("recovering from checkpoint {}", qf);
- try {
- LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
- while (lineIterator.hasNext()) {
- String s = lineIterator.nextLine();
- String[] split = s.split(" ");
- String id = split[0];
- String infoString = split[1];
- Map<String, Object> info = new HashMap<String, Object>();
- JSONTokener jsonTokener = new JSONTokener(infoString);
- JSONObject jsonObject = new JSONObject(jsonTokener);
- Iterator<String> keys = jsonObject.keys();
- while (keys.hasNext()) {
- String key = keys.next();
- info.put(key, jsonObject.get(key));
+ if (checkpoint) {
+ // recover from checkpoints
+ log.debug("recovering from checkpoints if needed");
+ for (final String queueName : queueNames) {
+ log.debug("recovering for queue {}", queueName);
+ DistributionQueue queue = getQueue(queueName);
+ FilenameFilter filenameFilter = new FilenameFilter() {
+ @Override
+ public boolean accept(File file, String name) {
+ return name.equals(queueName + "-checkpoint");
+ }
+ };
+ for (File qf : checkpointDirectory.listFiles(filenameFilter)) {
+ log.info("recovering from checkpoint {}", qf);
+ try {
+ LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
+ while (lineIterator.hasNext()) {
+ String s = lineIterator.nextLine();
+ String[] split = s.split(" ");
+ String id = split[0];
+ String infoString = split[1];
+ Map<String, Object> info = new HashMap<String, Object>();
+ JSONTokener jsonTokener = new JSONTokener(infoString);
+ JSONObject jsonObject = new JSONObject(jsonTokener);
+ Iterator<String> keys = jsonObject.keys();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ info.put(key, jsonObject.get(key));
+ }
+ queue.add(new DistributionQueueItem(id, info));
}
- queue.add(new DistributionQueueItem(id, info));
+ log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
+ } catch (FileNotFoundException e) {
+ log.warn("could not read checkpoint file {}", qf.getAbsolutePath());
+ } catch (JSONException e) {
+ log.warn("could not parse info from checkpoint file {}", qf.getAbsolutePath());
}
- log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
- } catch (FileNotFoundException e) {
- log.warn("could not read checkpoint file {}", qf.getAbsolutePath());
- } catch (JSONException e) {
- log.warn("could not parse info from checkpoint file {}", qf.getAbsolutePath());
}
}
+
+ // enable checkpointing
+ for (String queueName : queueNames) {
+ ScheduleOptions options = scheduler.NOW(-1, 15)
+ .canRunConcurrently(false)
+ .name(getJobName(queueName + "-checkpoint"));
+ scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
+ }
}
// enable processing
@@ -146,27 +161,24 @@ public class SimpleDistributionQueueProv
scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
}
- // enable checkpointing
- for (String queueName : queueNames) {
- ScheduleOptions options = scheduler.NOW(-1, 15)
- .canRunConcurrently(false)
- .name(getJobName(queueName + "-checkpoint"));
- scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
- }
}
public void disableQueueProcessing() {
for (DistributionQueue queue : getQueues()) {
String queueName = queue.getName();
+ // disable queue processing
if (scheduler.unschedule(getJobName(queueName))) {
log.debug("queue processing on {} stopped", queue);
} else {
log.warn("could not disable queue processing on {}", queue);
}
- if (scheduler.unschedule(getJobName(queueName) + "-checkpoint")) {
- log.debug("checkpoint on {} stopped", queue);
- } else {
- log.warn("could not disable checkpoint on {}", queue);
+ if (checkpoint) {
+ // disable checkpointing
+ if (scheduler.unschedule(getJobName(queueName) + "-checkpoint")) {
+ log.debug("checkpoint on {} stopped", queue);
+ } else {
+ log.warn("could not disable checkpoint on {}", queue);
+ }
}
}
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java?rev=1745362&r1=1745361&r2=1745362&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProviderTest.java Tue May 24 13:55:53 2016
@@ -18,12 +18,16 @@
*/
package org.apache.sling.distribution.queue.impl.simple;
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.queue.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueProcessor;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@@ -36,7 +40,14 @@ public class SimpleDistributionQueueProv
@Test
public void testGetOrCreateQueue() throws Exception {
- SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(mock(Scheduler.class), "agentName");
+ SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(mock(Scheduler.class), "agentName", false);
+ DistributionQueue queue = simpledistributionQueueProvider.getQueue("default");
+ assertNotNull(queue);
+ }
+
+ @Test
+ public void testGetOrCreateQueueWithCheckpointing() throws Exception {
+ SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(mock(Scheduler.class), "agentName", true);
DistributionQueue queue = simpledistributionQueueProvider.getQueue("default");
assertNotNull(queue);
}
@@ -48,7 +59,39 @@ public class SimpleDistributionQueueProv
when(scheduler.NOW(-1, 1)).thenReturn(options);
when(options.canRunConcurrently(false)).thenReturn(options);
when(options.name(any(String.class))).thenReturn(options);
- SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent");
+ SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", false);
+ DistributionQueueProcessor processor = mock(DistributionQueueProcessor.class);
+ simpledistributionQueueProvider.enableQueueProcessing(processor);
+ }
+
+ @Test
+ public void testEnableQueueProcessingWithCheckpointRecovery() throws Exception {
+ File checkpointDirectory = new File("dummy-agent-simple-queues-checkpoints");
+ File file = new File(getClass().getResource("/dummy-agent-checkpoint").getFile());
+ FileUtils.copyFileToDirectory(file, checkpointDirectory);
+
+ Scheduler scheduler = mock(Scheduler.class);
+ ScheduleOptions options = mock(ScheduleOptions.class);
+ when(scheduler.NOW(-1, 1)).thenReturn(options);
+ when(scheduler.NOW(-1, 15)).thenReturn(options);
+ when(options.canRunConcurrently(false)).thenReturn(options);
+ when(options.name(any(String.class))).thenReturn(options);
+ SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", true);
+ DistributionQueueProcessor processor = mock(DistributionQueueProcessor.class);
+ simpledistributionQueueProvider.enableQueueProcessing(processor, "dummy-agent");
+ DistributionQueue queue = simpledistributionQueueProvider.getQueue("dummy-agent");
+ assertNotNull(queue);
+ assertEquals(1, queue.getStatus().getItemsCount());
+ }
+
+ @Test
+ public void testEnableQueueProcessingWithCheckpointing() throws Exception {
+ Scheduler scheduler = mock(Scheduler.class);
+ ScheduleOptions options = mock(ScheduleOptions.class);
+ when(scheduler.NOW(-1, 1)).thenReturn(options);
+ when(options.canRunConcurrently(false)).thenReturn(options);
+ when(options.name(any(String.class))).thenReturn(options);
+ SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", true);
DistributionQueueProcessor processor = mock(DistributionQueueProcessor.class);
simpledistributionQueueProvider.enableQueueProcessing(processor);
}
@@ -60,7 +103,18 @@ public class SimpleDistributionQueueProv
when(scheduler.NOW(-1, 10)).thenReturn(options);
when(options.canRunConcurrently(false)).thenReturn(options);
when(options.name(any(String.class))).thenReturn(options);
- SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent");
+ SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", false);
+ simpledistributionQueueProvider.disableQueueProcessing();
+ }
+
+ @Test
+ public void testDisableQueueProcessingWithCheckpointing() throws Exception {
+ Scheduler scheduler = mock(Scheduler.class);
+ ScheduleOptions options = mock(ScheduleOptions.class);
+ when(scheduler.NOW(-1, 10)).thenReturn(options);
+ when(options.canRunConcurrently(false)).thenReturn(options);
+ when(options.name(any(String.class))).thenReturn(options);
+ SimpleDistributionQueueProvider simpledistributionQueueProvider = new SimpleDistributionQueueProvider(scheduler, "dummy-agent", true);
simpledistributionQueueProvider.disableQueueProcessing();
}
}
\ No newline at end of file
Added: sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint?rev=1745362&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint Tue May 24 13:55:53 2016
@@ -0,0 +1 @@
+/var/sling/distribution/packages/default/data/dstrpck-1464090256589-70a3fc84-9568-4aeb-ba1d-ffc5affc4332 {"internal.request.startTime":"1464090250095","request.type":"ADD","request.deepPaths":"[Ljava.lang.String;@600b03df","internal.request.id":"DSTRQ1","request.paths":"[Ljava.lang.String;@4c16cdae","internal.request.user":"admin","package.type":"default"}UTF-8
\ No newline at end of file