You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2017/09/11 15:59:43 UTC
svn commit: r1808044 - in
/sling/trunk/contrib/extensions/distribution/core/src:
main/java/org/apache/sling/distribution/queue/impl/simple/
test/java/org/apache/sling/distribution/queue/impl/simple/ test/resources/
Author: tmaret
Date: Mon Sep 11 15:59:43 2017
New Revision: 1808044
URL: http://svn.apache.org/viewvc?rev=1808044&view=rev
Log:
SLING-7120 - Checkpoints not stored in file when using In-file queue
* Fix parsing for number types
* remove un-needed charset in checkpoints entries
Added:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java?rev=1808044&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java Mon Sep 11 15:59:43 2017
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl.simple;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import javax.json.JsonString;
+import javax.json.JsonValue;
+import javax.json.stream.JsonGenerator;
+
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+
+/**
+ * Serialize/Unserialize {@link DistributionQueueItem} items.
+ */
+public class QueueItemMapper {
+
+ DistributionQueueItem readQueueItem(String line) {
+ String[] split = line.split(" ", 2);
+ if (split.length != 2) {
+ throw new IllegalArgumentException("Invalid item found " + line);
+ }
+ String packageId = split[0];
+ String infoString = split[1];
+
+ Map<String, Object> info = new HashMap<String, Object>();
+
+ JsonReader reader = Json.createReader(new StringReader(infoString));
+ JsonObject jsonObject = reader.readObject();
+ NumberFormat numberFormat = NumberFormat.getInstance();
+
+ for (Map.Entry<String, JsonValue> entry : jsonObject.entrySet()) {
+ if (entry.getValue().getValueType().equals(JsonValue.ValueType.ARRAY)) {
+ JsonArray value = jsonObject.getJsonArray(entry.getKey());
+ String[] a = new String[value.size()];
+ for (int i = 0; i < a.length; i++) {
+ a[i] = value.getString(i);
+ }
+ info.put(entry.getKey(), a);
+ } else if (JsonValue.NULL.equals(entry.getValue())) {
+ info.put(entry.getKey(), null);
+ } else if (entry.getValue().getValueType().equals(JsonValue.ValueType.NUMBER)) {
+ try {
+ Number n = numberFormat.parse(entry.getValue().toString());
+ info.put(entry.getKey(), n);
+ } catch (ParseException e) {
+ throw new IllegalStateException("Failed to read queue item", e);
+ }
+ } else {
+ info.put(entry.getKey(), ((JsonString) entry.getValue()).getString());
+ }
+ }
+
+ return new DistributionQueueItem(packageId, info);
+ }
+
+ String writeQueueItem(DistributionQueueItem item) {
+ String packageId = item.getPackageId();
+ StringWriter w = new StringWriter();
+ JsonGenerator jsonWriter = Json.createGenerator(w);
+ jsonWriter.writeStartObject();
+ for (Map.Entry<String, Object> entry : item.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ if (value instanceof String[]) {
+ jsonWriter.writeStartArray(entry.getKey());
+ for (String s : ((String[]) value)) {
+ jsonWriter.write(s);
+ }
+ jsonWriter.writeEnd();
+ } else if (value == null) {
+ jsonWriter.write(key, JsonValue.NULL);
+ } else if (value instanceof String) {
+ jsonWriter.write(key, (String) value);
+ } else if (value instanceof Boolean) {
+ jsonWriter.write(key, (Boolean) value);
+ } else if (value instanceof Integer) {
+ jsonWriter.write(key, (Integer) value);
+ } else if (value instanceof Float) {
+ jsonWriter.write(key, (Float) value);
+ }else if (value instanceof Double) {
+ jsonWriter.write(key, (Double) value);
+ } else if (value instanceof Long) {
+ jsonWriter.write(key, (Long) value);
+ } else {
+ jsonWriter.write(key, String.valueOf(value));
+ }
+ }
+ jsonWriter.writeEnd();
+ jsonWriter.close();
+
+ return packageId + " " + w.toString();
+ }
+}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java?rev=1808044&r1=1808043&r2=1808044&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java Mon Sep 11 15:59:43 2017
@@ -20,14 +20,9 @@ package org.apache.sling.distribution.qu
import java.io.File;
import java.io.FileOutputStream;
-import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.Map;
-
-import javax.json.Json;
-import javax.json.stream.JsonGenerator;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.queue.DistributionQueue;
@@ -55,6 +50,7 @@ class SimpleDistributionQueueCheckpoint
public void run() {
String fileName = queue.getName() + "-checkpoint";
File checkpointFile = new File(checkpointDirectory, fileName + "-new");
+ QueueItemMapper mapper = new QueueItemMapper();
log.debug("started checkpointing");
try {
@@ -66,30 +62,11 @@ class SimpleDistributionQueueCheckpoint
FileOutputStream fileOutputStream = new FileOutputStream(checkpointFile);
for (DistributionQueueEntry queueEntry : queue.getItems(0, -1)) {
DistributionQueueItem item = queueEntry.getItem();
- String packageId = item.getPackageId();
- StringWriter w = new StringWriter();
- JsonGenerator jsonWriter = Json.createGenerator(w);
- jsonWriter.writeStartObject();
- for (Map.Entry<String, Object> entry : item.entrySet()) {
-
- Object value = entry.getValue();
- boolean isArray = value instanceof String[];
- if (isArray) {
- jsonWriter.writeStartArray(entry.getKey());
- for (String s : ((String[]) value)) {
- jsonWriter.write(s);
- }
- jsonWriter.writeEnd();
- } else {
- jsonWriter.write(entry.getKey(), (String) value);
- }
- }
- jsonWriter.writeEnd();
- jsonWriter.close();
- lines.add(packageId + " " + w.toString());
+ String line = mapper.writeQueueItem(item);
+ lines.add(line);
}
log.debug("parsed {} items", lines.size());
- IOUtils.writeLines(lines, Charset.defaultCharset().name(), fileOutputStream, Charset.defaultCharset());
+ IOUtils.writeLines(lines, null, fileOutputStream, Charset.defaultCharset());
fileOutputStream.flush();
fileOutputStream.close();
boolean success = checkpointFile.renameTo(new File(checkpointDirectory, fileName));
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java?rev=1808044&r1=1808043&r2=1808044&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java Mon Sep 11 15:59:43 2017
@@ -19,21 +19,13 @@
package org.apache.sling.distribution.queue.impl.simple;
import javax.annotation.Nonnull;
-import javax.json.Json;
-import javax.json.JsonArray;
import javax.json.JsonException;
-import javax.json.JsonObject;
-import javax.json.JsonReader;
-import javax.json.JsonString;
-import javax.json.JsonValue;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FilenameFilter;
-import java.io.StringReader;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -117,6 +109,7 @@ public class SimpleDistributionQueueProv
if (checkpoint) {
// recover from checkpoints
+ QueueItemMapper mapper = new QueueItemMapper();
log.debug("recovering from checkpoints if needed");
for (final String queueName : queueNames) {
log.debug("recovering for queue {}", queueName);
@@ -132,32 +125,9 @@ public class SimpleDistributionQueueProv
try {
LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
while (lineIterator.hasNext()) {
- String s = lineIterator.nextLine();
- String[] split = s.split(" ");
- String id = split[0];
- String infoString = split[1];
- Map<String, Object> info = new HashMap<String, Object>();
- JsonReader reader = Json.createReader(new StringReader(infoString));
- JsonObject jsonObject = reader.readObject();
- for (Map.Entry<String, JsonValue> entry : jsonObject.entrySet()) {
- if (entry.getValue().getValueType().equals(JsonValue.ValueType.ARRAY))
- {
- JsonArray value = jsonObject.getJsonArray(entry.getKey());
- String[] a = new String[value.size()];
- for (int i = 0; i < a.length; i++) {
- a[i] = value.getString(i);
- }
- info.put(entry.getKey(), a);
- }
- else if (JsonValue.NULL.equals(entry.getValue())) {
- info.put(entry.getKey(), null);
- }
- else
- {
- info.put(entry.getKey(), ((JsonString) entry.getValue()).getString());
- }
- }
- queue.add(new DistributionQueueItem(id, info));
+ String line = lineIterator.nextLine();
+ DistributionQueueItem item = mapper.readQueueItem(line);
+ queue.add(item);
}
log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
} catch (FileNotFoundException e) {
Added: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java?rev=1808044&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java Mon Sep 11 15:59:43 2017
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl.simple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class QueueItemMapperTest {
+
+ @Test
+ public void testReadWriteQueueItem() throws Exception {
+ QueueItemMapper mapper = new QueueItemMapper();
+ Map<String, Object> properties = new HashMap<String, Object>();
+ properties.put("string-1", "some string");
+ properties.put("null", null);
+ properties.put("long", 200L);
+ properties.put("double", 300.3d);
+ DistributionQueueItem expected = new DistributionQueueItem("packageId", properties);
+ DistributionQueueItem actual = mapper.readQueueItem(mapper.writeQueueItem(expected));
+ assertEquals(expected.getPackageId(), actual.getPackageId());
+ for (Map.Entry<String, Object> entry : actual.entrySet()) {
+ assertEquals(expected.get(entry.getKey()), entry.getValue());
+ }
+ }
+}
\ No newline at end of file
Modified: sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint?rev=1808044&r1=1808043&r2=1808044&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint Mon Sep 11 15:59:43 2017
@@ -1 +1 @@
-/var/sling/distribution/packages/default/data/dstrpck-1464090256589-70a3fc84-9568-4aeb-ba1d-ffc5affc4332 {"internal.request.startTime":"1464090250095","request.type":"ADD","request.deepPaths":["/foo"],"internal.request.id":"DSTRQ1","request.paths":["/foo","bar"],"internal.request.user":"admin","package.type":"default"} UTF-8
\ No newline at end of file
+/var/sling/distribution/packages/default/data/dstrpck-1464090256589-70a3fc84-9568-4aeb-ba1d-ffc5affc4332 {"internal.request.startTime":"1464090250095","request.type":"ADD","request.deepPaths":["/foo"],"internal.request.id":"DSTRQ1","request.paths":["/foo","bar"],"internal.request.user":"admin","package.type":"default"}
\ No newline at end of file