Posted to commits@camel.apache.org by ac...@apache.org on 2023/10/02 09:10:36 UTC
[camel-kamelets-examples] branch main updated: Added example of route from kafka to log (#30)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
The following commit(s) were added to refs/heads/main by this push:
new 84d9715 Added example of route from kafka to log (#30)
84d9715 is described below
commit 84d9715ec1fa8df1ba2796c475d4bacf7af14cd8
Author: Vedran Kolka <ve...@syntio.net>
AuthorDate: Mon Oct 2 11:10:30 2023 +0200
Added example of route from kafka to log (#30)
* added working example of route reading from kafka and logging
* deleted commented code
* renamed example maven project to com.acme.example
* added needed infrastructure
* fixed and cleaned tf script
* renamed properties in properties file
* renamed properties
* wrote instructions
* added instructions for running with Spring Boot
---
.../README.md | 44 ++
.../application.properties.template | 4 +
.../azure-identity/pom.xml | 97 +++++
.../azure-identity/src/main/avro/order.avsc | 28 ++
.../azure/DefaultAzureCredentialWrapper.java | 21 +
.../java/com/acme/example/eventhubs/Produce.java | 65 +++
.../com/acme/example/eventhubs/models/Order.java | 475 +++++++++++++++++++++
.../kafka-log.yaml | 36 ++
.../main.tf | 75 ++++
9 files changed, 845 insertions(+)
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/README.md b/jbang/azure-eventhubs-kafka-azure-schema-registry/README.md
new file mode 100644
index 0000000..4b1f192
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/README.md
@@ -0,0 +1,44 @@
+# Example for consuming from EventHubs in Avro format, using Azure Schema Registry
+
+This example shows a YAML DSL route for consuming Avro messages from Eventhubs using Azure Schema Registry.
+The example also includes a producer for convenience, as well as a wrapper around [DefaultAzureCredential](https://learn.microsoft.com/en-us/java/api/com.azure.identity.defaultazurecredential?view=azure-java-stable)
+that exposes a no-argument constructor, because the credential itself can only be instantiated through a builder.
+
+## Build the infrastructure
+
+Choose a globally unique name for the Eventhubs namespace and set it in the Terraform [script](main.tf).
+Then create the services by applying the script, for example with `terraform init` followed by `terraform apply`.
+
+## Configure the applications
+
+Use [application.properties.template](application.properties.template) to create `application.properties` and set your Eventhubs namespace in it.
+After the services have been created, the connection string for the Eventhub can be found in the Azure portal,
+or by running:
+```bash
+az eventhubs eventhub authorization-rule keys list --resource-group "example-rg" --namespace-name "example-namespace" --eventhub-name "my-topic" --name "rw_policy"
+```
+Set the `primaryConnectionString` value as `connectionstring` in `application.properties`, keeping the surrounding double quotes from the template.
+
+## Produce to Eventhubs
+
+Run [`Produce.java`](./azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java) to produce a message to the Eventhub.
+
+## Consume from Eventhubs
+
+To consume messages using a Camel route, first install the `azure-identity` Maven project into your local repository:
+```bash
+cd azure-identity
+mvn clean install
+```
+Then run:
+```bash
+camel run kafka-log.yaml camel-kafka-3.22.0.jar --properties application.properties
+```
+> At the time of writing, there was a [problem](https://github.com/apache/camel-kamelets-examples/issues/21#issuecomment-1732603257) running this example with camel-kafka-4.0.0, so the example was developed using version 3.22.0, which can be found [here](https://repository.apache.org/content/groups/snapshots/org/apache/camel/camel-kafka/3.22.0-SNAPSHOT/).
+
+To run the example without JBang, for example with Spring Boot, you can export the route:
+```bash
+camel export application.properties kafka-log.yaml --runtime=spring-boot --directory=code --gav com.acme.example:azure-sr:1.0.0
+```
+Then run the exported project as a regular Spring Boot application.
+> Using this approach, camel-kafka-4.0.0 works, and no dependencies need to be modified.
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/application.properties.template b/jbang/azure-eventhubs-kafka-azure-schema-registry/application.properties.template
new file mode 100644
index 0000000..31a6623
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/application.properties.template
@@ -0,0 +1,4 @@
+bootstrap.servers=<EVENTHUBS_NAMESPACE>.servicebus.windows.net:9093
+schema.registry.url=https://<EVENTHUBS_NAMESPACE>.servicebus.windows.net
+topic=my-topic
+connectionstring="<ACTUAL_CONNECTION_STRING>"
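The template above ships with `<...>` placeholders that have to be replaced before either the producer or the route can connect. A minimal sketch of a pre-flight check, assuming placeholders always look like `<UPPER_CASE_NAME>` as they do in this template; the class `TemplatePlaceholderCheck` and its methods are hypothetical helpers and not part of the commit:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;
import java.util.regex.Pattern;

final class TemplatePlaceholderCheck {

    // Assumption: template placeholders are written as <UPPER_CASE_NAME>.
    private static final Pattern PLACEHOLDER = Pattern.compile("<[A-Z_]+>");

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    /** Returns, in sorted order, the keys whose values still contain a template placeholder. */
    static List<String> unresolvedKeys(Properties properties) {
        List<String> keys = new ArrayList<>();
        for (String key : new TreeSet<>(properties.stringPropertyNames())) {
            if (PLACEHOLDER.matcher(properties.getProperty(key)).find()) {
                keys.add(key);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // The unmodified template content from this commit.
        String template = String.join("\n",
                "bootstrap.servers=<EVENTHUBS_NAMESPACE>.servicebus.windows.net:9093",
                "schema.registry.url=https://<EVENTHUBS_NAMESPACE>.servicebus.windows.net",
                "topic=my-topic",
                "connectionstring=\"<ACTUAL_CONNECTION_STRING>\"");
        System.out.println("Still to fill in: " + unresolvedKeys(parse(template)));
    }
}
```

Running a check like this before `Produce.java` or `camel run` would turn a forgotten placeholder into an explicit message instead of a connection failure.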
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/pom.xml b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/pom.xml
new file mode 100644
index 0000000..dee82f6
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/pom.xml
@@ -0,0 +1,97 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.acme.example</groupId>
+ <artifactId>azure-identity</artifactId>
+ <packaging>jar</packaging>
+ <version>0.1</version>
+ <name>azure-identity</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-data-schemaregistry-apacheavro</artifactId>
+ <version>1.1.4</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.microsoft.azure/azure-schemaregistry-kafka-avro -->
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-schemaregistry-kafka-avro</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.azure/azure-identity -->
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-identity</artifactId>
+ <version>1.9.0</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>3.4.0</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.2</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>2.0.7</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>2.0.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.11.0</version>
+ <configuration>
+ <source>17</source>
+ <target>17</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.8.2</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>src/main/avro</sourceDirectory>
+ <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
+ <stringType>String</stringType>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/avro/order.avsc b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/avro/order.avsc
new file mode 100644
index 0000000..69d0008
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/avro/order.avsc
@@ -0,0 +1,28 @@
+{
+ "doc": "Fact schema of an order",
+ "fields": [
+ {
+ "doc": "Unique id of the order.",
+ "name": "orderId",
+ "type": "int"
+ },
+ {
+ "doc": "Id of the ordered item.",
+ "name": "itemId",
+ "type": "string"
+ },
+ {
+ "doc": "Id of the user who ordered the item.",
+ "name": "userId",
+ "type": "string"
+ },
+ {
+ "doc": "How much was ordered.",
+ "name": "quantity",
+ "type": "double"
+ }
+ ],
+ "name": "Order",
+ "namespace": "com.acme.example.eventhubs.models",
+ "type": "record"
+}
\ No newline at end of file
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/azure/DefaultAzureCredentialWrapper.java b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/azure/DefaultAzureCredentialWrapper.java
new file mode 100644
index 0000000..0c8c91c
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/azure/DefaultAzureCredentialWrapper.java
@@ -0,0 +1,21 @@
+package com.acme.example.azure;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import reactor.core.publisher.Mono;
+
+public class DefaultAzureCredentialWrapper implements TokenCredential {
+
+ private final TokenCredential credential;
+
+ public DefaultAzureCredentialWrapper() {
+ this.credential = new DefaultAzureCredentialBuilder().build();
+ }
+
+ @Override
+ public Mono<AccessToken> getToken(TokenRequestContext tokenRequestContext) {
+ return this.credential.getToken(tokenRequestContext);
+ }
+}
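The wrapper above exists because the credential is created through a builder, while a bean declared only by class name needs a constructor that can be called without arguments. The following sketch illustrates that pattern with the standard library only. All types in it (`TokenSource`, `BuiltTokenSource`, `TokenSourceWrapper`) are hypothetical stand-ins, and the reflective `instantiate` helper is an assumption about how a framework might create a bean from a class, not Camel's actual code:

```java
final class NoArgWrapperSketch {

    /** Hypothetical stand-in for a credential type. */
    interface TokenSource {
        String token();
    }

    /** Hypothetical class that, like the real credential, is only created through its builder. */
    static final class BuiltTokenSource implements TokenSource {
        private final String token;

        private BuiltTokenSource(String token) {
            this.token = token;
        }

        @Override
        public String token() {
            return token;
        }

        static final class Builder {
            private String token = "default-token";

            Builder token(String value) {
                this.token = value;
                return this;
            }

            BuiltTokenSource build() {
                return new BuiltTokenSource(token);
            }
        }
    }

    /** Wrapper with a public no-argument constructor that delegates to the built instance. */
    public static final class TokenSourceWrapper implements TokenSource {
        private final TokenSource delegate;

        public TokenSourceWrapper() {
            this.delegate = new BuiltTokenSource.Builder().build();
        }

        @Override
        public String token() {
            return delegate.token();
        }
    }

    /** Creates an instance through the no-argument constructor, as a class-name based bean factory might. */
    static <T> T instantiate(Class<T> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No usable no-argument constructor on " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        // Works: the wrapper has a no-argument constructor.
        System.out.println(instantiate(TokenSourceWrapper.class).token());
        // instantiate(BuiltTokenSource.class) would fail, because that class has no such constructor.
    }
}
```

The wrapper keeps the builder call in one place, so the YAML route only has to name a class.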
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java
new file mode 100644
index 0000000..e50d034
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java
@@ -0,0 +1,65 @@
+package com.acme.example.eventhubs;
+
+import com.acme.example.azure.DefaultAzureCredentialWrapper;
+import com.acme.example.eventhubs.models.Order;
+import com.azure.core.credential.TokenCredential;
+import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer;
+import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializerConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class Produce {
+
+ public static final String DEFAULT_PROPERTIES_PATH = "../application.properties";
+
+ public static void main(String[] args) throws IOException {
+ String propertiesPath = DEFAULT_PROPERTIES_PATH;
+ if (args.length >= 1) {
+ propertiesPath = args[0];
+ }
+
+ Properties properties = new Properties();
+ properties.load(Files.newInputStream(Paths.get(propertiesPath)));
+
+ TokenCredential credential = new DefaultAzureCredentialWrapper();
+ String password = properties.getProperty("connectionstring");
+ String saslJaas = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ "username=\"$ConnectionString\" " +
+ "password=" + password + ";";
+
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+ properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaas);
+
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
+
+ properties.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, "avro");
+ properties.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG, true);
+ properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
+
+ try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) {
+ Order order = new Order(1, "item", "user", 3.0);
+ String topic = properties.getProperty("topic");
+ ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order);
+ RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS);
+ System.out.println("Sent record with offset " + result.offset());
+ } catch (ExecutionException | InterruptedException | TimeoutException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
\ No newline at end of file
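The producer above builds its `sasl.jaas.config` value by string concatenation and relies on the `connectionstring` property already carrying its own double quotes. A small sketch of a helper that accepts the value with or without quotes and always emits a quoted password; `JaasConfigSketch` and `saslJaasConfig` are hypothetical names, and the sketch assumes the connection string itself contains no double-quote characters:

```java
final class JaasConfigSketch {

    // Login module class name as used in Produce.java and kafka-log.yaml.
    static final String LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule";

    /** Builds the JAAS config line; surrounding double quotes on the input are optional. */
    static String saslJaasConfig(String connectionString) {
        String value = connectionString.trim();
        if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
            // Strip the quotes carried over from application.properties.
            value = value.substring(1, value.length() - 1);
        }
        return LOGIN_MODULE + " required"
                + " username=\"$ConnectionString\""
                + " password=\"" + value + "\";";
    }

    public static void main(String[] args) {
        // Placeholder value from the template, not a real connection string.
        System.out.println(saslJaasConfig("\"<ACTUAL_CONNECTION_STRING>\""));
    }
}
```

With a helper of this shape, the spacing between the `username` and `password` options is fixed in one place and does not depend on how the property was written.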
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/models/Order.java b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/models/Order.java
new file mode 100644
index 0000000..7dd055a
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/models/Order.java
@@ -0,0 +1,475 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package com.acme.example.eventhubs.models;
+
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.SchemaStore;
+
+@SuppressWarnings("all")
+/** Fact schema of an order */
+@org.apache.avro.specific.AvroGenerated
+public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ private static final long serialVersionUID = -6339119225573548571L;
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.acme.example.eventhubs.models\",\"doc\":\"Fact schema of an order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"int\",\"doc\":\"Unique id of the order.\"},{\"name\":\"itemId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Id of the ordered item.\"},{\"name\":\"userId\",\"type\":{\"type\":\"string\",\"avro [...]
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ private static final BinaryMessageEncoder<Order> ENCODER =
+ new BinaryMessageEncoder<Order>(MODEL$, SCHEMA$);
+
+ private static final BinaryMessageDecoder<Order> DECODER =
+ new BinaryMessageDecoder<Order>(MODEL$, SCHEMA$);
+
+ /**
+ * Return the BinaryMessageDecoder instance used by this class.
+ */
+ public static BinaryMessageDecoder<Order> getDecoder() {
+ return DECODER;
+ }
+
+ /**
+ * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
+ * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+ */
+ public static BinaryMessageDecoder<Order> createDecoder(SchemaStore resolver) {
+ return new BinaryMessageDecoder<Order>(MODEL$, SCHEMA$, resolver);
+ }
+
+ /** Serializes this Order to a ByteBuffer. */
+ public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ /** Deserializes a Order from a ByteBuffer. */
+ public static Order fromByteBuffer(
+ java.nio.ByteBuffer b) throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+
+ /** Unique id of the order. */
+ @Deprecated public int orderId;
+ /** Id of the ordered item. */
+ @Deprecated public java.lang.String itemId;
+ /** Id of the user who ordered the item. */
+ @Deprecated public java.lang.String userId;
+ /** How much was ordered. */
+ @Deprecated public double quantity;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use <code>newBuilder()</code>.
+ */
+ public Order() {}
+
+ /**
+ * All-args constructor.
+ * @param orderId Unique id of the order.
+ * @param itemId Id of the ordered item.
+ * @param userId Id of the user who ordered the item.
+ * @param quantity How much was ordered.
+ */
+ public Order(java.lang.Integer orderId, java.lang.String itemId, java.lang.String userId, java.lang.Double quantity) {
+ this.orderId = orderId;
+ this.itemId = itemId;
+ this.userId = userId;
+ this.quantity = quantity;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return orderId;
+ case 1: return itemId;
+ case 2: return userId;
+ case 3: return quantity;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: orderId = (java.lang.Integer)value$; break;
+ case 1: itemId = (java.lang.String)value$; break;
+ case 2: userId = (java.lang.String)value$; break;
+ case 3: quantity = (java.lang.Double)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'orderId' field.
+ * @return Unique id of the order.
+ */
+ public java.lang.Integer getOrderId() {
+ return orderId;
+ }
+
+ /**
+ * Sets the value of the 'orderId' field.
+ * Unique id of the order.
+ * @param value the value to set.
+ */
+ public void setOrderId(java.lang.Integer value) {
+ this.orderId = value;
+ }
+
+ /**
+ * Gets the value of the 'itemId' field.
+ * @return Id of the ordered item.
+ */
+ public java.lang.String getItemId() {
+ return itemId;
+ }
+
+ /**
+ * Sets the value of the 'itemId' field.
+ * Id of the ordered item.
+ * @param value the value to set.
+ */
+ public void setItemId(java.lang.String value) {
+ this.itemId = value;
+ }
+
+ /**
+ * Gets the value of the 'userId' field.
+ * @return Id of the user who ordered the item.
+ */
+ public java.lang.String getUserId() {
+ return userId;
+ }
+
+ /**
+ * Sets the value of the 'userId' field.
+ * Id of the user who ordered the item.
+ * @param value the value to set.
+ */
+ public void setUserId(java.lang.String value) {
+ this.userId = value;
+ }
+
+ /**
+ * Gets the value of the 'quantity' field.
+ * @return How much was ordered.
+ */
+ public java.lang.Double getQuantity() {
+ return quantity;
+ }
+
+ /**
+ * Sets the value of the 'quantity' field.
+ * How much was ordered.
+ * @param value the value to set.
+ */
+ public void setQuantity(java.lang.Double value) {
+ this.quantity = value;
+ }
+
+ /**
+ * Creates a new Order RecordBuilder.
+ * @return A new Order RecordBuilder
+ */
+ public static com.acme.example.eventhubs.models.Order.Builder newBuilder() {
+ return new com.acme.example.eventhubs.models.Order.Builder();
+ }
+
+ /**
+ * Creates a new Order RecordBuilder by copying an existing Builder.
+ * @param other The existing builder to copy.
+ * @return A new Order RecordBuilder
+ */
+ public static com.acme.example.eventhubs.models.Order.Builder newBuilder(com.acme.example.eventhubs.models.Order.Builder other) {
+ return new com.acme.example.eventhubs.models.Order.Builder(other);
+ }
+
+ /**
+ * Creates a new Order RecordBuilder by copying an existing Order instance.
+ * @param other The existing instance to copy.
+ * @return A new Order RecordBuilder
+ */
+ public static com.acme.example.eventhubs.models.Order.Builder newBuilder(com.acme.example.eventhubs.models.Order other) {
+ return new com.acme.example.eventhubs.models.Order.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for Order instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Order>
+ implements org.apache.avro.data.RecordBuilder<Order> {
+
+ /** Unique id of the order. */
+ private int orderId;
+ /** Id of the ordered item. */
+ private java.lang.String itemId;
+ /** Id of the user who ordered the item. */
+ private java.lang.String userId;
+ /** How much was ordered. */
+ private double quantity;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(SCHEMA$);
+ }
+
+ /**
+ * Creates a Builder by copying an existing Builder.
+ * @param other The existing Builder to copy.
+ */
+ private Builder(com.acme.example.eventhubs.models.Order.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.orderId)) {
+ this.orderId = data().deepCopy(fields()[0].schema(), other.orderId);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.itemId)) {
+ this.itemId = data().deepCopy(fields()[1].schema(), other.itemId);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.userId)) {
+ this.userId = data().deepCopy(fields()[2].schema(), other.userId);
+ fieldSetFlags()[2] = true;
+ }
+ if (isValidValue(fields()[3], other.quantity)) {
+ this.quantity = data().deepCopy(fields()[3].schema(), other.quantity);
+ fieldSetFlags()[3] = true;
+ }
+ }
+
+ /**
+ * Creates a Builder by copying an existing Order instance
+ * @param other The existing instance to copy.
+ */
+ private Builder(com.acme.example.eventhubs.models.Order other) {
+ super(SCHEMA$);
+ if (isValidValue(fields()[0], other.orderId)) {
+ this.orderId = data().deepCopy(fields()[0].schema(), other.orderId);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.itemId)) {
+ this.itemId = data().deepCopy(fields()[1].schema(), other.itemId);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.userId)) {
+ this.userId = data().deepCopy(fields()[2].schema(), other.userId);
+ fieldSetFlags()[2] = true;
+ }
+ if (isValidValue(fields()[3], other.quantity)) {
+ this.quantity = data().deepCopy(fields()[3].schema(), other.quantity);
+ fieldSetFlags()[3] = true;
+ }
+ }
+
+ /**
+ * Gets the value of the 'orderId' field.
+ * Unique id of the order.
+ * @return The value.
+ */
+ public java.lang.Integer getOrderId() {
+ return orderId;
+ }
+
+ /**
+ * Sets the value of the 'orderId' field.
+ * Unique id of the order.
+ * @param value The value of 'orderId'.
+ * @return This builder.
+ */
+ public com.acme.example.eventhubs.models.Order.Builder setOrderId(int value) {
+ validate(fields()[0], value);
+ this.orderId = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'orderId' field has been set.
+ * Unique id of the order.
+ * @return True if the 'orderId' field has been set, false otherwise.
+ */
+ public boolean hasOrderId() {
+ return fieldSetFlags()[0];
+ }
+
+
+ /**
+ * Clears the value of the 'orderId' field.
+ * Unique id of the order.
+ * @return This builder.
+ */
+ public com.acme.example.eventhubs.models.Order.Builder clearOrderId() {
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /**
+ * Gets the value of the 'itemId' field.
+ * Id of the ordered item.
+ * @return The value.
+ */
+ public java.lang.String getItemId() {
+ return itemId;
+ }
+
+ /**
+ * Sets the value of the 'itemId' field.
+ * Id of the ordered item.
+ * @param value The value of 'itemId'.
+ * @return This builder.
+ */
+ public com.acme.example.eventhubs.models.Order.Builder setItemId(java.lang.String value) {
+ validate(fields()[1], value);
+ this.itemId = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'itemId' field has been set.
+ * Id of the ordered item.
+ * @return True if the 'itemId' field has been set, false otherwise.
+ */
+ public boolean hasItemId() {
+ return fieldSetFlags()[1];
+ }
+
+
+ /**
+ * Clears the value of the 'itemId' field.
+ * Id of the ordered item.
+ * @return This builder.
+ */
+ public com.acme.example.eventhubs.models.Order.Builder clearItemId() {
+ itemId = null;
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ /**
+ * Gets the value of the 'userId' field.
+ * Id of the user who ordered the item.
+ * @return The value.
+ */
+ public java.lang.String getUserId() {
+ return userId;
+ }
+
+ /**
+ * Sets the value of the 'userId' field.
+ * Id of the user who ordered the item.
+ * @param value The value of 'userId'.
+ * @return This builder.
+ */
+ public com.acme.example.eventhubs.models.Order.Builder setUserId(java.lang.String value) {
+ validate(fields()[2], value);
+ this.userId = value;
+ fieldSetFlags()[2] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'userId' field has been set.
+ * Id of the user who ordered the item.
+ * @return True if the 'userId' field has been set, false otherwise.
+ */
+ public boolean hasUserId() {
+ return fieldSetFlags()[2];
+ }
+
+
+ /**
+ * Clears the value of the 'userId' field.
+ * Id of the user who ordered the item.
+ * @return This builder.
+ */
+ public com.acme.example.eventhubs.models.Order.Builder clearUserId() {
+ userId = null;
+ fieldSetFlags()[2] = false;
+ return this;
+ }
+
+ /**
+ * Gets the value of the 'quantity' field.
+ * How much was ordered.
+ * @return The value.
+ */
+ public java.lang.Double getQuantity() {
+ return quantity;
+ }
+
+ /**
+ * Sets the value of the 'quantity' field.
+ * How much was ordered.
+ * @param value The value of 'quantity'.
+ * @return This builder.
+ */
+ public com.acme.example.eventhubs.models.Order.Builder setQuantity(double value) {
+ validate(fields()[3], value);
+ this.quantity = value;
+ fieldSetFlags()[3] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'quantity' field has been set.
+ * How much was ordered.
+ * @return True if the 'quantity' field has been set, false otherwise.
+ */
+ public boolean hasQuantity() {
+ return fieldSetFlags()[3];
+ }
+
+
+ /**
+ * Clears the value of the 'quantity' field.
+ * How much was ordered.
+ * @return This builder.
+ */
+ public com.acme.example.eventhubs.models.Order.Builder clearQuantity() {
+ fieldSetFlags()[3] = false;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Order build() {
+ try {
+ Order record = new Order();
+ record.orderId = fieldSetFlags()[0] ? this.orderId : (java.lang.Integer) defaultValue(fields()[0]);
+ record.itemId = fieldSetFlags()[1] ? this.itemId : (java.lang.String) defaultValue(fields()[1]);
+ record.userId = fieldSetFlags()[2] ? this.userId : (java.lang.String) defaultValue(fields()[2]);
+ record.quantity = fieldSetFlags()[3] ? this.quantity : (java.lang.Double) defaultValue(fields()[3]);
+ return record;
+ } catch (java.lang.Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final org.apache.avro.io.DatumWriter<Order>
+ WRITER$ = (org.apache.avro.io.DatumWriter<Order>)MODEL$.createDatumWriter(SCHEMA$);
+
+ @Override public void writeExternal(java.io.ObjectOutput out)
+ throws java.io.IOException {
+ WRITER$.write(this, SpecificData.getEncoder(out));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final org.apache.avro.io.DatumReader<Order>
+ READER$ = (org.apache.avro.io.DatumReader<Order>)MODEL$.createDatumReader(SCHEMA$);
+
+ @Override public void readExternal(java.io.ObjectInput in)
+ throws java.io.IOException {
+ READER$.read(this, SpecificData.getDecoder(in));
+ }
+
+}
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/kafka-log.yaml b/jbang/azure-eventhubs-kafka-azure-schema-registry/kafka-log.yaml
new file mode 100644
index 0000000..ff76d38
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/kafka-log.yaml
@@ -0,0 +1,36 @@
+# camel-k: dependency=mvn:org.apache.camel.kamelets:camel-kamelets-utils:3.20.1.1
+# camel-k: dependency=mvn:com.acme.example:azure-identity:0.1
+# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1
+# camel-k: dependency=mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.4
+# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
+
+- beans:
+ - name: defaultAzureCredential
+ type: "#class:com.acme.example.azure.DefaultAzureCredentialWrapper"
+ - name: order
+ type: "#class:com.acme.example.eventhubs.models.Order"
+
+- route:
+ id: "kafka-to-log"
+ from:
+ uri: "kafka:{{topic}}"
+ parameters:
+ autoOffsetReset: earliest
+ brokers: "{{bootstrap.servers}}"
+ saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password={{connectionstring}};'
+ saslMechanism: PLAIN
+ securityProtocol: SASL_SSL
+ valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
+ groupId: 'my-consumer-group'
+ additionalProperties.schema.registry.url: '{{schema.registry.url}}'
+ additionalProperties.schema.group: avro
+ additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
+ additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):com.acme.example.eventhubs.models.Order'
+ additionalProperties.specific.avro.reader: '#valueAs(boolean):true'
+ steps:
+ - to:
+ uri: "kamelet:log-sink"
+ parameters:
+ showStreams: true
+ showHeaders: true
+ multiline: true
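The route above refers to values from `application.properties` through `{{key}}` placeholders, for example `kafka:{{topic}}` and `password={{connectionstring}}`. The sketch below shows the substitution idea with a regular expression. It is a simplified stand-in written for illustration, not Camel's property placeholder implementation, and `PlaceholderSketch` and `resolve` are hypothetical names:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderSketch {

    // Matches {{key}} where the key contains no braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^{}]+)\\}\\}");

    /** Replaces every {{key}} in the text with its value; fails on unknown keys. */
    static String resolve(String text, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String key = matcher.group(1).trim();
            String value = properties.get(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for property: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value literal.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> properties = Map.of(
                "topic", "my-topic",
                "connectionstring", "\"<ACTUAL_CONNECTION_STRING>\"");
        System.out.println(resolve("kafka:{{topic}}", properties));
        System.out.println(resolve("username=\"$ConnectionString\" password={{connectionstring}};", properties));
    }
}
```

The second call shows why the template keeps double quotes around the connection string: after substitution they become the quotes around the JAAS `password` value.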
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf b/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
new file mode 100644
index 0000000..d0da4a2
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+# We strongly recommend using the required_providers block to set the
+# Azure Provider source and version being used
+terraform {
+ required_providers {
+ azurerm = {
+ source = "hashicorp/azurerm"
+ version = "=3.75.0"
+ }
+ }
+}
+
+# Configure the Microsoft Azure Provider
+provider "azurerm" {
+ features {}
+}
+
+# Resource group.
+resource "azurerm_resource_group" "example" {
+ name = "example-rg"
+ location = "West Europe"
+}
+
+# Eventhubs Namespace.
+resource "azurerm_eventhub_namespace" "example" {
+ name = "example-namespace"
+ location = azurerm_resource_group.example.location
+ resource_group_name = azurerm_resource_group.example.name
+ sku = "Standard"
+ capacity = 1
+}
+
+# Eventhub (topic)
+resource "azurerm_eventhub" "example" {
+ name = "my-topic"
+ namespace_name = azurerm_eventhub_namespace.example.name
+ resource_group_name = azurerm_resource_group.example.name
+ partition_count = 1
+ message_retention = 1
+}
+
+# Read-Write policy to create a connection string.
+resource "azurerm_eventhub_authorization_rule" "example" {
+ name = "rw_policy"
+ namespace_name = azurerm_eventhub_namespace.example.name
+ eventhub_name = azurerm_eventhub.example.name
+ resource_group_name = azurerm_resource_group.example.name
+ listen = true
+ send = true
+ manage = false
+}
+
+# Schema group to utilize Azure Schema Registry.
+resource "azurerm_eventhub_namespace_schema_group" "example" {
+ name = "avro"
+ namespace_id = azurerm_eventhub_namespace.example.id
+ schema_compatibility = "Forward"
+ schema_type = "Avro"
+}
\ No newline at end of file