You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/15 07:58:08 UTC

[camel] branch main updated: CAMEL-18596: Use async checkpoint updating method from Azure EventCon… (#8492)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new d0cf9297e0b CAMEL-18596: Use async checkpoint updating method from Azure EventCon… (#8492)
d0cf9297e0b is described below

commit d0cf9297e0bb367f96722b1e2098cf204694944a
Author: Jani Yli-Paavola <jy...@users.noreply.github.com>
AuthorDate: Sat Oct 15 10:58:02 2022 +0300

    CAMEL-18596: Use async checkpoint updating method from Azure EventCon… (#8492)
    
    * CAMEL-18596: Use async checkpoint updating method from Azure EventContext
    
    * CAMEL-18596: Test processedEvents count's status changes after checkpoint updates
    
    * CAMEL-18596: Add missing license header
    
    * CAMEL-18596 Change order of imports
    
    * CAMEL-18596: Do not store error in exchange
    
    Co-authored-by: Jani Yli-Paavola <ex...@varma.fi>
---
 .../EventHubsCheckpointUpdaterTimerTask.java       |  9 +++-
 .../azure/eventhubs/EventHubsConsumer.java         | 10 +++-
 .../EventHubsCheckpointUpdaterTimerTaskTest.java   | 60 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
index 16daf2d7556..4b3ab42ab70 100644
--- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
+++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
@@ -39,8 +39,13 @@ public class EventHubsCheckpointUpdaterTimerTask extends TimerTask {
     public void run() {
         if (processedEvents.get() > 0) {
             LOG.debug("checkpointing offset after reaching timeout, with a batch of {}", processedEvents.get());
-            eventContext.updateCheckpoint();
-            processedEvents.set(0);
+            eventContext.updateCheckpointAsync()
+                    .subscribe(unused -> LOG.debug("Processed one event..."),
+                            error -> LOG.debug("Error when updating Checkpoint: {}", error.getMessage()),
+                            () -> {
+                                LOG.debug("Checkpoint updated.");
+                                processedEvents.set(0);
+                            });
         } else {
             LOG.debug("skip checkpointing offset even if timeout is reached. No events processed");
         }
diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index 474cb19cf12..e9f43681954 100644
--- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -173,8 +173,14 @@ public class EventHubsConsumer extends DefaultConsumer {
         try {
             var completionCondition = processCheckpoint(exchange);
             if (completionCondition.equals(COMPLETED_BY_SIZE)) {
-                eventContext.updateCheckpoint();
-                processedEvents.set(0);
+                eventContext.updateCheckpointAsync()
+                        .subscribe(unused -> LOG.debug("Processed one event..."),
+                                error -> LOG.debug("Error when updating Checkpoint: {}", error.getMessage()),
+                                () -> {
+                                    processedEvents.set(0);
+                                    LOG.debug("Checkpoint updated.");
+                                });
+
             } else if (!completionCondition.equals(COMPLETED_BY_TIMEOUT)) {
                 processedEvents.incrementAndGet();
             }
diff --git a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
new file mode 100644
index 00000000000..e98c949c1e0
--- /dev/null
+++ b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.azure.messaging.eventhubs.models.EventContext;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import reactor.core.publisher.Mono;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class EventHubsCheckpointUpdaterTimerTaskTest {
+
+    @Test
+    void testProcessedEventsResetWhenCheckpointUpdated() {
+        var processedEvents = new AtomicInteger(1);
+        var eventContext = Mockito.mock(EventContext.class);
+
+        Mockito.when(eventContext.updateCheckpointAsync())
+                .thenReturn(Mono.just("").then());
+
+        var timerTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, processedEvents);
+
+        timerTask.run();
+
+        assertEquals(0, processedEvents.get());
+    }
+
+    @Test
+    void testProcessedEventsNotResetWhenCheckpointUpdateFails() {
+        var processedEvents = new AtomicInteger(1);
+        var eventContext = Mockito.mock(EventContext.class);
+
+        Mockito.when(eventContext.updateCheckpointAsync())
+                .thenReturn(Mono.error(new RuntimeException()));
+
+        var timerTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, processedEvents);
+
+        timerTask.run();
+
+        assertEquals(1, processedEvents.get());
+    }
+
+}