You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2017/09/11 15:59:43 UTC

svn commit: r1808044 - in /sling/trunk/contrib/extensions/distribution/core/src: main/java/org/apache/sling/distribution/queue/impl/simple/ test/java/org/apache/sling/distribution/queue/impl/simple/ test/resources/

Author: tmaret
Date: Mon Sep 11 15:59:43 2017
New Revision: 1808044

URL: http://svn.apache.org/viewvc?rev=1808044&view=rev
Log:
SLING-7120 - Checkpoints not stored in file when using In-file queue

* Fix parsing for number types
* remove un-needed charset in checkpoints entries

Added:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java
Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
    sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java?rev=1808044&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapper.java Mon Sep 11 15:59:43 2017
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl.simple;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import javax.json.JsonString;
+import javax.json.JsonValue;
+import javax.json.stream.JsonGenerator;
+
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+
+/**
+ * Serialize/Unserialize {@link DistributionQueueItem} items.
+ */
+public class QueueItemMapper {
+
+    DistributionQueueItem readQueueItem(String line) {
+        String[] split = line.split(" ", 2);
+        if (split.length != 2) {
+            throw new IllegalArgumentException("Invalid item found " + line);
+        }
+        String packageId = split[0];
+        String infoString = split[1];
+
+        Map<String, Object> info = new HashMap<String, Object>();
+
+        JsonReader reader = Json.createReader(new StringReader(infoString));
+        JsonObject jsonObject = reader.readObject();
+        NumberFormat numberFormat = NumberFormat.getInstance();
+
+        for (Map.Entry<String, JsonValue> entry : jsonObject.entrySet()) {
+            if (entry.getValue().getValueType().equals(JsonValue.ValueType.ARRAY)) {
+                JsonArray value = jsonObject.getJsonArray(entry.getKey());
+                String[] a = new String[value.size()];
+                for (int i = 0; i < a.length; i++) {
+                    a[i] = value.getString(i);
+                }
+                info.put(entry.getKey(), a);
+            } else if (JsonValue.NULL.equals(entry.getValue())) {
+                info.put(entry.getKey(), null);
+            } else if (entry.getValue().getValueType().equals(JsonValue.ValueType.NUMBER)) {
+                try {
+                    Number n = numberFormat.parse(entry.getValue().toString());
+                    info.put(entry.getKey(), n);
+                } catch (ParseException e) {
+                    throw new IllegalStateException("Failed to read queue item", e);
+                }
+            } else {
+                info.put(entry.getKey(), ((JsonString) entry.getValue()).getString());
+            }
+        }
+
+        return new DistributionQueueItem(packageId, info);
+    }
+
+    String writeQueueItem(DistributionQueueItem item) {
+        String packageId = item.getPackageId();
+        StringWriter w = new StringWriter();
+        JsonGenerator jsonWriter = Json.createGenerator(w);
+        jsonWriter.writeStartObject();
+        for (Map.Entry<String, Object> entry : item.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            if (value instanceof String[]) {
+                jsonWriter.writeStartArray(entry.getKey());
+                for (String s : ((String[]) value)) {
+                    jsonWriter.write(s);
+                }
+                jsonWriter.writeEnd();
+            } else if (value == null) {
+                jsonWriter.write(key, JsonValue.NULL);
+            } else if (value instanceof String) {
+                jsonWriter.write(key, (String) value);
+            } else if (value instanceof Boolean) {
+                jsonWriter.write(key, (Boolean) value);
+            } else if (value instanceof Integer) {
+                jsonWriter.write(key, (Integer) value);
+            } else if (value instanceof Float) {
+                jsonWriter.write(key, (Float) value);
+            }else if (value instanceof Double) {
+                jsonWriter.write(key, (Double) value);
+            } else if (value instanceof Long) {
+                jsonWriter.write(key, (Long) value);
+            } else {
+                jsonWriter.write(key, String.valueOf(value));
+            }
+        }
+        jsonWriter.writeEnd();
+        jsonWriter.close();
+
+        return packageId + " " + w.toString();
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java?rev=1808044&r1=1808043&r2=1808044&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueCheckpoint.java Mon Sep 11 15:59:43 2017
@@ -20,14 +20,9 @@ package org.apache.sling.distribution.qu
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.StringWriter;
 import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.Map;
-
-import javax.json.Json;
-import javax.json.stream.JsonGenerator;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.queue.DistributionQueue;
@@ -55,6 +50,7 @@ class SimpleDistributionQueueCheckpoint
     public void run() {
         String fileName = queue.getName() + "-checkpoint";
         File checkpointFile = new File(checkpointDirectory, fileName + "-new");
+        QueueItemMapper mapper = new QueueItemMapper();
         log.debug("started checkpointing");
 
         try {
@@ -66,30 +62,11 @@ class SimpleDistributionQueueCheckpoint
             FileOutputStream fileOutputStream = new FileOutputStream(checkpointFile);
             for (DistributionQueueEntry queueEntry : queue.getItems(0, -1)) {
                 DistributionQueueItem item = queueEntry.getItem();
-                String packageId = item.getPackageId();
-                StringWriter w = new StringWriter();
-                JsonGenerator jsonWriter = Json.createGenerator(w);
-                jsonWriter.writeStartObject();
-                for (Map.Entry<String, Object> entry : item.entrySet()) {
-                    
-                    Object value = entry.getValue();
-                    boolean isArray = value instanceof String[];
-                    if (isArray) {
-                        jsonWriter.writeStartArray(entry.getKey());
-                        for (String s : ((String[]) value)) {
-                            jsonWriter.write(s);
-                        }
-                        jsonWriter.writeEnd();
-                    } else {
-                        jsonWriter.write(entry.getKey(), (String) value);
-                    }
-                }
-                jsonWriter.writeEnd();
-                jsonWriter.close();
-                lines.add(packageId + " " + w.toString());
+                String line = mapper.writeQueueItem(item);
+                lines.add(line);
             }
             log.debug("parsed {} items", lines.size());
-            IOUtils.writeLines(lines, Charset.defaultCharset().name(), fileOutputStream, Charset.defaultCharset());
+            IOUtils.writeLines(lines, null, fileOutputStream, Charset.defaultCharset());
             fileOutputStream.flush();
             fileOutputStream.close();
             boolean success = checkpointFile.renameTo(new File(checkpointDirectory, fileName));

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java?rev=1808044&r1=1808043&r2=1808044&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java Mon Sep 11 15:59:43 2017
@@ -19,21 +19,13 @@
 package org.apache.sling.distribution.queue.impl.simple;
 
 import javax.annotation.Nonnull;
-import javax.json.Json;
-import javax.json.JsonArray;
 import javax.json.JsonException;
-import javax.json.JsonObject;
-import javax.json.JsonReader;
-import javax.json.JsonString;
-import javax.json.JsonValue;
 
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.FilenameFilter;
-import java.io.StringReader;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -117,6 +109,7 @@ public class SimpleDistributionQueueProv
 
         if (checkpoint) {
             // recover from checkpoints
+            QueueItemMapper mapper = new QueueItemMapper();
             log.debug("recovering from checkpoints if needed");
             for (final String queueName : queueNames) {
                 log.debug("recovering for queue {}", queueName);
@@ -132,32 +125,9 @@ public class SimpleDistributionQueueProv
                     try {
                         LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
                         while (lineIterator.hasNext()) {
-                            String s = lineIterator.nextLine();
-                            String[] split = s.split(" ");
-                            String id = split[0];
-                            String infoString = split[1];
-                            Map<String, Object> info = new HashMap<String, Object>();
-                            JsonReader reader = Json.createReader(new StringReader(infoString));
-                            JsonObject jsonObject = reader.readObject();
-                            for (Map.Entry<String, JsonValue> entry : jsonObject.entrySet()) {
-                                if (entry.getValue().getValueType().equals(JsonValue.ValueType.ARRAY))
-                                {
-                                    JsonArray value = jsonObject.getJsonArray(entry.getKey());
-                                    String[] a = new String[value.size()];
-                                    for (int i = 0; i < a.length; i++) {
-                                        a[i] = value.getString(i);
-                                    }
-                                    info.put(entry.getKey(), a);
-                                }
-                                else if (JsonValue.NULL.equals(entry.getValue())) {
-                                    info.put(entry.getKey(), null);
-                                }
-                                else
-                                {
-                                    info.put(entry.getKey(), ((JsonString) entry.getValue()).getString());
-                                }
-                            }
-                            queue.add(new DistributionQueueItem(id, info));
+                            String line  = lineIterator.nextLine();
+                            DistributionQueueItem item = mapper.readQueueItem(line);
+                            queue.add(item);
                         }
                         log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
                     } catch (FileNotFoundException e) {

Added: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java?rev=1808044&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/simple/QueueItemMapperTest.java Mon Sep 11 15:59:43 2017
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl.simple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class QueueItemMapperTest {
+
+    @Test
+    public void testReadWriteQueueItem() throws Exception {
+        QueueItemMapper mapper = new QueueItemMapper();
+        Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put("string-1", "some string");
+        properties.put("null", null);
+        properties.put("long", 200L);
+        properties.put("double", 300.3d);
+        DistributionQueueItem expected = new DistributionQueueItem("packageId", properties);
+        DistributionQueueItem actual = mapper.readQueueItem(mapper.writeQueueItem(expected));
+        assertEquals(expected.getPackageId(), actual.getPackageId());
+        for (Map.Entry<String, Object> entry : actual.entrySet()) {
+            assertEquals(expected.get(entry.getKey()), entry.getValue());
+        }
+    }
+}
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint?rev=1808044&r1=1808043&r2=1808044&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/resources/dummy-agent-checkpoint Mon Sep 11 15:59:43 2017
@@ -1 +1 @@
-/var/sling/distribution/packages/default/data/dstrpck-1464090256589-70a3fc84-9568-4aeb-ba1d-ffc5affc4332 {"internal.request.startTime":"1464090250095","request.type":"ADD","request.deepPaths":["/foo"],"internal.request.id":"DSTRQ1","request.paths":["/foo","bar"],"internal.request.user":"admin","package.type":"default"} UTF-8
\ No newline at end of file
+/var/sling/distribution/packages/default/data/dstrpck-1464090256589-70a3fc84-9568-4aeb-ba1d-ffc5affc4332 {"internal.request.startTime":"1464090250095","request.type":"ADD","request.deepPaths":["/foo"],"internal.request.id":"DSTRQ1","request.paths":["/foo","bar"],"internal.request.user":"admin","package.type":"default"}
\ No newline at end of file