You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/10/27 20:08:30 UTC
(camel) branch main updated: Adding possibility to send binary messages to Azure Service Bus (#11838)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new a32512e84ec Adding possibility to send binary messages to Azure Service Bus (#11838)
a32512e84ec is described below
commit a32512e84ecca8ea59531d8a1fab3334c620f7d1
Author: Anatoliy Mazurok <58...@users.noreply.github.com>
AuthorDate: Fri Oct 27 23:08:23 2023 +0300
Adding possibility to send binary messages to Azure Service Bus (#11838)
* Adding possibility to send binary messages to Azure Service Bus
* Introduced new endpoint configuration parameter "binary"
* Added missing generated files and fixed test
* Code review remarks
* Testing line separators
* Reverted unnecessary changes
---
.../camel/catalog/components/azure-servicebus.json | 36 ++++----
.../servicebus/ServiceBusComponentConfigurer.java | 3 +
.../servicebus/ServiceBusEndpointConfigurer.java | 3 +
.../servicebus/ServiceBusEndpointUriFactory.java | 3 +-
.../azure/servicebus/azure-servicebus.json | 36 ++++----
.../src/main/docs/azure-servicebus-component.adoc | 2 +-
.../azure/servicebus/ServiceBusConfiguration.java | 13 +++
.../azure/servicebus/ServiceBusProducer.java | 60 ++++++++++--
.../azure/servicebus/ServiceBusUtils.java | 2 +-
.../operations/ServiceBusSenderOperations.java | 12 +--
.../azure/servicebus/ServiceBusEndpointTest.java | 2 +
.../azure/servicebus/ServiceBusUtilsTest.java | 23 ++++-
.../integration/ServiceBusProducerTest.java | 101 ++++++++++++++++++++-
.../operations/ServiceBusSenderOperationsTest.java | 57 ++++++++++++
14 files changed, 297 insertions(+), 56 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/azure-servicebus.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/azure-servicebus.json
index a5fc6a6ba70..cdc92a1809b 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/azure-servicebus.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/azure-servicebus.json
@@ -38,15 +38,16 @@
"serviceBusReceiveMode": { "index": 13, "kind": "property", "displayName": "Service Bus Receive Mode", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.models.ServiceBusReceiveMode", "enum": [ "PEEK_LOCK", "RECEIVE_AND_DELETE" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "PEEK_LOCK", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "con [...]
"subQueue": { "index": 14, "kind": "property", "displayName": "Sub Queue", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.models.SubQueue", "enum": [ "NONE", "DEAD_LETTER_QUEUE", "TRANSFER_DEAD_LETTER_QUEUE" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "descriptio [...]
"subscriptionName": { "index": 15, "kind": "property", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the name of the subscription in the topic to listen to. topicOrQueueName an [...]
- "lazyStartProducer": { "index": 16, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fai [...]
- "producerOperation": { "index": 17, "kind": "property", "displayName": "Producer Operation", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sendMessages", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConf [...]
- "scheduledEnqueueTime": { "index": 18, "kind": "property", "displayName": "Scheduled Enqueue Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets OffsetDateTime at which the message should appear in the S [...]
- "senderAsyncClient": { "index": 19, "kind": "property", "displayName": "Sender Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets SenderAs [...]
- "serviceBusTransactionContext": { "index": 20, "kind": "property", "displayName": "Service Bus Transaction Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Represents [...]
- "autowiredEnabled": { "index": 21, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching [...]
- "connectionString": { "index": 22, "kind": "property", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the connection string for a Service Bus namespace or a specific Service Bus [...]
- "fullyQualifiedNamespace": { "index": 23, "kind": "property", "displayName": "Fully Qualified Namespace", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Fully Qualified Namespace of the service bus" },
- "tokenCredential": { "index": 24, "kind": "property", "displayName": "Token Credential", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "com.azure.core.credential.TokenCredential", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "A TokenCredential for Azure AD authentication." }
+ "binary": { "index": 16, "kind": "property", "displayName": "Binary", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Set binary mode. If true, message body will be sent as byte. By default, it is false." },
+ "lazyStartProducer": { "index": 17, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fai [...]
+ "producerOperation": { "index": 18, "kind": "property", "displayName": "Producer Operation", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sendMessages", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConf [...]
+ "scheduledEnqueueTime": { "index": 19, "kind": "property", "displayName": "Scheduled Enqueue Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets OffsetDateTime at which the message should appear in the S [...]
+ "senderAsyncClient": { "index": 20, "kind": "property", "displayName": "Sender Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets SenderAs [...]
+ "serviceBusTransactionContext": { "index": 21, "kind": "property", "displayName": "Service Bus Transaction Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Represents [...]
+ "autowiredEnabled": { "index": 22, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching [...]
+ "connectionString": { "index": 23, "kind": "property", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the connection string for a Service Bus namespace or a specific Service Bus [...]
+ "fullyQualifiedNamespace": { "index": 24, "kind": "property", "displayName": "Fully Qualified Namespace", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Fully Qualified Namespace of the service bus" },
+ "tokenCredential": { "index": 25, "kind": "property", "displayName": "Token Credential", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "com.azure.core.credential.TokenCredential", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "A TokenCredential for Azure AD authentication." }
},
"headers": {
"CamelAzureServiceBusApplicationProperties": { "index": 0, "kind": "header", "displayName": "", "group": "common", "label": "common", "required": false, "javaType": "Map<String, Object>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The application properties (also known as custom properties) on messages sent and received by the producer and consumer, respectively.", "constantName": "org.apache.camel.component.azure.servicebus.Servi [...]
@@ -94,13 +95,14 @@
"bridgeErrorHandler": { "index": 15, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...]
"exceptionHandler": { "index": 16, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...]
"exchangePattern": { "index": 17, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
- "producerOperation": { "index": 18, "kind": "parameter", "displayName": "Producer Operation", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sendMessages", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusCon [...]
- "scheduledEnqueueTime": { "index": 19, "kind": "parameter", "displayName": "Scheduled Enqueue Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets OffsetDateTime at which the message should appear in the [...]
- "senderAsyncClient": { "index": 20, "kind": "parameter", "displayName": "Sender Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets SenderA [...]
- "serviceBusTransactionContext": { "index": 21, "kind": "parameter", "displayName": "Service Bus Transaction Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Represents [...]
- "lazyStartProducer": { "index": 22, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...]
- "connectionString": { "index": 23, "kind": "parameter", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the connection string for a Service Bus namespace or a specific Service Bus [...]
- "fullyQualifiedNamespace": { "index": 24, "kind": "parameter", "displayName": "Fully Qualified Namespace", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Fully Qualified Namespace of the service bus" },
- "tokenCredential": { "index": 25, "kind": "parameter", "displayName": "Token Credential", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "com.azure.core.credential.TokenCredential", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "A TokenCredential for Azure AD authentication." }
+ "binary": { "index": 18, "kind": "parameter", "displayName": "Binary", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Set binary mode. If true, message body will be sent as byte. By default, it is false." },
+ "producerOperation": { "index": 19, "kind": "parameter", "displayName": "Producer Operation", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sendMessages", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusCon [...]
+ "scheduledEnqueueTime": { "index": 20, "kind": "parameter", "displayName": "Scheduled Enqueue Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets OffsetDateTime at which the message should appear in the [...]
+ "senderAsyncClient": { "index": 21, "kind": "parameter", "displayName": "Sender Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets SenderA [...]
+ "serviceBusTransactionContext": { "index": 22, "kind": "parameter", "displayName": "Service Bus Transaction Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Represents [...]
+ "lazyStartProducer": { "index": 23, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...]
+ "connectionString": { "index": 24, "kind": "parameter", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the connection string for a Service Bus namespace or a specific Service Bus [...]
+ "fullyQualifiedNamespace": { "index": 25, "kind": "parameter", "displayName": "Fully Qualified Namespace", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Fully Qualified Namespace of the service bus" },
+ "tokenCredential": { "index": 26, "kind": "parameter", "displayName": "Token Credential", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "com.azure.core.credential.TokenCredential", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "A TokenCredential for Azure AD authentication." }
}
}
diff --git a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusComponentConfigurer.java b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusComponentConfigurer.java
index 038dbe05e88..b5d53b7e034 100644
--- a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusComponentConfigurer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusComponentConfigurer.java
@@ -34,6 +34,7 @@ public class ServiceBusComponentConfigurer extends PropertyConfigurerSupport imp
case "amqpTransportType": getOrCreateConfiguration(target).setAmqpTransportType(property(camelContext, com.azure.core.amqp.AmqpTransportType.class, value)); return true;
case "autowiredenabled":
case "autowiredEnabled": target.setAutowiredEnabled(property(camelContext, boolean.class, value)); return true;
+ case "binary": getOrCreateConfiguration(target).setBinary(property(camelContext, boolean.class, value)); return true;
case "bridgeerrorhandler":
case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
case "clientoptions":
@@ -95,6 +96,7 @@ public class ServiceBusComponentConfigurer extends PropertyConfigurerSupport imp
case "amqpTransportType": return com.azure.core.amqp.AmqpTransportType.class;
case "autowiredenabled":
case "autowiredEnabled": return boolean.class;
+ case "binary": return boolean.class;
case "bridgeerrorhandler":
case "bridgeErrorHandler": return boolean.class;
case "clientoptions":
@@ -152,6 +154,7 @@ public class ServiceBusComponentConfigurer extends PropertyConfigurerSupport imp
case "amqpTransportType": return getOrCreateConfiguration(target).getAmqpTransportType();
case "autowiredenabled":
case "autowiredEnabled": return target.isAutowiredEnabled();
+ case "binary": return getOrCreateConfiguration(target).isBinary();
case "bridgeerrorhandler":
case "bridgeErrorHandler": return target.isBridgeErrorHandler();
case "clientoptions":
diff --git a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointConfigurer.java b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointConfigurer.java
index 64791d2d7be..347cd7fca2b 100644
--- a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointConfigurer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointConfigurer.java
@@ -25,6 +25,7 @@ public class ServiceBusEndpointConfigurer extends PropertyConfigurerSupport impl
case "amqpRetryOptions": target.getConfiguration().setAmqpRetryOptions(property(camelContext, com.azure.core.amqp.AmqpRetryOptions.class, value)); return true;
case "amqptransporttype":
case "amqpTransportType": target.getConfiguration().setAmqpTransportType(property(camelContext, com.azure.core.amqp.AmqpTransportType.class, value)); return true;
+ case "binary": target.getConfiguration().setBinary(property(camelContext, boolean.class, value)); return true;
case "bridgeerrorhandler":
case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
case "clientoptions":
@@ -87,6 +88,7 @@ public class ServiceBusEndpointConfigurer extends PropertyConfigurerSupport impl
case "amqpRetryOptions": return com.azure.core.amqp.AmqpRetryOptions.class;
case "amqptransporttype":
case "amqpTransportType": return com.azure.core.amqp.AmqpTransportType.class;
+ case "binary": return boolean.class;
case "bridgeerrorhandler":
case "bridgeErrorHandler": return boolean.class;
case "clientoptions":
@@ -145,6 +147,7 @@ public class ServiceBusEndpointConfigurer extends PropertyConfigurerSupport impl
case "amqpRetryOptions": return target.getConfiguration().getAmqpRetryOptions();
case "amqptransporttype":
case "amqpTransportType": return target.getConfiguration().getAmqpTransportType();
+ case "binary": return target.getConfiguration().isBinary();
case "bridgeerrorhandler":
case "bridgeErrorHandler": return target.isBridgeErrorHandler();
case "clientoptions":
diff --git a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointUriFactory.java b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointUriFactory.java
index 5113bae32fb..44df4303677 100644
--- a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointUriFactory.java
+++ b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointUriFactory.java
@@ -21,9 +21,10 @@ public class ServiceBusEndpointUriFactory extends org.apache.camel.support.compo
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(26);
+ Set<String> props = new HashSet<>(27);
props.add("amqpRetryOptions");
props.add("amqpTransportType");
+ props.add("binary");
props.add("bridgeErrorHandler");
props.add("clientOptions");
props.add("connectionString");
diff --git a/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json b/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
index a5fc6a6ba70..cdc92a1809b 100644
--- a/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
+++ b/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
@@ -38,15 +38,16 @@
"serviceBusReceiveMode": { "index": 13, "kind": "property", "displayName": "Service Bus Receive Mode", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.models.ServiceBusReceiveMode", "enum": [ "PEEK_LOCK", "RECEIVE_AND_DELETE" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "PEEK_LOCK", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "con [...]
"subQueue": { "index": 14, "kind": "property", "displayName": "Sub Queue", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.models.SubQueue", "enum": [ "NONE", "DEAD_LETTER_QUEUE", "TRANSFER_DEAD_LETTER_QUEUE" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "descriptio [...]
"subscriptionName": { "index": 15, "kind": "property", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the name of the subscription in the topic to listen to. topicOrQueueName an [...]
- "lazyStartProducer": { "index": 16, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fai [...]
- "producerOperation": { "index": 17, "kind": "property", "displayName": "Producer Operation", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sendMessages", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConf [...]
- "scheduledEnqueueTime": { "index": 18, "kind": "property", "displayName": "Scheduled Enqueue Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets OffsetDateTime at which the message should appear in the S [...]
- "senderAsyncClient": { "index": 19, "kind": "property", "displayName": "Sender Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets SenderAs [...]
- "serviceBusTransactionContext": { "index": 20, "kind": "property", "displayName": "Service Bus Transaction Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Represents [...]
- "autowiredEnabled": { "index": 21, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching [...]
- "connectionString": { "index": 22, "kind": "property", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the connection string for a Service Bus namespace or a specific Service Bus [...]
- "fullyQualifiedNamespace": { "index": 23, "kind": "property", "displayName": "Fully Qualified Namespace", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Fully Qualified Namespace of the service bus" },
- "tokenCredential": { "index": 24, "kind": "property", "displayName": "Token Credential", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "com.azure.core.credential.TokenCredential", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "A TokenCredential for Azure AD authentication." }
+ "binary": { "index": 16, "kind": "property", "displayName": "Binary", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Set binary mode. If true, message body will be sent as byte. By default, it is false." },
+ "lazyStartProducer": { "index": 17, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fai [...]
+ "producerOperation": { "index": 18, "kind": "property", "displayName": "Producer Operation", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sendMessages", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConf [...]
+ "scheduledEnqueueTime": { "index": 19, "kind": "property", "displayName": "Scheduled Enqueue Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets OffsetDateTime at which the message should appear in the S [...]
+ "senderAsyncClient": { "index": 20, "kind": "property", "displayName": "Sender Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets SenderAs [...]
+ "serviceBusTransactionContext": { "index": 21, "kind": "property", "displayName": "Service Bus Transaction Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Represents [...]
+ "autowiredEnabled": { "index": 22, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching [...]
+ "connectionString": { "index": 23, "kind": "property", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the connection string for a Service Bus namespace or a specific Service Bus [...]
+ "fullyQualifiedNamespace": { "index": 24, "kind": "property", "displayName": "Fully Qualified Namespace", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Fully Qualified Namespace of the service bus" },
+ "tokenCredential": { "index": 25, "kind": "property", "displayName": "Token Credential", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "com.azure.core.credential.TokenCredential", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "A TokenCredential for Azure AD authentication." }
},
"headers": {
"CamelAzureServiceBusApplicationProperties": { "index": 0, "kind": "header", "displayName": "", "group": "common", "label": "common", "required": false, "javaType": "Map<String, Object>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The application properties (also known as custom properties) on messages sent and received by the producer and consumer, respectively.", "constantName": "org.apache.camel.component.azure.servicebus.Servi [...]
@@ -94,13 +95,14 @@
"bridgeErrorHandler": { "index": 15, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...]
"exceptionHandler": { "index": 16, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...]
"exchangePattern": { "index": 17, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
- "producerOperation": { "index": 18, "kind": "parameter", "displayName": "Producer Operation", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sendMessages", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusCon [...]
- "scheduledEnqueueTime": { "index": 19, "kind": "parameter", "displayName": "Scheduled Enqueue Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets OffsetDateTime at which the message should appear in the [...]
- "senderAsyncClient": { "index": 20, "kind": "parameter", "displayName": "Sender Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets SenderA [...]
- "serviceBusTransactionContext": { "index": 21, "kind": "parameter", "displayName": "Service Bus Transaction Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Represents [...]
- "lazyStartProducer": { "index": 22, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...]
- "connectionString": { "index": 23, "kind": "parameter", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the connection string for a Service Bus namespace or a specific Service Bus [...]
- "fullyQualifiedNamespace": { "index": 24, "kind": "parameter", "displayName": "Fully Qualified Namespace", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Fully Qualified Namespace of the service bus" },
- "tokenCredential": { "index": 25, "kind": "parameter", "displayName": "Token Credential", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "com.azure.core.credential.TokenCredential", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "A TokenCredential for Azure AD authentication." }
+ "binary": { "index": 18, "kind": "parameter", "displayName": "Binary", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Set binary mode. If true, message body will be sent as byte. By default, it is false." },
+ "producerOperation": { "index": 19, "kind": "parameter", "displayName": "Producer Operation", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sendMessages", "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusCon [...]
+ "scheduledEnqueueTime": { "index": 20, "kind": "parameter", "displayName": "Scheduled Enqueue Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets OffsetDateTime at which the message should appear in the [...]
+ "senderAsyncClient": { "index": 21, "kind": "parameter", "displayName": "Sender Async Client", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets SenderA [...]
+ "serviceBusTransactionContext": { "index": 22, "kind": "parameter", "displayName": "Service Bus Transaction Context", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Represents [...]
+ "lazyStartProducer": { "index": 23, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...]
+ "connectionString": { "index": 24, "kind": "parameter", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Sets the connection string for a Service Bus namespace or a specific Service Bus [...]
+ "fullyQualifiedNamespace": { "index": 25, "kind": "parameter", "displayName": "Fully Qualified Namespace", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Fully Qualified Namespace of the service bus" },
+ "tokenCredential": { "index": 26, "kind": "parameter", "displayName": "Token Credential", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "com.azure.core.credential.TokenCredential", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "A TokenCredential for Azure AD authentication." }
}
}
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/docs/azure-servicebus-component.adoc b/components/camel-azure/camel-azure-servicebus/src/main/docs/azure-servicebus-component.adoc
index 0707f208759..74d63e78cef 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/docs/azure-servicebus-component.adoc
+++ b/components/camel-azure/camel-azure-servicebus/src/main/docs/azure-servicebus-component.adoc
@@ -62,7 +62,7 @@ include::partial$component-endpoint-headers.adoc[]
// component headers: END
=== Message Body
-In the producer, this component accepts message body of `String` type or `List<String>` to send batch messages.
+In the producer, this component accepts a message body of type `String`, `byte[]` or `BinaryData`, or a `List<String>`, `List<byte[]>` or `List<BinaryData>` to send batch messages.
In the consumer, the returned message body will be of type `String`.
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
index b39570b69c2..80193afb301 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
@@ -85,6 +85,8 @@ public class ServiceBusConfiguration implements Cloneable {
private ServiceBusTransactionContext serviceBusTransactionContext;
@UriParam(label = "producer")
private OffsetDateTime scheduledEnqueueTime;
+ @UriParam(label = "producer", defaultValue = "false")
+ private boolean binary;
/**
* Selected topic name or the queue name, that is depending on serviceBusType config. For example if
@@ -340,6 +342,17 @@ public class ServiceBusConfiguration implements Cloneable {
this.peekNumMaxMessages = peekNumMaxMessages;
}
+ /**
+ * Set binary mode. If true, message body will be sent as byte[]. By default, it is false.
+ *
+ * @return the current value of the binary flag
+ */
+ public boolean isBinary() {
+ return binary;
+ }
+
+ /**
+ * Stores the binary mode flag on this configuration.
+ *
+ * @param binary the new flag value, as later returned by {@link #isBinary()}
+ */
+ public void setBinary(boolean binary) {
+ this.binary = binary;
+ }
+
// *************************************************
//
// *************************************************
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
index 5f72f3dd340..e6300d342f2 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.azure.servicebus;
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Path;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -23,11 +26,13 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
+import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
import org.apache.camel.component.azure.servicebus.client.ServiceBusClientFactory;
import org.apache.camel.component.azure.servicebus.client.ServiceBusSenderAsyncClientWrapper;
import org.apache.camel.component.azure.servicebus.operations.ServiceBusSenderOperations;
@@ -147,12 +152,16 @@ public class ServiceBusProducer extends DefaultAsyncProducer {
Mono<Void> sendMessageAsync;
- if (exchange.getMessage().getBody() instanceof Iterable) {
+ if (inputBody instanceof Iterable<?>) {
sendMessageAsync
- = serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<Object>) inputBody),
+ = serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
} else {
- sendMessageAsync = serviceBusSenderOperations.sendMessages(exchange.getMessage().getBody(String.class),
+ Object convertedBody = inputBody instanceof BinaryData ? inputBody
+ : getConfiguration().isBinary() ? convertBodyToBinary(exchange)
+ : exchange.getMessage().getBody(String.class);
+
+ sendMessageAsync = serviceBusSenderOperations.sendMessages(convertedBody,
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
}
@@ -170,15 +179,18 @@ public class ServiceBusProducer extends DefaultAsyncProducer {
Mono<List<Long>> scheduleMessagesAsync;
- if (exchange.getMessage().getBody() instanceof Iterable) {
+ if (inputBody instanceof Iterable<?>) {
scheduleMessagesAsync
- = serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<Object>) inputBody),
+ = serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties);
} else {
+ Object convertedBody = inputBody instanceof BinaryData ? inputBody
+ : getConfiguration().isBinary() ? convertBodyToBinary(exchange)
+ : exchange.getMessage().getBody(String.class);
scheduleMessagesAsync
- = serviceBusSenderOperations.scheduleMessages(exchange.getMessage().getBody(String.class),
+ = serviceBusSenderOperations.scheduleMessages(convertedBody,
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties);
@@ -189,12 +201,44 @@ public class ServiceBusProducer extends DefaultAsyncProducer {
};
}
- private List<String> convertBodyToList(final Iterable<Object> inputBody) {
+ private List<?> convertBodyToList(final Iterable<?> inputBody) {
return StreamSupport.stream(inputBody.spliterator(), false)
- .map(body -> getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, body))
+ .map(this::convertMessageBody)
.toList();
}
+ /**
+  * Wraps the body of the exchange's message in a {@link BinaryData}.
+  *
+  * An {@link InputStream} body is passed to {@code BinaryData.fromStream}, a {@link Path} or
+  * {@link File} body to {@code BinaryData.fromFile}; any other body is read through the
+  * message's {@code byte[]} conversion and passed to {@code BinaryData.fromBytes}.
+  *
+  * @param  exchange the exchange whose message body is converted
+  * @return          the {@link BinaryData} built from the body
+  */
+ private Object convertBodyToBinary(Exchange exchange) {
+     final Object payload = exchange.getMessage().getBody();
+     if (payload instanceof InputStream stream) {
+         return BinaryData.fromStream(stream);
+     }
+     if (payload instanceof Path path) {
+         return BinaryData.fromFile(path);
+     }
+     if (payload instanceof File file) {
+         return BinaryData.fromFile(file.toPath());
+     }
+     // No dedicated factory applies: fall back to the message's byte[] conversion.
+     return BinaryData.fromBytes(exchange.getMessage().getBody(byte[].class));
+ }
+
+ /**
+  * Converts a single body element into the form handed to the sender operations.
+  *
+  * A {@link BinaryData} element is returned unchanged. When the configuration's binary flag is
+  * set, an {@link InputStream}, {@link Path} or {@link File} element is wrapped in a
+  * {@link BinaryData} and anything else is converted to {@code byte[]} by the Camel type
+  * converter. When the flag is not set, the element is converted to {@link String}.
+  *
+  * @param  inputBody the element to convert
+  * @return           the converted element
+  */
+ private Object convertMessageBody(Object inputBody) {
+     final TypeConverter converter = getEndpoint().getCamelContext().getTypeConverter();
+     if (inputBody instanceof BinaryData) {
+         return inputBody;
+     }
+     if (!getConfiguration().isBinary()) {
+         return converter.convertTo(String.class, inputBody);
+     }
+     // Binary mode from here on.
+     if (inputBody instanceof InputStream stream) {
+         return BinaryData.fromStream(stream);
+     }
+     if (inputBody instanceof Path path) {
+         return BinaryData.fromFile(path);
+     }
+     if (inputBody instanceof File file) {
+         return BinaryData.fromFile(file.toPath());
+     }
+     return converter.convertTo(byte[].class, inputBody);
+ }
+
private <T> void subscribeToMono(
final Mono<T> inputMono, final Exchange exchange, final Consumer<T> resultsCallback, final AsyncCallback callback) {
inputMono
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
index bc9106f08a2..4f26dbd9d5f 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
@@ -47,7 +47,7 @@ public final class ServiceBusUtils {
}
public static Iterable<ServiceBusMessage> createServiceBusMessages(
- final Iterable<Object> data, final Map<String, Object> applicationProperties) {
+ final Iterable<?> data, final Map<String, Object> applicationProperties) {
return StreamSupport.stream(data.spliterator(), false)
.map(obj -> createServiceBusMessage(obj, applicationProperties))
.collect(Collectors.toList());
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
index 5da15a769fc..612168261f5 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
@@ -42,8 +42,8 @@ public class ServiceBusSenderOperations {
final Object data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
- if (data instanceof Iterable) {
- return sendMessages((Iterable<Object>) data, context, applicationProperties);
+ if (data instanceof Iterable<?>) {
+ return sendMessages((Iterable<?>) data, context, applicationProperties);
}
return sendMessage(data, context, applicationProperties);
@@ -58,15 +58,15 @@ public class ServiceBusSenderOperations {
throw new IllegalArgumentException("To schedule a message, you need to set scheduledEnqueueTime.");
}
- if (data instanceof Iterable) {
- return scheduleMessages((Iterable<Object>) data, scheduledEnqueueTime, context, applicationProperties);
+ if (data instanceof Iterable<?>) {
+ return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties);
}
return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties);
}
private Mono<Void> sendMessages(
- final Iterable<Object> data,
+ final Iterable<?> data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
@@ -108,7 +108,7 @@ public class ServiceBusSenderOperations {
}
private Mono<List<Long>> scheduleMessages(
- final Iterable<Object> data, final OffsetDateTime scheduledEnqueueTime,
+ final Iterable<?> data, final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java
index 820d7217153..f3a4831a5d9 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java
@@ -52,6 +52,7 @@ class ServiceBusEndpointTest extends CamelTestSupport {
params.put("serviceBusType", ServiceBusType.topic);
params.put("prefetchCount", 10);
params.put("connectionString", "testString");
+ params.put("binary", "true");
final ServiceBusEndpoint endpoint
= (ServiceBusEndpoint) context.getComponent("azure-servicebus", ServiceBusComponent.class)
@@ -61,6 +62,7 @@ class ServiceBusEndpointTest extends CamelTestSupport {
assertEquals("testTopicOrQueue", endpoint.getConfiguration().getTopicOrQueueName());
assertEquals(10, endpoint.getConfiguration().getPrefetchCount());
assertEquals("testString", endpoint.getConfiguration().getConnectionString());
+ assertEquals(true, endpoint.getConfiguration().isBinary());
}
@Test
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
index 301e702a545..f952bc81514 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.azure.servicebus;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.StreamSupport;
@@ -38,11 +40,16 @@ public class ServiceBusUtilsTest {
final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null);
assertEquals("12345", message2.getBody().toString());
+
+ //test bytes
+ byte[] testByteBody = "test string".getBytes(StandardCharsets.UTF_8);
+ final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null);
+ assertArrayEquals(testByteBody, message3.getBody().toBytes());
}
@Test
void testCreateServiceBusMessages() {
- final List<Object> inputMessages = new LinkedList<>();
+ final List<String> inputMessages = new LinkedList<>();
inputMessages.add("test data");
inputMessages.add(String.valueOf(12345));
@@ -52,5 +59,19 @@ public class ServiceBusUtilsTest {
.anyMatch(record -> record.getBody().toString().equals("test data")));
assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
.anyMatch(record -> record.getBody().toString().equals("12345")));
+
+ //Test bytes
+ final List<byte[]> inputMessages2 = new LinkedList<>();
+ byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
+ byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
+ inputMessages2.add(byteBody1);
+ inputMessages2.add(byteBody2);
+
+ final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null);
+
+ assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
+ .anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody1)));
+ assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
+ .anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody2)));
}
}
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerTest.java
index 419251207e4..1a98a53a1e0 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.component.azure.servicebus.integration;
+import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -49,7 +51,7 @@ class ServiceBusProducerTest extends BaseCamelServiceBusTestSupport {
@Test
public void testSendMessage() throws InterruptedException {
template.send("direct:sendMessage", exchange -> {
- exchange.getIn().setBody(123456789);
+ exchange.getIn().setBody("123456789");
exchange.getIn().setHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.of("customKey", "customValue"));
});
@@ -68,16 +70,38 @@ class ServiceBusProducerTest extends BaseCamelServiceBusTestSupport {
assertTrue(batch1Exists, "test message body");
assertTrue(applicationPropertiesPresent, "test message application properties");
+
+ //test byte body
+ byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
+ template.send("direct:sendMessage", exchange -> {
+ exchange.getIn().setBody(testByteBody);
+ exchange.getIn().setHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.of("customKey", "customValue"));
+ });
+
+ Thread.sleep(1000);
+
+ final List<ServiceBusReceivedMessage> receivedMessages2
+ = receiverAsyncClient.receiveMessages().toStream().toList();
+
+ final boolean batch2Exists = receivedMessages2.stream()
+ .anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString().equals("123456789"));
+
+ final boolean applicationPropertiesPresent2 = receivedMessages2.stream()
+ .anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getApplicationProperties()
+ .containsKey("customKey"));
+
+ assertTrue(batch2Exists, "test byte body");
+ assertTrue(applicationPropertiesPresent2, "test byte message application properties");
}
@Test
public void testSendBatchMessages() throws InterruptedException {
template.send("direct:sendBatchMessages", exchange -> {
- final List<Object> inputBatch = new LinkedList<>();
+ final List<String> inputBatch = new LinkedList<>();
inputBatch.add("test batch 1");
inputBatch.add("test batch 2");
inputBatch.add("test batch 3");
- inputBatch.add(123456);
+ inputBatch.add("123456");
exchange.getIn().setBody(inputBatch);
});
@@ -104,6 +128,30 @@ class ServiceBusProducerTest extends BaseCamelServiceBusTestSupport {
assertTrue(batch2Exists, "test message body 2");
assertTrue(batch3Exists, "test message body 3");
assertTrue(batch4Exists, "test message body 4");
+
+ //test bytes
+ final List<byte[]> inputBatch2 = new LinkedList<>();
+ byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
+ byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
+ inputBatch2.add(byteBody1);
+ inputBatch2.add(byteBody2);
+
+ template.send("direct:sendBatchMessages", exchange -> {
+ exchange.getIn().setBody(inputBatch2);
+ });
+
+ Thread.sleep(1000);
+
+ // let's check our data
+ final Spliterator<ServiceBusReceivedMessage> receivedMessages2
+ = receiverAsyncClient.receiveMessages().toIterable().spliterator();
+ final boolean byteBody1Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody1));
+ final boolean byteBody2Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody2));
+
+ assertTrue(byteBody1Exists, "test byte body 1");
+ assertTrue(byteBody2Exists, "test byte body 2");
}
@Test
@@ -123,6 +171,24 @@ class ServiceBusProducerTest extends BaseCamelServiceBusTestSupport {
.anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString().equals("test message"));
assertTrue(batch1Exists);
+
+ //test bytes
+ byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
+ template.send("direct:scheduleMessage", exchange -> {
+ exchange.getIn().setHeader(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now());
+ exchange.getIn().setBody(testByteBody);
+ });
+
+ Thread.sleep(1000);
+
+ final Spliterator<ServiceBusReceivedMessage> receivedMessages2
+ = receiverAsyncClient.receiveMessages().toIterable().spliterator();
+
+ final boolean batch2Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
+ testByteBody));
+
+ assertTrue(batch2Exists);
}
@Test
@@ -132,7 +198,7 @@ class ServiceBusProducerTest extends BaseCamelServiceBusTestSupport {
inputBatch.add("test batch 1");
inputBatch.add("test batch 2");
inputBatch.add("test batch 3");
- inputBatch.add(123456);
+ inputBatch.add("123456");
exchange.getIn().setHeader(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now());
exchange.getIn().setBody(inputBatch);
@@ -160,6 +226,33 @@ class ServiceBusProducerTest extends BaseCamelServiceBusTestSupport {
assertTrue(batch2Exists, "test message body 2");
assertTrue(batch3Exists, "test message body 3");
assertTrue(batch4Exists, "test message body 4");
+
+ //test bytes
+ final List<byte[]> inputBatch2 = new LinkedList<>();
+ byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
+ byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
+ inputBatch2.add(byteBody1);
+ inputBatch2.add(byteBody2);
+
+ template.send("direct:scheduleBatchMessages", exchange -> {
+ exchange.getIn().setHeader(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now());
+ exchange.getIn().setBody(inputBatch2);
+ });
+
+ Thread.sleep(1000);
+
+ // let's check our data
+ final Spliterator<ServiceBusReceivedMessage> receivedMessages2
+ = receiverAsyncClient.receiveMessages().toIterable().spliterator();
+
+ final boolean byteBody1Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody1));
+
+ final boolean byteBody2Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody2));
+
+ assertTrue(byteBody1Exists, "test byte message body 1");
+ assertTrue(byteBody2Exists, "test byte message body 2");
}
@Override
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
index 741867c9d89..f8a53540b5b 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.azure.servicebus.integration.operations;
+import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.stream.StreamSupport;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@EnabledIfSystemProperty(named = "connectionString", matches = ".*",
disabledReason = "Make sure to supply azure servicebus connectionString, e.g: mvn verify -DconnectionString=string")
@@ -93,6 +95,14 @@ public class ServiceBusSenderOperationsTest {
assertTrue(exists, "test message body");
+ //test bytes
+ byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
+ operations.sendMessages(testByteBody, null, Map.of("customKey", "customValue")).block();
+ final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
+ testByteBody));
+ assertTrue(exists2, "test byte body");
+
// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
operations.sendMessages(12345, null, null).block();
@@ -125,6 +135,26 @@ public class ServiceBusSenderOperationsTest {
assertTrue(batch1Exists, "test message body 1");
assertTrue(batch2Exists, "test message body 2");
assertTrue(batch3Exists, "test message body 3");
+
+ //test bytes
+ final List<byte[]> inputBatch2 = new LinkedList<>();
+ byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
+ byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
+ inputBatch2.add(byteBody1);
+ inputBatch2.add(byteBody2);
+
+ operations.sendMessages(inputBatch2, null, null).block();
+ final Spliterator<ServiceBusReceivedMessage> receivedMessages2
+ = clientReceiverWrapper.receiveMessages().toIterable().spliterator();
+
+ final boolean byteBody1Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody1));
+ final boolean byteBody2Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody2));
+
+ assertTrue(byteBody1Exists, "test byte body 1");
+ assertTrue(byteBody2Exists, "test byte body 2");
+
}
@Test
@@ -139,6 +169,14 @@ public class ServiceBusSenderOperationsTest {
assertTrue(exists, "test message body");
+ //test bytes
+ byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
+ operations.scheduleMessages(testByteBody, OffsetDateTime.now(), null, null).block();
+ final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
+ testByteBody));
+ assertTrue(exists2, "test byte body");
+
// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
operations.scheduleMessages(12345, OffsetDateTime.now(), null, null).block();
@@ -174,5 +212,24 @@ public class ServiceBusSenderOperationsTest {
assertTrue(batch1Exists, "test message body 1");
assertTrue(batch2Exists, "test message body 2");
assertTrue(batch3Exists, "test message body 3");
+
+ //test bytes
+ final List<byte[]> inputBatch2 = new LinkedList<>();
+ byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
+ byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
+ inputBatch2.add(byteBody1);
+ inputBatch2.add(byteBody2);
+
+ operations.scheduleMessages(inputBatch2, OffsetDateTime.now(), null, null).block();
+ final Spliterator<ServiceBusReceivedMessage> receivedMessages2
+ = clientReceiverWrapper.receiveMessages().toIterable().spliterator();
+
+ final boolean byteBody1Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody1));
+ final boolean byteBody2Exists = StreamSupport.stream(receivedMessages2, false)
+ .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody2));
+
+ assertTrue(byteBody1Exists, "test byte body 1");
+ assertTrue(byteBody2Exists, "test byte body 2");
}
}