You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/26 10:03:30 UTC

[camel] 01/03: CAMEL-18649: avoid multiple recursive checks for resumed files

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b7d9797d62f0c00b02ed767a67118b4cde7322c4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 25 14:42:58 2022 +0200

    CAMEL-18649: avoid multiple recursive checks for resumed files
---
 .../apache/camel/component/file/FileConsumer.java  | 23 +++----
 .../consumer/DirectoryEntriesResumeAdapter.java    |  6 +-
 .../DefaultDirectoryEntriesResumeAdapter.java      | 46 ++++++-------
 .../file/consumer/adapters/DirectoryEntries.java   | 75 ----------------------
 .../adapters/FileResumeAdapterDelegate.java        | 10 +--
 .../FileConsumerResumeFromOffsetStrategyTest.java  | 14 +---
 .../file/FileConsumerResumeStrategyTest.java       | 20 +++---
 7 files changed, 51 insertions(+), 143 deletions(-)

diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 6aeec6a7323..0fff815b525 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -33,7 +33,6 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
 import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
@@ -107,10 +106,20 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
 
             if (resumeStrategy != null) {
                 ResumeAdapter adapter = resumeStrategy.getAdapter();
+                LOG.trace("Checking the resume adapter: {}", adapter);
                 if (adapter instanceof FileOffsetResumeAdapter) {
+                    LOG.trace("The resume adapter is for offsets: {}", adapter);
                     ((FileOffsetResumeAdapter) adapter).setResumePayload(gf);
                     adapter.resume();
                 }
+
+                if (adapter instanceof DirectoryEntriesResumeAdapter) {
+                    LOG.trace("Running the resume process for file {}", file);
+                    if (((DirectoryEntriesResumeAdapter) adapter).resume(file)) {
+                        LOG.trace("Skipping file {} because it has been marked previously consumed", file);
+                        continue;
+                    }
+                }
             }
 
             if (file.isDirectory()) {
@@ -177,18 +186,6 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
             }
         }
 
-        if (resumeStrategy != null) {
-            ResumeAdapter adapter = resumeStrategy.getAdapter();
-            if (adapter instanceof DirectoryEntriesResumeAdapter) {
-                DirectoryEntries resumeSet = new DirectoryEntries(directory, dirFiles);
-
-                ((DirectoryEntriesResumeAdapter) adapter).setResumePayload(resumeSet);
-                adapter.resume();
-
-                return resumeSet.resumed();
-            }
-        }
-
         return dirFiles;
     }
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
index 9d1de1297dd..e7842d33cb2 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
@@ -17,10 +17,8 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
+import java.io.File;
 
 public interface DirectoryEntriesResumeAdapter {
-    default void setResumePayload(DirectoryEntries fileSet) {
-
-    }
+    boolean resume(File file);
 }
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
index 09e33457adb..445913a3bdd 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
@@ -21,29 +21,16 @@ import java.io.File;
 
 import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
 import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of the {@link FileResumeAdapter} that can be used for resume operations for the file component.
  * This one can be used to manage the resume operations for files within a directory.
  */
 class DefaultDirectoryEntriesResumeAdapter extends AbstractFileResumeAdapter implements DirectoryEntriesResumeAdapter {
-    private DirectoryEntries fileSet;
-
-    private boolean notProcessed(File directory, File file) {
-        FileSet cached = cache.get(directory, FileSet.class);
-        if (cached == null) {
-            return true;
-        }
-
-        return !cached.contains(file);
-    }
-
-    @Override
-    public void setResumePayload(DirectoryEntries fileSet) {
-        assert fileSet != null;
-
-        this.fileSet = fileSet;
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultDirectoryEntriesResumeAdapter.class);
 
     protected boolean add(Object key, Object offset) {
         if (offset instanceof File) {
@@ -58,16 +45,31 @@ class DefaultDirectoryEntriesResumeAdapter extends AbstractFileResumeAdapter imp
         return true;
     }
 
-    private void resumeDirectoryEntries() {
-        DirectoryEntries.doResume(fileSet, f -> notProcessed(fileSet.getDirectory(), f));
-    }
-
     @Override
     public void resume() {
-        resumeDirectoryEntries();
+        // NO-OP
+    }
+
+    private boolean processed(ResumeCache<File> cache, File directory, File file) {
+        LOG.trace("Checking if file {} with key {} is cached: {}, {}", file, directory);
+        FileSet cached = cache.get(directory, FileSet.class);
+        if (cached == null) {
+            LOG.trace("FileSet is not cached, therefore has not been processed yet");
+            return false;
+        }
+
+        final boolean isCached = cached.contains(file);
+        LOG.trace("FileSet is cached. Checking if it contains {}: {}", file, isCached);
+
+        return isCached;
+    }
+
+    public boolean resume(File file) {
+        return processed(cache, file.getParentFile(), file);
     }
 
     public void deserializeFileEntry(File keyObj, File valueObj) {
+        LOG.trace("Deserializing file key {} with value {}", keyObj, valueObj);
         FileSet fileSet = (FileSet) cache.computeIfAbsent(keyObj, obj -> new FileSet());
 
         fileSet.update(valueObj);
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
deleted file mode 100644
index 5795dc5042e..00000000000
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.file.consumer.adapters;
-
-import java.io.File;
-import java.util.Objects;
-import java.util.function.Predicate;
-
-import org.apache.camel.support.resume.Resumables;
-
-/**
- * This contains the input/output file set for resume operations.
- */
-public final class DirectoryEntries {
-    private final File directory;
-    private final File[] inputFiles;
-    private File[] outputFiles;
-
-    public DirectoryEntries(File directory, File[] inputFiles) {
-        this.directory = directory;
-        this.inputFiles = Objects.requireNonNull(inputFiles,
-                "A list of input files must be provided for the resume info");
-    }
-
-    /**
-     * Gets the files that should be resumed
-     *
-     * @return an array with the files that should be resumed
-     */
-    public File[] resumed() {
-        return outputFiles;
-    }
-
-    /**
-     * Whether there are resumable files to process
-     *
-     * @return true if there are resumable files or false otherwise
-     */
-    public boolean hasResumables() {
-        if (outputFiles != inputFiles) {
-            return true;
-        }
-
-        return false;
-    }
-
-    public File getDirectory() {
-        return directory;
-    }
-
-    public void setOutputFiles(File[] resumed) {
-        this.outputFiles = resumed;
-    }
-
-    public static void doResume(DirectoryEntries directoryEntries, Predicate<File> resumableCheck) {
-        File[] processed = Resumables.resumeEach(directoryEntries.inputFiles,
-                resumableCheck);
-        directoryEntries.setOutputFiles(processed);
-    }
-}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
index 79def33b4e3..78bf97643a4 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
@@ -45,11 +45,6 @@ public class FileResumeAdapterDelegate
         fileOffsetResumeAdapter.setResumePayload(genericFile);
     }
 
-    @Override
-    public void setResumePayload(DirectoryEntries fileSet) {
-        directoryEntriesResumeAdapter.setResumePayload(fileSet);
-    }
-
     @Override
     public boolean add(OffsetKey<?> key, Offset<?> offset) {
         Object offsetObj = offset.getValue();
@@ -93,4 +88,9 @@ public class FileResumeAdapterDelegate
         fileOffsetResumeAdapter.resume();
         directoryEntriesResumeAdapter.resume();
     }
+
+    @Override
+    public boolean resume(File file) {
+        return directoryEntriesResumeAdapter.resume(file);
+    }
 }
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index b29d99d9f51..beca7934914 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -29,7 +29,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
 import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
 import org.apache.camel.component.file.consumer.FileResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.support.resume.Resumables;
@@ -44,7 +43,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
 
     private static class TestFileResumeAdapter implements FileResumeAdapter, FileOffsetResumeAdapter {
         private GenericFile<File> resumable;
-        private DirectoryEntries fileSet;
 
         @Override
         public void setResumePayload(GenericFile<File> resumable) {
@@ -61,14 +59,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
                 resumable.updateLastOffsetValue(3L);
                 resumable = null;
             }
-
-            if (fileSet != null) {
-                DirectoryEntries.doResume(fileSet, f -> !f.getName().equals("resume-from-offset"));
-                LOG.debug("Fileset: {}", fileSet);
-                LOG.debug("Fileset: {}", fileSet.resumed());
-
-                fileSet = null;
-            }
         }
     }
 
@@ -81,8 +71,8 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
         }
 
         @Override
-        public void setResumePayload(DirectoryEntries fileSet) {
-            DirectoryEntries.doResume(fileSet, f -> true);
+        public boolean resume(File file) {
+            return false;
         }
 
     }
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index 5241e1571ec..ba8b9446913 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -16,25 +16,22 @@
  */
 package org.apache.camel.component.file;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
 import org.apache.camel.component.file.consumer.FileResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.support.resume.Resumables;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @DisplayName("Tests whether file consumer works with the resume strategy")
@@ -42,16 +39,17 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
 
     private static class TestFileSetResumeAdapter implements FileResumeAdapter, DirectoryEntriesResumeAdapter {
         private final List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt");
-        private DirectoryEntries resumeSet;
+        private boolean resumedCalled;
 
         @Override
-        public void setResumePayload(DirectoryEntries resumeSet) {
-            this.resumeSet = Objects.requireNonNull(resumeSet);
+        public void resume() {
+
         }
 
         @Override
-        public void resume() {
-            DirectoryEntries.doResume(resumeSet, f -> !processedFiles.contains(f.getName()));
+        public boolean resume(File file) {
+            resumedCalled = true;
+            return processedFiles.contains(file);
         }
     }
 
@@ -80,9 +78,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
         // only expect 4 of the 6 sent
         assertMockEndpointsSatisfied();
 
-        assertTrue(adapter.resumeSet.hasResumables(), "The resume set should have resumables in this scenario");
-        assertNotNull(adapter.resumeSet.resumed(), "The list of resumables should not be null");
-        assertEquals(4, adapter.resumeSet.resumed().length, "There should be exactly 4 resumables");
+        assertTrue(adapter.resumedCalled, "The resume set should have resumables in this scenario");
     }
 
     private void setOffset(Exchange exchange) {