You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/16 17:43:41 UTC

[camel] 01/02: CAMEL-12148: Reworked FileIdempotentRepository so the LRUCache only acts as a quick lookup. In case of a 1st-level miss the file store is checked. The file store also writes the entries in the same order they are added.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1a9f3ecb9e973deded01ab465baca26977a95db4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Jan 16 15:18:04 2018 +0100

    CAMEL-12148: Reworked FileIdempotentRepository so the LRUCache only acts as a quick lookup. In case of a 1st-level miss the file store is checked. The file store also writes the entries in the same order they are added.
---
 .../idempotent/FileIdempotentRepository.java       | 254 +++++++++++++++++----
 .../processor/FileIdempotentTrunkStoreTest.java    |  15 ++
 .../FileIdempotentStoreOrderingTest.java           | 151 ++++++++++++
 3 files changed, 375 insertions(+), 45 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
index 4343331..339838b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
@@ -19,6 +19,8 @@ package org.apache.camel.processor.idempotent;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,13 +41,13 @@ import org.slf4j.LoggerFactory;
 /**
  * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
  * <p/>
- * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
- * memory leak.
- * <p/>
- * The default cache used is {@link LRUCache} which keeps the most used entries in the cache.
- * When this cache is being used and the state of the cache is stored to file via {@link #trunkStore()}
- * then the entries stored are not guaranteed to be in the exact order the entries were added to the cache.
- * If you need exact ordering, then you need to provide a custom {@link Map} implementation that does that
+ * This implementation provides a 1st-level in-memory {@link LRUCache} for fast check of the most
+ * frequently used keys. When {@link #add(String)} or {@link #contains(String)} methods are being used
+ * then in case of 1st-level cache miss, the underlying file is scanned which may cost additional performance.
+ * So try to find the right balance of the size of the 1st-level cache, the default size is 1000.
+ * The file store has a maximum capacity of 32mb by default. If the file store grows bigger, then
+ * the {@link #getDropOldestFileStore()} oldest entries are dropped from the file store to reduce
+ * its size and make room for newer entries.
  *
  * @version 
  */
@@ -53,10 +55,13 @@ import org.slf4j.LoggerFactory;
 public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
     private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
     private static final String STORE_DELIMITER = "\n";
+
+    private final AtomicBoolean init = new AtomicBoolean();
+
     private Map<String, Object> cache;
     private File fileStore;
-    private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
-    private final AtomicBoolean init = new AtomicBoolean();
+    private long maxFileStoreSize = 32 * 1024 * 1000L; // 32mb store file
+    private long dropOldestFileStore = 1000;
 
     public FileIdempotentRepository() {
     }
@@ -123,12 +128,21 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
             if (cache.containsKey(key)) {
                 return false;
             } else {
+                // always register the most used keys in the LRUCache
                 cache.put(key, key);
-                if (fileStore.length() < maxFileStoreSize) {
-                    // just append to store
-                    appendToStore(key);
-                } else {
-                    // trunk store and flush the cache
+
+                // now check the file store
+                boolean containsInFile = containsStore(key);
+                if (containsInFile) {
+                    return false;
+                }
+
+                // it's a new key so append it to the file store
+                appendToStore(key);
+
+                // check if we hit maximum capacity and report a warning about this
+                if (fileStore.length() > maxFileStoreSize) {
+                    LOG.warn("Maximum capacity of file store: {} hit at {} bytes. Dropping {} oldest entries from the file store", fileStore, maxFileStoreSize, dropOldestFileStore);
                     trunkStore();
                 }
 
@@ -140,7 +154,8 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
     @ManagedOperation(description = "Does the store contain the given key")
     public boolean contains(String key) {
         synchronized (cache) {
-            return cache.containsKey(key);
+            // check 1st-level first and then fallback to check the actual file
+            return cache.containsKey(key) || containsStore(key);
         }
     }
 
@@ -149,8 +164,8 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
         boolean answer;
         synchronized (cache) {
             answer = cache.remove(key) != null;
-            // trunk store and flush the cache on remove
-            trunkStore();
+            // also remove the key from the file store
+            removeFromStore(key);
         }
         return answer;
     }
@@ -160,13 +175,15 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
         return true;
     }
     
-    @ManagedOperation(description = "Clear the store")
+    @ManagedOperation(description = "Clear the store (danger this removes all entries)")
     public void clear() {
         synchronized (cache) {
             cache.clear();
             if (cache instanceof LRUCache) {
                 ((LRUCache) cache).cleanUp();
             }
+            // clear file store
+            clearStore();
         }
     }
 
@@ -199,15 +216,30 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
     /**
      * Sets the maximum file size for the file store in bytes.
      * <p/>
-     * The default is 1mb.
+     * The default is 32mb.
      */
     @ManagedAttribute(description = "The maximum file size for the file store in bytes")
     public void setMaxFileStoreSize(long maxFileStoreSize) {
         this.maxFileStoreSize = maxFileStoreSize;
     }
 
+    public long getDropOldestFileStore() {
+        return dropOldestFileStore;
+    }
+
+    /**
+     * Sets the number of oldest entries to drop from the file store when the maximum capacity is hit to reduce
+     * disk space to allow room for new entries.
+     * <p/>
+     * The default is 1000.
+     */
+    @ManagedAttribute(description = "Number of oldest elements to drop from file store if maximum file size reached")
+    public void setDropOldestFileStore(long dropOldestFileStore) {
+        this.dropOldestFileStore = dropOldestFileStore;
+    }
+
     /**
-     * Sets the cache size.
+     * Sets the 1st-level cache size.
      *
      * Setting cache size is only possible when using the default {@link LRUCache} cache implementation.
      */
@@ -222,7 +254,7 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
         cache = LRUCacheFactory.newLRUCache(size);
     }
 
-    @ManagedAttribute(description = "The current cache size")
+    @ManagedAttribute(description = "The current 1st-level cache size")
     public int getCacheSize() {
         if (cache != null) {
             return cache.size();
@@ -231,28 +263,58 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
     }
 
     /**
-     * Reset and clears the store to force it to reload from file
+     * Reset and clears the 1st-level cache to force it to reload from file
      */
     @ManagedOperation(description = "Reset and reloads the file store")
     public synchronized void reset() throws IOException {
         synchronized (cache) {
-            // trunk and clear, before we reload the store
-            trunkStore();
-            cache.clear();
+            // run the cleanup task first
             if (cache instanceof LRUCache) {
                 ((LRUCache) cache).cleanUp();
             }
+            cache.clear();
             loadStore();
         }
     }
 
     /**
-     * Appends the given message id to the file store
+     * Checks the file store if the key exists
      *
-     * @param messageId  the message id
+     * @param key  the key
+     * @return <tt>true</tt> if exists in the file, <tt>false</tt> otherwise
      */
-    protected void appendToStore(final String messageId) {
-        LOG.debug("Appending {} to idempotent filestore: {}", messageId, fileStore);
+    protected boolean containsStore(final String key) {
+        if (fileStore == null || !fileStore.exists()) {
+            return false;
+        }
+
+        Scanner scanner = null;
+        try {
+            scanner = new Scanner(fileStore);
+            scanner.useDelimiter(STORE_DELIMITER);
+            while (scanner.hasNextLine()) {
+                String line = scanner.nextLine();
+                if (line.equals(key)) {
+                    return true;
+                }
+            }
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            if (scanner != null) {
+                scanner.close();
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Appends the given key to the file store
+     *
+     * @param key  the key
+     */
+    protected void appendToStore(final String key) {
+        LOG.debug("Appending: {} to idempotent filestore: {}", key, fileStore);
         FileOutputStream fos = null;
         try {
             // create store parent directory if missing
@@ -260,9 +322,9 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
             if (storeParentDirectory != null && !storeParentDirectory.exists()) {
                 LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
                 if (fileStore.getParentFile().mkdirs()) {
-                    LOG.info("Parent directory of file store {} successfully created.", fileStore);
+                    LOG.info("Parent directory of filestore: {} successfully created.", fileStore);
                 } else {
-                    LOG.warn("Parent directory of file store {} cannot be created.", fileStore);
+                    LOG.warn("Parent directory of filestore: {} cannot be created.", fileStore);
                 }
             }
             // create store if missing
@@ -271,7 +333,7 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
             }
             // append to store
             fos = new FileOutputStream(fileStore, true);
-            fos.write(messageId.getBytes());
+            fos.write(key.getBytes());
             fos.write(STORE_DELIMITER.getBytes());
         } catch (IOException e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
@@ -280,23 +342,125 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
         }
     }
 
+    protected synchronized void removeFromStore(String key) {
+        LOG.debug("Removing: {} from idempotent filestore: {}", key, fileStore);
+
+        // we need to re-load the entire file and remove the key and then re-write the file
+        List<String> lines = new ArrayList<>();
+
+        boolean found = false;
+        Scanner scanner = null;
+        try {
+            scanner = new Scanner(fileStore);
+            scanner.useDelimiter(STORE_DELIMITER);
+            while (scanner.hasNextLine()) {
+                String line = scanner.nextLine();
+                if (key.equals(line)) {
+                    found = true;
+                } else {
+                    lines.add(line);
+                }
+            }
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            if (scanner != null) {
+                scanner.close();
+            }
+        }
+
+        if (found) {
+            // rewrite file
+            LOG.debug("Rewriting idempotent filestore: {} due to key: {} removed", fileStore, key);
+            FileOutputStream fos = null;
+            try {
+                fos = new FileOutputStream(fileStore);
+                for (String line : lines) {
+                    fos.write(line.getBytes());
+                    fos.write(STORE_DELIMITER.getBytes());
+                }
+            } catch (IOException e) {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            } finally {
+                IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
+            }
+        }
+    }
+
     /**
-     * Trunks the file store when the max store size is hit by rewriting the 1st level cache
-     * to the file store.
+     * Clears the file-store (danger this deletes all entries)
      */
-    protected void trunkStore() {
-        LOG.info("Trunking idempotent filestore: {}", fileStore);
-        FileOutputStream fos = null;
+    protected void clearStore() {
+        try {
+            FileUtil.deleteFile(fileStore);
+            FileUtil.createNewFile(fileStore);
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    /**
+     * Truncates the file store when the max store size is hit by dropping the oldest entries.
+     */
+    protected synchronized void trunkStore() {
+        if (fileStore == null || !fileStore.exists()) {
+            return;
+        }
+
+        LOG.debug("Trunking: {} oldest entries from idempotent filestore: {}", dropOldestFileStore, fileStore);
+
+        // we need to re-load the entire file, skip the oldest entries and then re-write the file
+        List<String> lines = new ArrayList<>();
+
+        Scanner scanner = null;
+        int count = 0;
         try {
-            fos = new FileOutputStream(fileStore);
-            for (String key : cache.keySet()) {
-                fos.write(key.getBytes());
-                fos.write(STORE_DELIMITER.getBytes());
+            scanner = new Scanner(fileStore);
+            scanner.useDelimiter(STORE_DELIMITER);
+            while (scanner.hasNextLine()) {
+                String line = scanner.nextLine();
+                count++;
+                if (count > dropOldestFileStore) {
+                    lines.add(line);
+                }
             }
         } catch (IOException e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
         } finally {
-            IOHelper.close(fos, "Trunking file idempotent repository", LOG);
+            if (scanner != null) {
+                scanner.close();
+            }
+        }
+
+        if (!lines.isEmpty()) {
+            // rewrite file
+            LOG.debug("Rewriting idempotent filestore: {} with {} entries:", fileStore, lines.size());
+            FileOutputStream fos = null;
+            try {
+                fos = new FileOutputStream(fileStore);
+                for (String line : lines) {
+                    fos.write(line.getBytes());
+                    fos.write(STORE_DELIMITER.getBytes());
+                }
+            } catch (IOException e) {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            } finally {
+                IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
+            }
+        } else {
+            // no entries are left after dropping the oldest ones, so recreate the file as empty
+            LOG.debug("Clearing idempotent filestore: {}", fileStore);
+            clearStore();
+        }
+    }
+
+    /**
+     * Cleanup the 1st-level cache.
+     */
+    protected void cleanup() {
+        // run the cleanup task first
+        if (cache instanceof LRUCache) {
+            ((LRUCache) cache).cleanUp();
         }
     }
 
@@ -357,12 +521,12 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
 
     @Override
     protected void doStop() throws Exception {
-        // reset will trunk and clear the cache
-        trunkStore();
-        cache.clear();
+        // run the cleanup task first
         if (cache instanceof LRUCache) {
             ((LRUCache) cache).cleanUp();
         }
+
+        cache.clear();
         init.set(false);
     }
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java b/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
index be3dc36..0055d57 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
@@ -17,6 +17,10 @@
 package org.apache.camel.processor;
 
 import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
@@ -27,6 +31,8 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.idempotent.FileIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.Assert;
 
 /**
  * @version 
@@ -60,6 +66,15 @@ public class FileIdempotentTrunkStoreTest extends ContextTestSupport {
         resultEndpoint.assertIsSatisfied();
 
         assertTrue(repo.contains("XXXXXXXXXX"));
+
+        // the file should only contain the last 2 entries as it was truncated
+        Stream<String> fileContent = Files.lines(store.toPath());
+        List<String> fileEntries = fileContent.collect(Collectors.toList());
+        fileContent.close();
+        //expected order
+        Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains(
+            "ZZZZZZZZZZ",
+            "XXXXXXXXXX"));
     }
 
     protected void sendMessage(final Object messageId, final Object body) {
diff --git a/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java b/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java
new file mode 100644
index 0000000..da7d572
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.camel.TestSupport.createDirectory;
+import static org.apache.camel.TestSupport.deleteDirectory;
+
+public class FileIdempotentStoreOrderingTest {
+
+    private FileIdempotentRepository fileIdempotentRepository;
+    private List<String> files;
+
+    @Before
+    public void setup() {
+        files = Arrays.asList(
+            "file1.txt.20171123",
+            "file2.txt.20171123",
+            "file1.txt.20171124",
+            "file3.txt.20171125",
+            "file2.txt.20171126",
+            "fixed.income.lamr.out.20171126",
+            "pricing.px.20171126",
+            "test.out.20171126",
+            "processing.source.lamr.out.20171126");
+        this.fileIdempotentRepository = new FileIdempotentRepository();
+    }
+
+    @Test
+    public void testTrunkStoreNotMaxHit() throws Exception {
+        // ensure empty folder
+        deleteDirectory("target/mystore");
+        createDirectory("target/mystore");
+
+        //given
+        File fileStore = new File("target/mystore/data.dat");
+        fileIdempotentRepository.setFileStore(fileStore);
+        fileIdempotentRepository.setCacheSize(10);
+        fileIdempotentRepository.start();
+        files.forEach(e -> fileIdempotentRepository.add(e));
+
+        //when (will rebalance)
+        fileIdempotentRepository.stop();
+
+        //then
+        Stream<String> fileContent = Files.lines(fileStore.toPath());
+        List<String> fileEntries = fileContent.collect(Collectors.toList());
+        fileContent.close();
+        //expected order
+        Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains(
+            "file1.txt.20171123",
+            "file2.txt.20171123",
+            "file1.txt.20171124",
+            "file3.txt.20171125",
+            "file2.txt.20171126",
+            "fixed.income.lamr.out.20171126",
+            "pricing.px.20171126",
+            "test.out.20171126",
+            "processing.source.lamr.out.20171126"));
+    }
+
+    @Test
+    public void testTrunkStoreFirstLevelMaxHit() throws Exception {
+        // ensure empty folder
+        deleteDirectory("target/mystore");
+        createDirectory("target/mystore");
+
+        //given
+        File fileStore = new File("target/mystore/data.dat");
+        fileIdempotentRepository.setFileStore(fileStore);
+        fileIdempotentRepository.setCacheSize(5);
+        fileIdempotentRepository.start();
+        files.forEach(e -> fileIdempotentRepository.add(e));
+
+        //when (will rebalance)
+        fileIdempotentRepository.stop();
+
+        //then
+        Stream<String> fileContent = Files.lines(fileStore.toPath());
+        List<String> fileEntries = fileContent.collect(Collectors.toList());
+        fileContent.close();
+        //expected order
+        Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains(
+            "file1.txt.20171123",
+            "file2.txt.20171123",
+            "file1.txt.20171124",
+            "file3.txt.20171125",
+            "file2.txt.20171126",
+            "fixed.income.lamr.out.20171126",
+            "pricing.px.20171126",
+            "test.out.20171126",
+            "processing.source.lamr.out.20171126"));
+    }
+
+    @Test
+    public void testTrunkStoreFileMaxHit() throws Exception {
+        // ensure empty folder
+        deleteDirectory("target/mystore");
+        createDirectory("target/mystore");
+
+        //given
+        File fileStore = new File("target/mystore/data.dat");
+        fileIdempotentRepository.setFileStore(fileStore);
+        fileIdempotentRepository.setCacheSize(5);
+        fileIdempotentRepository.setMaxFileStoreSize(128);
+        fileIdempotentRepository.setDropOldestFileStore(1000);
+
+        fileIdempotentRepository.start();
+        files.forEach(e -> fileIdempotentRepository.add(e));
+
+        // force cleanup and trunk
+        fileIdempotentRepository.cleanup();
+        fileIdempotentRepository.trunkStore();
+
+        fileIdempotentRepository.stop();
+
+        //then
+        Stream<String> fileContent = Files.lines(fileStore.toPath());
+        List<String> fileEntries = fileContent.collect(Collectors.toList());
+        fileContent.close();
+
+        // all old entries are removed
+        Assert.assertEquals(0, fileEntries.size());
+    }
+
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.