You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/10/15 07:58:08 UTC
[camel] branch main updated: CAMEL-18596: Use async checkpoint updating method from Azure EventContext (#8492)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new d0cf9297e0b CAMEL-18596: Use async checkpoint updating method from Azure EventContext (#8492)
d0cf9297e0b is described below
commit d0cf9297e0bb367f96722b1e2098cf204694944a
Author: Jani Yli-Paavola <jy...@users.noreply.github.com>
AuthorDate: Sat Oct 15 10:58:02 2022 +0300
CAMEL-18596: Use async checkpoint updating method from Azure EventContext (#8492)
* CAMEL-18596: Use async checkpoint updating method from Azure EventContext
* CAMEL-18596: Test processedEvents count's status changes after checkpoint updates
* CAMEL-18596: Add missing license header
* CAMEL-18596: Change order of imports
* CAMEL-18596: Do not store error in exchange
Co-authored-by: Jani Yli-Paavola <ex...@varma.fi>
---
.../EventHubsCheckpointUpdaterTimerTask.java | 9 +++-
.../azure/eventhubs/EventHubsConsumer.java | 10 +++-
.../EventHubsCheckpointUpdaterTimerTaskTest.java | 60 ++++++++++++++++++++++
3 files changed, 75 insertions(+), 4 deletions(-)
diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
index 16daf2d7556..4b3ab42ab70 100644
--- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
+++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
@@ -39,8 +39,13 @@ public class EventHubsCheckpointUpdaterTimerTask extends TimerTask {
public void run() {
if (processedEvents.get() > 0) {
LOG.debug("checkpointing offset after reaching timeout, with a batch of {}", processedEvents.get());
- eventContext.updateCheckpoint();
- processedEvents.set(0);
+ eventContext.updateCheckpointAsync()
+ .subscribe(unused -> LOG.debug("Processed one event..."),
+ error -> LOG.debug("Error when updating Checkpoint: {}", error.getMessage()),
+ () -> {
+ LOG.debug("Checkpoint updated.");
+ processedEvents.set(0);
+ });
} else {
LOG.debug("skip checkpointing offset even if timeout is reached. No events processed");
}
diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index 474cb19cf12..e9f43681954 100644
--- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -173,8 +173,14 @@ public class EventHubsConsumer extends DefaultConsumer {
try {
var completionCondition = processCheckpoint(exchange);
if (completionCondition.equals(COMPLETED_BY_SIZE)) {
- eventContext.updateCheckpoint();
- processedEvents.set(0);
+ eventContext.updateCheckpointAsync()
+ .subscribe(unused -> LOG.debug("Processed one event..."),
+ error -> LOG.debug("Error when updating Checkpoint: {}", error.getMessage()),
+ () -> {
+ processedEvents.set(0);
+ LOG.debug("Checkpoint updated.");
+ });
+
} else if (!completionCondition.equals(COMPLETED_BY_TIMEOUT)) {
processedEvents.incrementAndGet();
}
diff --git a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
new file mode 100644
index 00000000000..e98c949c1e0
--- /dev/null
+++ b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.azure.messaging.eventhubs.models.EventContext;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import reactor.core.publisher.Mono;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class EventHubsCheckpointUpdaterTimerTaskTest {
+
+ @Test
+ void testProcessedEventsResetWhenCheckpointUpdated() {
+ var processedEvents = new AtomicInteger(1);
+ var eventContext = Mockito.mock(EventContext.class);
+
+ Mockito.when(eventContext.updateCheckpointAsync())
+ .thenReturn(Mono.just("").then());
+
+ var timerTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, processedEvents);
+
+ timerTask.run();
+
+ assertEquals(0, processedEvents.get());
+ }
+
+ @Test
+ void testProcessedEventsNotResetWhenCheckpointUpdateFails() {
+ var processedEvents = new AtomicInteger(1);
+ var eventContext = Mockito.mock(EventContext.class);
+
+ Mockito.when(eventContext.updateCheckpointAsync())
+ .thenReturn(Mono.error(new RuntimeException()));
+
+ var timerTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, processedEvents);
+
+ timerTask.run();
+
+ assertEquals(1, processedEvents.get());
+ }
+
+}