You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/26 10:03:30 UTC
[camel] 01/03: CAMEL-18649: avoid multiple recursive checks for resumed files
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit b7d9797d62f0c00b02ed767a67118b4cde7322c4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 25 14:42:58 2022 +0200
CAMEL-18649: avoid multiple recursive checks for resumed files
---
.../apache/camel/component/file/FileConsumer.java | 23 +++----
.../consumer/DirectoryEntriesResumeAdapter.java | 6 +-
.../DefaultDirectoryEntriesResumeAdapter.java | 46 ++++++-------
.../file/consumer/adapters/DirectoryEntries.java | 75 ----------------------
.../adapters/FileResumeAdapterDelegate.java | 10 +--
.../FileConsumerResumeFromOffsetStrategyTest.java | 14 +---
.../file/FileConsumerResumeStrategyTest.java | 20 +++---
7 files changed, 51 insertions(+), 143 deletions(-)
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 6aeec6a7323..0fff815b525 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -33,7 +33,6 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
@@ -107,10 +106,20 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
if (resumeStrategy != null) {
ResumeAdapter adapter = resumeStrategy.getAdapter();
+ LOG.trace("Checking the resume adapter: {}", adapter);
if (adapter instanceof FileOffsetResumeAdapter) {
+ LOG.trace("The resume adapter is for offsets: {}", adapter);
((FileOffsetResumeAdapter) adapter).setResumePayload(gf);
adapter.resume();
}
+
+ if (adapter instanceof DirectoryEntriesResumeAdapter) {
+ LOG.trace("Running the resume process for file {}", file);
+ if (((DirectoryEntriesResumeAdapter) adapter).resume(file)) {
+ LOG.trace("Skipping file {} because it has been marked previously consumed", file);
+ continue;
+ }
+ }
}
if (file.isDirectory()) {
@@ -177,18 +186,6 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
}
}
- if (resumeStrategy != null) {
- ResumeAdapter adapter = resumeStrategy.getAdapter();
- if (adapter instanceof DirectoryEntriesResumeAdapter) {
- DirectoryEntries resumeSet = new DirectoryEntries(directory, dirFiles);
-
- ((DirectoryEntriesResumeAdapter) adapter).setResumePayload(resumeSet);
- adapter.resume();
-
- return resumeSet.resumed();
- }
- }
-
return dirFiles;
}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
index 9d1de1297dd..e7842d33cb2 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
@@ -17,10 +17,8 @@
package org.apache.camel.component.file.consumer;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
+import java.io.File;
public interface DirectoryEntriesResumeAdapter {
- default void setResumePayload(DirectoryEntries fileSet) {
-
- }
+ boolean resume(File file);
}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
index 09e33457adb..445913a3bdd 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
@@ -21,29 +21,16 @@ import java.io.File;
import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An implementation of the {@link FileResumeAdapter} that can be used for resume operations for the file component.
* This one can be used to manage the resume operations for files within a directory.
*/
class DefaultDirectoryEntriesResumeAdapter extends AbstractFileResumeAdapter implements DirectoryEntriesResumeAdapter {
- private DirectoryEntries fileSet;
-
- private boolean notProcessed(File directory, File file) {
- FileSet cached = cache.get(directory, FileSet.class);
- if (cached == null) {
- return true;
- }
-
- return !cached.contains(file);
- }
-
- @Override
- public void setResumePayload(DirectoryEntries fileSet) {
- assert fileSet != null;
-
- this.fileSet = fileSet;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultDirectoryEntriesResumeAdapter.class);
protected boolean add(Object key, Object offset) {
if (offset instanceof File) {
@@ -58,16 +45,31 @@ class DefaultDirectoryEntriesResumeAdapter extends AbstractFileResumeAdapter imp
return true;
}
- private void resumeDirectoryEntries() {
- DirectoryEntries.doResume(fileSet, f -> notProcessed(fileSet.getDirectory(), f));
- }
-
@Override
public void resume() {
- resumeDirectoryEntries();
+ // NO-OP
+ }
+
+ /**
+ * Checks whether the given file is recorded in the FileSet cached for its directory.
+ *
+ * @param cache the resume cache in which the directory is looked up
+ * @param directory the directory used as the cache key
+ * @param file the file to look for in the cached FileSet
+ * @return true if a FileSet is cached for the directory and it contains the file; false otherwise
+ */
+ private boolean processed(ResumeCache<File> cache, File directory, File file) {
+ // Exactly one placeholder per argument; the lookup result is logged further below once it is known
+ LOG.trace("Checking if file {} with key {} is cached", file, directory);
+ FileSet cached = cache.get(directory, FileSet.class);
+ if (cached == null) {
+ LOG.trace("FileSet is not cached, therefore has not been processed yet");
+ return false;
+ }
+
+ final boolean isCached = cached.contains(file);
+ LOG.trace("FileSet is cached. Checking if it contains {}: {}", file, isCached);
+
+ return isCached;
+ }
+
+ /**
+ * Runs the resume check for a single file, using the file's parent directory as the cache key.
+ *
+ * @param file the file to check
+ * @return true if the file is found in the FileSet cached for its parent directory; false otherwise
+ */
+ @Override
+ public boolean resume(File file) {
+ return processed(cache, file.getParentFile(), file);
+ }
public void deserializeFileEntry(File keyObj, File valueObj) {
+ LOG.trace("Deserializing file key {} with value {}", keyObj, valueObj);
FileSet fileSet = (FileSet) cache.computeIfAbsent(keyObj, obj -> new FileSet());
fileSet.update(valueObj);
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
deleted file mode 100644
index 5795dc5042e..00000000000
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.file.consumer.adapters;
-
-import java.io.File;
-import java.util.Objects;
-import java.util.function.Predicate;
-
-import org.apache.camel.support.resume.Resumables;
-
-/**
- * This contains the input/output file set for resume operations.
- */
-public final class DirectoryEntries {
- private final File directory;
- private final File[] inputFiles;
- private File[] outputFiles;
-
- public DirectoryEntries(File directory, File[] inputFiles) {
- this.directory = directory;
- this.inputFiles = Objects.requireNonNull(inputFiles,
- "A list of input files must be provided for the resume info");
- }
-
- /**
- * Gets the files that should be resumed
- *
- * @return an array with the files that should be resumed
- */
- public File[] resumed() {
- return outputFiles;
- }
-
- /**
- * Whether there are resumable files to process
- *
- * @return true if there are resumable files or false otherwise
- */
- public boolean hasResumables() {
- if (outputFiles != inputFiles) {
- return true;
- }
-
- return false;
- }
-
- public File getDirectory() {
- return directory;
- }
-
- public void setOutputFiles(File[] resumed) {
- this.outputFiles = resumed;
- }
-
- public static void doResume(DirectoryEntries directoryEntries, Predicate<File> resumableCheck) {
- File[] processed = Resumables.resumeEach(directoryEntries.inputFiles,
- resumableCheck);
- directoryEntries.setOutputFiles(processed);
- }
-}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
index 79def33b4e3..78bf97643a4 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
@@ -45,11 +45,6 @@ public class FileResumeAdapterDelegate
fileOffsetResumeAdapter.setResumePayload(genericFile);
}
- @Override
- public void setResumePayload(DirectoryEntries fileSet) {
- directoryEntriesResumeAdapter.setResumePayload(fileSet);
- }
-
@Override
public boolean add(OffsetKey<?> key, Offset<?> offset) {
Object offsetObj = offset.getValue();
@@ -93,4 +88,9 @@ public class FileResumeAdapterDelegate
fileOffsetResumeAdapter.resume();
directoryEntriesResumeAdapter.resume();
}
+
+ @Override
+ public boolean resume(File file) {
+ return directoryEntriesResumeAdapter.resume(file);
+ }
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index b29d99d9f51..beca7934914 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -29,7 +29,6 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
import org.apache.camel.component.file.consumer.FileResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.apache.camel.support.resume.Resumables;
@@ -44,7 +43,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
private static class TestFileResumeAdapter implements FileResumeAdapter, FileOffsetResumeAdapter {
private GenericFile<File> resumable;
- private DirectoryEntries fileSet;
@Override
public void setResumePayload(GenericFile<File> resumable) {
@@ -61,14 +59,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
resumable.updateLastOffsetValue(3L);
resumable = null;
}
-
- if (fileSet != null) {
- DirectoryEntries.doResume(fileSet, f -> !f.getName().equals("resume-from-offset"));
- LOG.debug("Fileset: {}", fileSet);
- LOG.debug("Fileset: {}", fileSet.resumed());
-
- fileSet = null;
- }
}
}
@@ -81,8 +71,8 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
}
@Override
- public void setResumePayload(DirectoryEntries fileSet) {
- DirectoryEntries.doResume(fileSet, f -> true);
+ public boolean resume(File file) {
+ return false;
}
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index 5241e1571ec..ba8b9446913 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -16,25 +16,22 @@
*/
package org.apache.camel.component.file;
+import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
import org.apache.camel.component.file.consumer.FileResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.apache.camel.support.resume.Resumables;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@DisplayName("Tests whether file consumer works with the resume strategy")
@@ -42,16 +39,17 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
private static class TestFileSetResumeAdapter implements FileResumeAdapter, DirectoryEntriesResumeAdapter {
private final List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt");
- private DirectoryEntries resumeSet;
+ private boolean resumedCalled;
@Override
- public void setResumePayload(DirectoryEntries resumeSet) {
- this.resumeSet = Objects.requireNonNull(resumeSet);
+ public void resume() {
+
}
@Override
- public void resume() {
- DirectoryEntries.doResume(resumeSet, f -> !processedFiles.contains(f.getName()));
+ public boolean resume(File file) {
+ // Record that the consumer invoked the per-file resume check (asserted by the test)
+ resumedCalled = true;
+ // processedFiles holds file names (Strings), so match on the name; a File object would never be contained
+ return processedFiles.contains(file.getName());
}
}
@@ -80,9 +78,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
// only expect 4 of the 6 sent
assertMockEndpointsSatisfied();
- assertTrue(adapter.resumeSet.hasResumables(), "The resume set should have resumables in this scenario");
- assertNotNull(adapter.resumeSet.resumed(), "The list of resumables should not be null");
- assertEquals(4, adapter.resumeSet.resumed().length, "There should be exactly 4 resumables");
+ assertTrue(adapter.resumedCalled, "The resume set should have resumables in this scenario");
}
private void setOffset(Exchange exchange) {