You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2020/09/09 11:51:24 UTC

[camel] branch master updated: CAMEL-15517: Support iterable of events in Azure EventHubs component (#4179)

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 9780179  CAMEL-15517: Support iterable of events in Azure EventHubs component (#4179)
9780179 is described below

commit 97801790ece61614555b7b32d57a8c345134c530
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Sep 9 13:50:54 2020 +0200

    CAMEL-15517: Support iterable of events in Azure EventHubs component (#4179)
---
 .../component/azure/eventhubs/azure-eventhubs.json |   2 +-
 .../src/main/docs/azure-eventhubs-component.adoc   |  16 +++-
 .../operations/EventHubsProducerOperations.java    |  48 +++++++++-
 .../operations/EventHubsProducerOperationsIT.java  | 105 ++++++++++++++++++++-
 4 files changed, 161 insertions(+), 10 deletions(-)

diff --git a/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json b/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
index b564024..7836480 100644
--- a/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
+++ b/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
@@ -40,7 +40,7 @@
     "partitionId": { "kind": "property", "displayName": "Partition Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "Sets the identifier of the Event Hub partition that the {link EventData events} will be sent to. If the identifier is not spe [...]
     "partitionKey": { "kind": "property", "displayName": "Partition Key", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "Sets a hashing key to be provided for the batch of events, which instructs the Event Hubs service to map this key to a spec [...]
     "producerAsyncClient": { "kind": "property", "displayName": "Producer Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.eventhubs.EventHubProducerAsyncClient", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "Sets the EventHubProducerAsyncClient.An asynchronous producer respo [...]
-    "basicPropertyBinding": { "kind": "property", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": true, "secret": false, "defaultValue": false, "description": "Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
+    "basicPropertyBinding": { "kind": "property", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
     "connectionString": { "kind": "property", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "Instead of supplying namespace, sharedAccessKey, sharedAccessName ... etc, you can just supply the connection string [...]
     "sharedAccessKey": { "kind": "property", "displayName": "Shared Access Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "The generated value for the SharedAccessName" },
     "sharedAccessName": { "kind": "property", "displayName": "Shared Access Name", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "The name you chose for your EventHubs SAS keys" }
diff --git a/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc b/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
index ce08321..ca0c6cb 100644
--- a/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
+++ b/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
@@ -133,7 +133,7 @@ The Azure Event Hubs component supports 21 options, which are listed below.
 | *partitionId* (producer) | Sets the identifier of the Event Hub partition that the {link EventData events} will be sent to. If the identifier is not specified, the Event Hubs service will be responsible for routing events that are sent to an available partition. |  | String
 | *partitionKey* (producer) | Sets a hashing key to be provided for the batch of events, which instructs the Event Hubs service to map this key to a specific partition. The selection of a partition is stable for a given partition hashing key. Should any other batches of events be sent using the same exact partition hashing key, the Event Hubs service will route them all to the same partition. This should be specified only when there is a need to group events by partition, but there is fl [...]
 | *producerAsyncClient* (producer) | Sets the EventHubProducerAsyncClient.An asynchronous producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the {link CreateBatchOptions options} specified when creating an \{linkEventDataBatch\}, the events may be automatically routed to an available partition or specific to a partition. Use by this component to produce the data in camel producer. |  | EventHubProducerAsyncClient
-| *basicPropertyBinding* (advanced) | *Deprecated* Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
+| *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
 | *connectionString* (security) | Instead of supplying namespace, sharedAccessKey, sharedAccessName ... etc, you can just supply the connection string for your eventHub. The connection string for EventHubs already include all the necessary information to connection to your EventHub. To learn on how to generate the connection string, take a look at this documentation: \https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string |  | String
 | *sharedAccessKey* (security) | The generated value for the SharedAccessName |  | String
 | *sharedAccessName* (security) | The name you chose for your EventHubs SAS keys |  | String
@@ -228,6 +228,20 @@ from("direct:start")
 .to("azure-eventhubs:?connectionString=RAW({{connectionString}})"
 ```
 
+The component also supports *aggregation* of messages by sending events as an *iterable* of either Exchanges/Messages or plain data (e.g. a list of Strings). For example:
+```
+from("direct:start")
+.process(exchange -> {
+        final List<String> messages = new LinkedList<>();
+        messages.add("Test String Message 1");
+        messages.add("Test String Message 2");
+
+        exchange.getIn().setHeader(EventHubsConstants.PARTITION_ID, firstPartition);
+        exchange.getIn().setBody(messages);
+})
+.to("azure-eventhubs:?connectionString=RAW({{connectionString}})");
+```
+
 === Development Notes (Important)
 When developing on this component, you will need to obtain your Azure accessKey in order to run the integration tests. In addition to the mocked unit tests
 you *will need to run the integration tests with every change you make or even client upgrade as the Azure client can break things even on minor versions upgrade.*
diff --git a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
index 9183f03..4037ba2 100644
--- a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
+++ b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
@@ -17,12 +17,16 @@
 package org.apache.camel.component.azure.eventhubs.operations;
 
 import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 
 import com.azure.messaging.eventhubs.EventData;
 import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
 import com.azure.messaging.eventhubs.models.SendOptions;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
 import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
 import org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
 import org.apache.camel.util.ObjectHelper;
@@ -97,16 +101,52 @@ public class EventHubsProducerOperations {
                 .setPartitionKey(partitionKey);
     }
 
+    @SuppressWarnings("unchecked")
     private Iterable<EventData> createEventData(final Exchange exchange) {
-        final byte[] data = exchange.getIn().getBody(byte[].class);
+        // check if our exchange is list or contain some values
+        if (exchange.getIn().getBody() instanceof Iterable) {
+            return createEventDataFromIterable((Iterable<Object>) exchange.getIn().getBody(),
+                    exchange.getContext().getTypeConverter());
+        }
+
+        // we have only a single event here
+        return Collections.singletonList(createEventDataFromExchange(exchange));
+    }
+
+    private Iterable<EventData> createEventDataFromIterable(final Iterable<Object> inputData, final TypeConverter converter) {
+        final List<EventData> finalEventData = new LinkedList<>();
+
+        inputData.forEach(data -> {
+            if (data instanceof Exchange) {
+                finalEventData.add(createEventDataFromExchange((Exchange) data));
+            } else if (data instanceof Message) {
+                finalEventData.add(createEventDataFromMessage((Message) data));
+            } else {
+                finalEventData.add(createEventDataFromObject(data, converter));
+            }
+        });
+
+        return finalEventData;
+    }
+
+    private EventData createEventDataFromExchange(final Exchange exchange) {
+        return createEventDataFromMessage(exchange.getIn());
+    }
+
+    private EventData createEventDataFromMessage(final Message message) {
+        return createEventDataFromObject(message.getBody(), message.getExchange().getContext().getTypeConverter());
+    }
+
+    private EventData createEventDataFromObject(final Object inputData, final TypeConverter converter) {
+        final byte[] data = converter.convertTo(byte[].class, inputData);
 
         if (ObjectHelper.isEmpty(data)) {
             throw new IllegalArgumentException(
                     String.format("Cannot convert message body %s to byte[]. You will need "
                                   + "to make sure the data encoded in byte[] or add a Camel TypeConverter to convert the data to byte[]",
-                            exchange.getIn().getBody()));
+                            inputData));
         }
-        // for now we only support single event
-        return Collections.singletonList(new EventData(data));
+
+        return new EventData(data);
     }
 }
diff --git a/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java b/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
index 03e09f0..e5d7960 100644
--- a/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
+++ b/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.azure.eventhubs.operations;
 
 import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -32,6 +34,7 @@ import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -40,6 +43,8 @@ import org.junit.jupiter.api.TestInstance;
 class EventHubsProducerOperationsIT extends CamelTestSupport {
 
     private EventHubsConfiguration configuration;
+    private EventHubProducerAsyncClient producerAsyncClient;
+    private EventHubConsumerAsyncClient consumerAsyncClient;
 
     @BeforeAll
     public void prepare() throws Exception {
@@ -48,15 +53,14 @@ class EventHubsProducerOperationsIT extends CamelTestSupport {
         configuration = new EventHubsConfiguration();
         configuration.setConnectionString(properties.getProperty("connectionString"));
         configuration.setConsumerGroupName(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME);
+
+        producerAsyncClient = EventHubsClientFactory.createEventHubProducerAsyncClient(configuration);
+        consumerAsyncClient = EventHubsClientFactory.createEventHubConsumerAsyncClient(configuration);
     }
 
     @Test
     public void testSendEventWithSpecificPartition() {
-        final EventHubProducerAsyncClient producerAsyncClient
-                = EventHubsClientFactory.createEventHubProducerAsyncClient(configuration);
         final EventHubsProducerOperations operations = new EventHubsProducerOperations(producerAsyncClient, configuration);
-        final EventHubConsumerAsyncClient consumerAsyncClient
-                = EventHubsClientFactory.createEventHubConsumerAsyncClient(configuration);
         final String firstPartition = producerAsyncClient.getPartitionIds().blockLast();
         final Exchange exchange = new DefaultExchange(context);
 
@@ -84,7 +88,100 @@ class EventHubsProducerOperationsIT extends CamelTestSupport {
 
                     return eventExists;
                 });
+    }
+
+    @Test
+    public void testIterableExchangesSendEventsWithSpecificPartition() {
+        final EventHubsProducerOperations operations = new EventHubsProducerOperations(producerAsyncClient, configuration);
+        final String firstPartition = producerAsyncClient.getPartitionIds().blockLast();
+
+        final Exchange exchange1 = new DefaultExchange(context);
+        final Exchange exchange2 = new DefaultExchange(context);
+
+        exchange1.getIn().setBody("Exchange Message 1");
+        exchange2.getIn().setBody("Exchange Message 2");
+
+        final List<Exchange> exchanges = new LinkedList<>();
+        exchanges.add(exchange1);
+        exchanges.add(exchange2);
+
+        final Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(exchanges);
+
+        operations.sendEvents(exchange, doneSync -> {
+        });
+
+        Awaitility.await()
+                .atMost(40, TimeUnit.SECONDS)
+                .pollDelay(Duration.ofSeconds(2))
+                .pollInterval(Duration.ofSeconds(2))
+                .until(() -> {
+                    final Boolean event1Exists = consumerAsyncClient
+                            .receiveFromPartition(firstPartition, EventPosition.earliest())
+                            .any(partitionEvent -> partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+                                    && partitionEvent.getData().getBodyAsString()
+                                            .contains("Exchange Message 1"))
+                            .block();
+
+                    final Boolean event2Exists = consumerAsyncClient
+                            .receiveFromPartition(firstPartition, EventPosition.earliest())
+                            .any(partitionEvent -> partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+                                    && partitionEvent.getData().getBodyAsString()
+                                            .contains("Exchange Message 2"))
+                            .block();
+
+                    if (event1Exists == null || event2Exists == null) {
+                        return false;
+                    }
+
+                    return event1Exists && event2Exists;
+                });
+    }
+
+    @Test
+    public void testIterableStringSendEventsWithSpecificPartition() {
+        final EventHubsProducerOperations operations = new EventHubsProducerOperations(producerAsyncClient, configuration);
+        final String firstPartition = producerAsyncClient.getPartitionIds().blockLast();
+
+        final List<String> messages = new LinkedList<>();
+        messages.add("Test String Message 1");
+        messages.add("Test String Message 2");
+
+        final Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(messages);
+
+        operations.sendEvents(exchange, doneSync -> {
+        });
+
+        Awaitility.await()
+                .atMost(40, TimeUnit.SECONDS)
+                .pollDelay(Duration.ofSeconds(2))
+                .pollInterval(Duration.ofSeconds(2))
+                .until(() -> {
+                    final Boolean event1Exists = consumerAsyncClient
+                            .receiveFromPartition(firstPartition, EventPosition.earliest())
+                            .any(partitionEvent -> partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+                                    && partitionEvent.getData().getBodyAsString()
+                                            .contains("Test String Message 1"))
+                            .block();
+
+                    final Boolean event2Exists = consumerAsyncClient
+                            .receiveFromPartition(firstPartition, EventPosition.earliest())
+                            .any(partitionEvent -> partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+                                    && partitionEvent.getData().getBodyAsString()
+                                            .contains("Test String Message 2"))
+                            .block();
+
+                    if (event1Exists == null || event2Exists == null) {
+                        return false;
+                    }
+
+                    return event1Exists && event2Exists;
+                });
+    }
 
+    @AfterAll
+    public void tearDown() {
         producerAsyncClient.close();
         consumerAsyncClient.close();
     }