You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/01 18:24:13 UTC

[camel] 07/16: CAMEL-15562: abstract the underlying cache so it's configurable

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6a55b310a0606c290d99bb41d77ce08df35c9c82
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Feb 23 14:57:44 2022 +0100

    CAMEL-15562: abstract the underlying cache so it's configurable
---
 .../resume/kafka/AbstractKafkaResumeStrategy.java  | 40 +++++++----------
 .../main/java/org/apache/camel/ResumeCache.java    | 52 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
index 4fad434..a779c00 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
@@ -23,14 +23,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.camel.Resumable;
+import org.apache.camel.ResumeCache;
 import org.apache.camel.Service;
 import org.apache.camel.UpdatableConsumerResumeStrategy;
 import org.apache.camel.util.StringHelper;
@@ -62,24 +61,30 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     private Producer<K, V> producer;
     private long errorCount;
 
-    private Map<K, List<V>> processedItems = new TreeMap<>();
-    private List<Future<RecordMetadata>> sentItems = new ArrayList<>();
+    private final List<Future<RecordMetadata>> sentItems = new ArrayList<>();
+    private final ResumeCache<K, V> resumeCache;
     private boolean subscribed;
     private Properties producerConfig;
     private Properties consumerConfig;
 
-    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic) {
+    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache) {
         this.topic = topic;
 
         this.producerConfig = createProducer(bootstrapServers);
         this.consumerConfig = createConsumer(bootstrapServers);
+        this.resumeCache = resumeCache;
+
+        init();
     }
 
-    public AbstractKafkaResumeStrategy(String topic, Properties producerConfig,
+    public AbstractKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, Properties producerConfig,
                                        Properties consumerConfig) {
         this.topic = topic;
+        this.resumeCache = resumeCache;
         this.producerConfig = producerConfig;
         this.consumerConfig = consumerConfig;
+
+        init();
     }
 
     private Properties createProducer(String bootstrapServers) {
@@ -148,22 +153,15 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
 
         produce(topic, key, offsetValue);
 
-        // TODO: this leaks. It must be fixed for merge
-        var entries = processedItems.computeIfAbsent(key, k -> new ArrayList<>());
-        entries.add(offsetValue);
-    }
-
-    protected void loadProcessedItems(Map<K, List<V>> processed) throws Exception {
-        loadProcessedItems(processed, UNLIMITED);
+        resumeCache.add(key, offsetValue);
     }
 
-    protected void loadProcessedItems(Map<K, List<V>> processed, int limit) throws Exception {
+    protected void loadCache() throws Exception {
         subscribe();
 
         LOG.debug("Loading records from topic {}", topic);
 
         ConsumerRecords<K, V> records;
-
         do {
             records = consume();
 
@@ -175,16 +173,14 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
                 V value = record.value();
 
                 LOG.trace("Read from Kafka: {}", value);
-                var entries = processed.computeIfAbsent(record.key(), k -> new ArrayList<>());
-                entries.add(record.value());
+                resumeCache.add(record.key(), record.value());
 
-                if (limit != UNLIMITED && processed.size() >= limit) {
+                if (resumeCache.isFull()) {
                     break;
                 }
             }
         } while (true);
 
-        LOG.debug("Loaded {} records", processed.size());
         unsubscribe();
     }
 
@@ -263,10 +259,6 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         return ConsumerRecords.empty();
     }
 
-    protected Map<K, List<V>> getProcessedItems() {
-        return processedItems;
-    }
-
     public long getErrorCount() {
         return errorCount;
     }
@@ -309,7 +301,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         LOG.info("Starting the kafka resume strategy");
 
         try {
-            loadProcessedItems(processedItems);
+            loadCache();
         } catch (Exception e) {
             LOG.error("Failed to load already processed items: {}", e.getMessage(), e);
         }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java b/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java
new file mode 100644
index 0000000..0abe2e6
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+import java.util.Optional;
+
+/**
+ * This cache stores the resume data from a {@link ResumeStrategy}.
+ * 
+ * @param <K> the type of the key
+ * @param <V> the type of the offset value
+ */
+public interface ResumeCache<K, V> {
+
+    /**
+     * Adds a value to the cache
+     * 
+     * @param key         the key to add
+     * @param offsetValue the offset value
+     */
+    void add(K key, V offsetValue);
+
+    /**
+     * Checks whether the cache is full
+     * 
+     * @return true if full, or false otherwise
+     */
+    boolean isFull();
+
+    /**
+     * Gets the offset value for the key
+     * 
+     * @param  key the key
+     * @return     the offset value for the key, if present
+     */
+    Optional<V> get(K key);
+}