You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/16 17:43:41 UTC
[camel] 01/02: CAMEL-12148: Reworked FileIdempotentRepository so
LRUCache only acts as a quick lookup. And in case of a 1st-level miss the
file store is checked. File store also writes the entries in the same order
they are added.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1a9f3ecb9e973deded01ab465baca26977a95db4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Jan 16 15:18:04 2018 +0100
CAMEL-12148: Reworked FileIdempotentRepository so LRUCache only acts as a quick lookup. And in case of a 1st-level miss the file store is checked. File store also writes the entries in the same order they are added.
---
.../idempotent/FileIdempotentRepository.java | 254 +++++++++++++++++----
.../processor/FileIdempotentTrunkStoreTest.java | 15 ++
.../FileIdempotentStoreOrderingTest.java | 151 ++++++++++++
3 files changed, 375 insertions(+), 45 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
index 4343331..339838b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
@@ -19,6 +19,8 @@ package org.apache.camel.processor.idempotent;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,13 +41,13 @@ import org.slf4j.LoggerFactory;
/**
* A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
* <p/>
- * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
- * memory leak.
- * <p/>
- * The default cache used is {@link LRUCache} which keeps the most used entries in the cache.
- * When this cache is being used and the state of the cache is stored to file via {@link #trunkStore()}
- * then the entries stored are not guaranteed to be in the exact order the entries were added to the cache.
- * If you need exact ordering, then you need to provide a custom {@link Map} implementation that does that
+ * This implementation provides a 1st-level in-memory {@link LRUCache} for fast check of the most
+ * frequently used keys. When {@link #add(String)} or {@link #contains(String)} methods are being used
+ * then in case of 1st-level cache miss, the underlying file is scanned which may cost additional performance.
+ * So try to find the right balance of the size of the 1st-level cache, the default size is 1000.
+ * The file store has a maximum capacity of 32mb by default. If the file store grows bigger, then
+ * the {@link #getDropOldestFileStore()} number of entries from the file store is dropped to reduce
+ * the file store and make room for newer entries.
*
* @version
*/
@@ -53,10 +55,13 @@ import org.slf4j.LoggerFactory;
public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
private static final String STORE_DELIMITER = "\n";
+
+ private final AtomicBoolean init = new AtomicBoolean();
+
private Map<String, Object> cache;
private File fileStore;
- private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
- private final AtomicBoolean init = new AtomicBoolean();
+ private long maxFileStoreSize = 32 * 1024 * 1000L; // 32mb store file
+ private long dropOldestFileStore = 1000;
public FileIdempotentRepository() {
}
@@ -123,12 +128,21 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
if (cache.containsKey(key)) {
return false;
} else {
+ // always register the most used keys in the LRUCache
cache.put(key, key);
- if (fileStore.length() < maxFileStoreSize) {
- // just append to store
- appendToStore(key);
- } else {
- // trunk store and flush the cache
+
+ // now check the file store
+ boolean containsInFile = containsStore(key);
+ if (containsInFile) {
+ return false;
+ }
+
+ // its a new key so append to file store
+ appendToStore(key);
+
+ // check if we hit maximum capacity and report a warning about this
+ if (fileStore.length() > maxFileStoreSize) {
+ LOG.warn("Maximum capacity of file store: {} hit at {} bytes. Dropping {} oldest entries from the file store", fileStore, maxFileStoreSize, dropOldestFileStore);
trunkStore();
}
@@ -140,7 +154,8 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
@ManagedOperation(description = "Does the store contain the given key")
public boolean contains(String key) {
synchronized (cache) {
- return cache.containsKey(key);
+ // check 1st-level first and then fallback to check the actual file
+ return cache.containsKey(key) || containsStore(key);
}
}
@@ -149,8 +164,8 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
boolean answer;
synchronized (cache) {
answer = cache.remove(key) != null;
- // trunk store and flush the cache on remove
- trunkStore();
+ // remove from file cache also
+ removeFromStore(key);
}
return answer;
}
@@ -160,13 +175,15 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
return true;
}
- @ManagedOperation(description = "Clear the store")
+ @ManagedOperation(description = "Clear the store (danger this removes all entries)")
public void clear() {
synchronized (cache) {
cache.clear();
if (cache instanceof LRUCache) {
((LRUCache) cache).cleanUp();
}
+ // clear file store
+ clearStore();
}
}
@@ -199,15 +216,30 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
/**
* Sets the maximum file size for the file store in bytes.
* <p/>
- * The default is 1mb.
+ * The default is 32mb.
*/
@ManagedAttribute(description = "The maximum file size for the file store in bytes")
public void setMaxFileStoreSize(long maxFileStoreSize) {
this.maxFileStoreSize = maxFileStoreSize;
}
+ public long getDropOldestFileStore() {
+ return dropOldestFileStore;
+ }
+
+ /**
+ * Sets the number of oldest entries to drop from the file store when the maximum capacity is hit to reduce
+ * disk space to allow room for new entries.
+ * <p/>
+ * The default is 1000.
+ */
+ @ManagedAttribute(description = "Number of oldest elements to drop from file store if maximum file size reached")
+ public void setDropOldestFileStore(long dropOldestFileStore) {
+ this.dropOldestFileStore = dropOldestFileStore;
+ }
+
/**
- * Sets the cache size.
+ * Sets the 1st-level cache size.
*
* Setting cache size is only possible when using the default {@link LRUCache} cache implementation.
*/
@@ -222,7 +254,7 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
cache = LRUCacheFactory.newLRUCache(size);
}
- @ManagedAttribute(description = "The current cache size")
+ @ManagedAttribute(description = "The current 1st-level cache size")
public int getCacheSize() {
if (cache != null) {
return cache.size();
@@ -231,28 +263,58 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
}
/**
- * Reset and clears the store to force it to reload from file
+ * Reset and clears the 1st-level cache to force it to reload from file
*/
@ManagedOperation(description = "Reset and reloads the file store")
public synchronized void reset() throws IOException {
synchronized (cache) {
- // trunk and clear, before we reload the store
- trunkStore();
- cache.clear();
+ // run the cleanup task first
if (cache instanceof LRUCache) {
((LRUCache) cache).cleanUp();
}
+ cache.clear();
loadStore();
}
}
/**
- * Appends the given message id to the file store
+ * Checks the file store if the key exists
*
- * @param messageId the message id
+ * @param key the key
+ * @return <tt>true</tt> if exists in the file, <tt>false</tt> otherwise
*/
- protected void appendToStore(final String messageId) {
- LOG.debug("Appending {} to idempotent filestore: {}", messageId, fileStore);
+ protected boolean containsStore(final String key) {
+ if (fileStore == null || !fileStore.exists()) {
+ return false;
+ }
+
+ Scanner scanner = null;
+ try {
+ scanner = new Scanner(fileStore);
+ scanner.useDelimiter(STORE_DELIMITER);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ if (line.equals(key)) {
+ return true;
+ }
+ }
+ } catch (IOException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Appends the given key to the file store
+ *
+ * @param key the key
+ */
+ protected void appendToStore(final String key) {
+ LOG.debug("Appending: {} to idempotent filestore: {}", key, fileStore);
FileOutputStream fos = null;
try {
// create store parent directory if missing
@@ -260,9 +322,9 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
if (storeParentDirectory != null && !storeParentDirectory.exists()) {
LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
if (fileStore.getParentFile().mkdirs()) {
- LOG.info("Parent directory of file store {} successfully created.", fileStore);
+ LOG.info("Parent directory of filestore: {} successfully created.", fileStore);
} else {
- LOG.warn("Parent directory of file store {} cannot be created.", fileStore);
+ LOG.warn("Parent directory of filestore: {} cannot be created.", fileStore);
}
}
// create store if missing
@@ -271,7 +333,7 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
}
// append to store
fos = new FileOutputStream(fileStore, true);
- fos.write(messageId.getBytes());
+ fos.write(key.getBytes());
fos.write(STORE_DELIMITER.getBytes());
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
@@ -280,23 +342,125 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
}
}
+ protected synchronized void removeFromStore(String key) {
+ LOG.debug("Removing: {} from idempotent filestore: {}", key, fileStore);
+
+ // we need to re-load the entire file and remove the key and then re-write the file
+ List<String> lines = new ArrayList<>();
+
+ boolean found = false;
+ Scanner scanner = null;
+ try {
+ scanner = new Scanner(fileStore);
+ scanner.useDelimiter(STORE_DELIMITER);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ if (key.equals(line)) {
+ found = true;
+ } else {
+ lines.add(line);
+ }
+ }
+ } catch (IOException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+
+ if (found) {
+ // rewrite file
+ LOG.debug("Rewriting idempotent filestore: {} due to key: {} removed", fileStore, key);
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(fileStore);
+ for (String line : lines) {
+ fos.write(line.getBytes());
+ fos.write(STORE_DELIMITER.getBytes());
+ }
+ } catch (IOException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
+ }
+ }
+ }
+
/**
- * Trunks the file store when the max store size is hit by rewriting the 1st level cache
- * to the file store.
+ * Clears the file-store (danger this deletes all entries)
*/
- protected void trunkStore() {
- LOG.info("Trunking idempotent filestore: {}", fileStore);
- FileOutputStream fos = null;
+ protected void clearStore() {
+ try {
+ FileUtil.deleteFile(fileStore);
+ FileUtil.createNewFile(fileStore);
+ } catch (IOException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ /**
+ * Trunks the file store when the max store size is hit by dropping the oldest entries.
+ */
+ protected synchronized void trunkStore() {
+ if (fileStore == null || !fileStore.exists()) {
+ return;
+ }
+
+ LOG.debug("Trunking: {} oldest entries from idempotent filestore: {}", dropOldestFileStore, fileStore);
+
+ // we need to re-load the entire file, skip the oldest entries and then re-write the file
+ List<String> lines = new ArrayList<>();
+
+ Scanner scanner = null;
+ int count = 0;
try {
- fos = new FileOutputStream(fileStore);
- for (String key : cache.keySet()) {
- fos.write(key.getBytes());
- fos.write(STORE_DELIMITER.getBytes());
+ scanner = new Scanner(fileStore);
+ scanner.useDelimiter(STORE_DELIMITER);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ count++;
+ if (count > dropOldestFileStore) {
+ lines.add(line);
+ }
}
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} finally {
- IOHelper.close(fos, "Trunking file idempotent repository", LOG);
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+
+ if (!lines.isEmpty()) {
+ // rewrite file
+ LOG.debug("Rewriting idempotent filestore: {} with {} entries:", fileStore, lines.size());
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(fileStore);
+ for (String line : lines) {
+ fos.write(line.getBytes());
+ fos.write(STORE_DELIMITER.getBytes());
+ }
+ } catch (IOException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
+ }
+ } else {
+ // its a small file so recreate the file
+ LOG.debug("Clearing idempotent filestore: {}", fileStore);
+ clearStore();
+ }
+ }
+
+ /**
+ * Cleanup the 1st-level cache.
+ */
+ protected void cleanup() {
+ // run the cleanup task first
+ if (cache instanceof LRUCache) {
+ ((LRUCache) cache).cleanUp();
}
}
@@ -357,12 +521,12 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote
@Override
protected void doStop() throws Exception {
- // reset will trunk and clear the cache
- trunkStore();
- cache.clear();
+ // run the cleanup task first
if (cache instanceof LRUCache) {
((LRUCache) cache).cleanUp();
}
+
+ cache.clear();
init.set(false);
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java b/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
index be3dc36..0055d57 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
@@ -17,6 +17,10 @@
package org.apache.camel.processor;
import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
@@ -27,6 +31,8 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.idempotent.FileIdempotentRepository;
import org.apache.camel.spi.IdempotentRepository;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.Assert;
/**
* @version
@@ -60,6 +66,15 @@ public class FileIdempotentTrunkStoreTest extends ContextTestSupport {
resultEndpoint.assertIsSatisfied();
assertTrue(repo.contains("XXXXXXXXXX"));
+
+ // check the file should only have the last 2 entries as it was trunked
+ Stream<String> fileContent = Files.lines(store.toPath());
+ List<String> fileEntries = fileContent.collect(Collectors.toList());
+ fileContent.close();
+ //expected order
+ Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains(
+ "ZZZZZZZZZZ",
+ "XXXXXXXXXX"));
}
protected void sendMessage(final Object messageId, final Object body) {
diff --git a/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java b/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java
new file mode 100644
index 0000000..da7d572
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.camel.TestSupport.createDirectory;
+import static org.apache.camel.TestSupport.deleteDirectory;
+
+public class FileIdempotentStoreOrderingTest {
+
+ private FileIdempotentRepository fileIdempotentRepository;
+ private List<String> files;
+
+ @Before
+ public void setup() {
+ files = Arrays.asList(
+ "file1.txt.20171123",
+ "file2.txt.20171123",
+ "file1.txt.20171124",
+ "file3.txt.20171125",
+ "file2.txt.20171126",
+ "fixed.income.lamr.out.20171126",
+ "pricing.px.20171126",
+ "test.out.20171126",
+ "processing.source.lamr.out.20171126");
+ this.fileIdempotentRepository = new FileIdempotentRepository();
+ }
+
+ @Test
+ public void testTrunkStoreNotMaxHit() throws Exception {
+ // ensure empty folder
+ deleteDirectory("target/mystore");
+ createDirectory("target/mystore");
+
+ //given
+ File fileStore = new File("target/mystore/data.dat");
+ fileIdempotentRepository.setFileStore(fileStore);
+ fileIdempotentRepository.setCacheSize(10);
+ fileIdempotentRepository.start();
+ files.forEach(e -> fileIdempotentRepository.add(e));
+
+ //when (will rebalance)
+ fileIdempotentRepository.stop();
+
+ //then
+ Stream<String> fileContent = Files.lines(fileStore.toPath());
+ List<String> fileEntries = fileContent.collect(Collectors.toList());
+ fileContent.close();
+ //expected order
+ Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains(
+ "file1.txt.20171123",
+ "file2.txt.20171123",
+ "file1.txt.20171124",
+ "file3.txt.20171125",
+ "file2.txt.20171126",
+ "fixed.income.lamr.out.20171126",
+ "pricing.px.20171126",
+ "test.out.20171126",
+ "processing.source.lamr.out.20171126"));
+ }
+
+ @Test
+ public void testTrunkStoreFirstLevelMaxHit() throws Exception {
+ // ensure empty folder
+ deleteDirectory("target/mystore");
+ createDirectory("target/mystore");
+
+ //given
+ File fileStore = new File("target/mystore/data.dat");
+ fileIdempotentRepository.setFileStore(fileStore);
+ fileIdempotentRepository.setCacheSize(5);
+ fileIdempotentRepository.start();
+ files.forEach(e -> fileIdempotentRepository.add(e));
+
+ //when (will rebalance)
+ fileIdempotentRepository.stop();
+
+ //then
+ Stream<String> fileContent = Files.lines(fileStore.toPath());
+ List<String> fileEntries = fileContent.collect(Collectors.toList());
+ fileContent.close();
+ //expected order
+ Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains(
+ "file1.txt.20171123",
+ "file2.txt.20171123",
+ "file1.txt.20171124",
+ "file3.txt.20171125",
+ "file2.txt.20171126",
+ "fixed.income.lamr.out.20171126",
+ "pricing.px.20171126",
+ "test.out.20171126",
+ "processing.source.lamr.out.20171126"));
+ }
+
+ @Test
+ public void testTrunkStoreFileMaxHit() throws Exception {
+ // ensure empty folder
+ deleteDirectory("target/mystore");
+ createDirectory("target/mystore");
+
+ //given
+ File fileStore = new File("target/mystore/data.dat");
+ fileIdempotentRepository.setFileStore(fileStore);
+ fileIdempotentRepository.setCacheSize(5);
+ fileIdempotentRepository.setMaxFileStoreSize(128);
+ fileIdempotentRepository.setDropOldestFileStore(1000);
+
+ fileIdempotentRepository.start();
+ files.forEach(e -> fileIdempotentRepository.add(e));
+
+ // force cleanup and trunk
+ fileIdempotentRepository.cleanup();
+ fileIdempotentRepository.trunkStore();
+
+ fileIdempotentRepository.stop();
+
+ //then
+ Stream<String> fileContent = Files.lines(fileStore.toPath());
+ List<String> fileEntries = fileContent.collect(Collectors.toList());
+ fileContent.close();
+
+ // all old entries are removed
+ Assert.assertEquals(0, fileEntries.size());
+ }
+
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.