Posted to commits@camel.apache.org by ac...@apache.org on 2023/10/02 09:10:36 UTC

[camel-kamelets-examples] branch main updated: Added example of route from kafka to log (#30)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new 84d9715  Added example of route from kafka to log (#30)
84d9715 is described below

commit 84d9715ec1fa8df1ba2796c475d4bacf7af14cd8
Author: Vedran Kolka <ve...@syntio.net>
AuthorDate: Mon Oct 2 11:10:30 2023 +0200

    Added example of route from kafka to log (#30)
    
    * added working example of route reading from kafka and logging
    
    * deleted commented code
    
    * renamed example maven project to com.acme.example
    
    * added needed infrastructure
    
    * fixed and cleaned tf script
    
    * renamed properties in properties file
    
    * renamed properties
    
    * wrote instructions
    
    * added instructions for running with Spring Boot
---
 .../README.md                                      |  44 ++
 .../application.properties.template                |   4 +
 .../azure-identity/pom.xml                         |  97 +++++
 .../azure-identity/src/main/avro/order.avsc        |  28 ++
 .../azure/DefaultAzureCredentialWrapper.java       |  21 +
 .../java/com/acme/example/eventhubs/Produce.java   |  65 +++
 .../com/acme/example/eventhubs/models/Order.java   | 475 +++++++++++++++++++++
 .../kafka-log.yaml                                 |  36 ++
 .../main.tf                                        |  75 ++++
 9 files changed, 845 insertions(+)

diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/README.md b/jbang/azure-eventhubs-kafka-azure-schema-registry/README.md
new file mode 100644
index 0000000..4b1f192
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/README.md
@@ -0,0 +1,44 @@
+# Example for consuming from Event Hubs in Avro format using Azure Schema Registry
+
+This example shows a YAML DSL route for consuming Avro messages from Event Hubs using Azure Schema Registry.
+The example also includes a producer for convenience, as well as a wrapper around [DefaultAzureCredential](https://learn.microsoft.com/en-us/java/api/com.azure.identity.defaultazurecredential?view=azure-java-stable)
+to work around the instantiation problem, since that class can only be created through its builder.
+
+## Build the infrastructure
+
+Choose a globally unique name for the Event Hubs namespace and edit it in the Terraform [script](main.tf).
+Then create the resources by applying the script.
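+
+A minimal sequence, assuming Terraform is installed and you are logged in with the Azure CLI (`az login`):
+```bash
+terraform init
+terraform apply
+```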
+
+## Configure the applications
+
+Use [application.properties.template](application.properties.template) to create `application.properties` and define your Event Hubs namespace there.
+After the resources have been created, the connection string for the event hub can be found in the Azure Portal,
+or by running:
+```bash
+az eventhubs eventhub authorization-rule keys list --resource-group "example-rg" --namespace-name "example-namespace" --eventhub-name "my-topic" --name "rw_policy"
+```
+Set the returned `primaryConnectionString` as the `connectionstring` value in `application.properties`, keeping the surrounding quotes from the template.
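+
+For illustration, a filled-in `application.properties` could look like this (namespace, key, and topic are placeholders):
+```properties
+bootstrap.servers=example-namespace.servicebus.windows.net:9093
+schema.registry.url=https://example-namespace.servicebus.windows.net
+topic=my-topic
+connectionstring="Endpoint=sb://example-namespace.servicebus.windows.net/;SharedAccessKeyName=rw_policy;SharedAccessKey=<KEY>;EntityPath=my-topic"
+```
+The quotes matter because the value is substituted directly into the SASL JAAS configuration, where an unquoted value containing `;` and `=` would not parse.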
+
+## Produce to Event Hubs
+
+Run [`Produce.java`](./azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java) to produce a message to the event hub.
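+
+One way to run it, assuming the `exec-maven-plugin` can be resolved by prefix (it is not declared in the POM):
+```bash
+cd azure-identity
+mvn compile exec:java -Dexec.mainClass="com.acme.example.eventhubs.Produce"
+```
+By default the producer reads `../application.properties`; pass a different path as the first argument if needed.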
+
+## Consume from Event Hubs
+
+To consume messages using a Camel route, first install the `azure-identity` Maven project:
+```bash
+cd azure-identity
+mvn clean install
+```
+then run:
+```bash
+camel run kafka-log.yaml camel-kafka-3.22.0.jar --properties application.properties
+```
+> At the time of writing, there was a [problem](https://github.com/apache/camel-kamelets-examples/issues/21#issuecomment-1732603257) running this example with camel-kafka-4.0.0, so the example was developed using version 3.22.0, which can be found [here](https://repository.apache.org/content/groups/snapshots/org/apache/camel/camel-kafka/3.22.0-SNAPSHOT/).
+
+To run the example with Spring Boot instead of JBang, you can export the route:
+```bash
+camel export application.properties kafka-log.yaml --runtime=spring-boot --directory=code --gav com.acme.example:azure-sr:1.0.0
+```
+Then run it as a regular Spring Boot application.
+> Using this approach, camel-kafka-4.0.0 works, and no dependencies need to be modified.
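+
+For example, assuming the export above succeeded:
+```bash
+cd code
+mvn spring-boot:run
+```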
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/application.properties.template b/jbang/azure-eventhubs-kafka-azure-schema-registry/application.properties.template
new file mode 100644
index 0000000..31a6623
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/application.properties.template
@@ -0,0 +1,4 @@
+bootstrap.servers=<EVENTHUBS_NAMESPACE>.servicebus.windows.net:9093
+schema.registry.url=https://<EVENTHUBS_NAMESPACE>.servicebus.windows.net
+topic=my-topic
+connectionstring="<ACTUAL_CONNECTION_STRING>"
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/pom.xml b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/pom.xml
new file mode 100644
index 0000000..dee82f6
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/pom.xml
@@ -0,0 +1,97 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.acme.example</groupId>
+  <artifactId>azure-identity</artifactId>
+  <packaging>jar</packaging>
+  <version>0.1</version>
+  <name>azure-identity</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+
+    <dependency>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-data-schemaregistry-apacheavro</artifactId>
+      <version>1.1.4</version>
+    </dependency>
+    <!-- https://mvnrepository.com/artifact/com.microsoft.azure/azure-schemaregistry-kafka-avro -->
+    <dependency>
+      <groupId>com.microsoft.azure</groupId>
+      <artifactId>azure-schemaregistry-kafka-avro</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+    <!-- https://mvnrepository.com/artifact/com.azure/azure-identity -->
+    <dependency>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-identity</artifactId>
+      <version>1.9.0</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>3.4.0</version>
+    </dependency>
+    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.11.2</version>
+    </dependency>
+
+    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>2.0.7</version>
+    </dependency>
+    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>2.0.7</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.11.0</version>
+        <configuration>
+          <source>17</source>
+          <target>17</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.8.2</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>src/main/avro</sourceDirectory>
+              <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
+              <stringType>String</stringType>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/avro/order.avsc b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/avro/order.avsc
new file mode 100644
index 0000000..69d0008
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/avro/order.avsc
@@ -0,0 +1,28 @@
+{
+  "doc": "Fact schema of an order",
+  "fields": [
+    {
+      "doc": "Unique id of the order.",
+      "name": "orderId",
+      "type": "int"
+    },
+    {
+      "doc": "Id of the ordered item.",
+      "name": "itemId",
+      "type": "string"
+    },
+    {
+      "doc": "Id of the user who ordered the item.",
+      "name": "userId",
+      "type": "string"
+    },
+    {
+      "doc": "How much was ordered.",
+      "name": "quantity",
+      "type": "double"
+    }
+  ],
+  "name": "Order",
+  "namespace": "com.acme.example.eventhubs.models",
+  "type": "record"
+}
\ No newline at end of file
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/azure/DefaultAzureCredentialWrapper.java b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/azure/DefaultAzureCredentialWrapper.java
new file mode 100644
index 0000000..0c8c91c
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/azure/DefaultAzureCredentialWrapper.java
@@ -0,0 +1,21 @@
+package com.acme.example.azure;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import reactor.core.publisher.Mono;
+
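+/**
+ * Thin wrapper around DefaultAzureCredential so the credential can be instantiated by class name
+ * (e.g. via the "#class:" bean syntax in kafka-log.yaml): DefaultAzureCredential itself has no
+ * public constructor and must be created through DefaultAzureCredentialBuilder.
+ */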
+public class DefaultAzureCredentialWrapper implements TokenCredential {
+
+    private final TokenCredential credential;
+
+    public DefaultAzureCredentialWrapper() {
+        this.credential = new DefaultAzureCredentialBuilder().build();
+    }
+
+    @Override
+    public Mono<AccessToken> getToken(TokenRequestContext tokenRequestContext) {
+        return this.credential.getToken(tokenRequestContext);
+    }
+}
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java
new file mode 100644
index 0000000..e50d034
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java
@@ -0,0 +1,65 @@
+package com.acme.example.eventhubs;
+
+import com.acme.example.azure.DefaultAzureCredentialWrapper;
+import com.acme.example.eventhubs.models.Order;
+import com.azure.core.credential.TokenCredential;
+import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer;
+import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializerConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class Produce {
+
+    public static final String DEFAULT_PROPERTIES_PATH = "../application.properties";
+
+    public static void main(String[] args) throws IOException {
+        String propertiesPath = DEFAULT_PROPERTIES_PATH;
+        if (args.length >= 1) {
+            propertiesPath = args[0];
+        }
+
+        Properties properties = new Properties();
+        properties.load(Files.newInputStream(Paths.get(propertiesPath)));
+
+        TokenCredential credential = new DefaultAzureCredentialWrapper();
+        String password = properties.getProperty("connectionstring");
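+        // Event Hubs' Kafka endpoint authenticates over SASL PLAIN with the literal username
+        // "$ConnectionString" and the Event Hubs connection string as the password.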
+        String saslJaas = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+                "username=\"$ConnectionString\" " +
+                "password=" + password + ";";
+
+        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaas);
+
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
+
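+        // Configure the Azure Schema Registry Avro serializer: register the schema in the
+        // "avro" schema group and authenticate with the Azure credential.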
+        properties.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, "avro");
+        properties.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG, true);
+        properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
+
+        try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) {
+            Order order = new Order(1, "item", "user", 3.0);
+            String topic = properties.getProperty("topic");
+            ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order);
+            RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS);
+            System.out.println("Sent record with offset " + result.offset());
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+}
\ No newline at end of file
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/models/Order.java b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/models/Order.java
new file mode 100644
index 0000000..7dd055a
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/models/Order.java
@@ -0,0 +1,475 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package com.acme.example.eventhubs.models;
+
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.SchemaStore;
+
+@SuppressWarnings("all")
+/** Fact schema of an order */
+@org.apache.avro.specific.AvroGenerated
+public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  private static final long serialVersionUID = -6339119225573548571L;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.acme.example.eventhubs.models\",\"doc\":\"Fact schema of an order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"int\",\"doc\":\"Unique id of the order.\"},{\"name\":\"itemId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Id of the ordered item.\"},{\"name\":\"userId\",\"type\":{\"type\":\"string\",\"avro [...]
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+  private static SpecificData MODEL$ = new SpecificData();
+
+  private static final BinaryMessageEncoder<Order> ENCODER =
+      new BinaryMessageEncoder<Order>(MODEL$, SCHEMA$);
+
+  private static final BinaryMessageDecoder<Order> DECODER =
+      new BinaryMessageDecoder<Order>(MODEL$, SCHEMA$);
+
+  /**
+   * Return the BinaryMessageDecoder instance used by this class.
+   */
+  public static BinaryMessageDecoder<Order> getDecoder() {
+    return DECODER;
+  }
+
+  /**
+   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
+   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+   */
+  public static BinaryMessageDecoder<Order> createDecoder(SchemaStore resolver) {
+    return new BinaryMessageDecoder<Order>(MODEL$, SCHEMA$, resolver);
+  }
+
+  /** Serializes this Order to a ByteBuffer. */
+  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+    return ENCODER.encode(this);
+  }
+
+  /** Deserializes a Order from a ByteBuffer. */
+  public static Order fromByteBuffer(
+      java.nio.ByteBuffer b) throws java.io.IOException {
+    return DECODER.decode(b);
+  }
+
+  /** Unique id of the order. */
+  @Deprecated public int orderId;
+  /** Id of the ordered item. */
+  @Deprecated public java.lang.String itemId;
+  /** Id of the user who ordered the item. */
+  @Deprecated public java.lang.String userId;
+  /** How much was ordered. */
+  @Deprecated public double quantity;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use <code>newBuilder()</code>.
+   */
+  public Order() {}
+
+  /**
+   * All-args constructor.
+   * @param orderId Unique id of the order.
+   * @param itemId Id of the ordered item.
+   * @param userId Id of the user who ordered the item.
+   * @param quantity How much was ordered.
+   */
+  public Order(java.lang.Integer orderId, java.lang.String itemId, java.lang.String userId, java.lang.Double quantity) {
+    this.orderId = orderId;
+    this.itemId = itemId;
+    this.userId = userId;
+    this.quantity = quantity;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return orderId;
+    case 1: return itemId;
+    case 2: return userId;
+    case 3: return quantity;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: orderId = (java.lang.Integer)value$; break;
+    case 1: itemId = (java.lang.String)value$; break;
+    case 2: userId = (java.lang.String)value$; break;
+    case 3: quantity = (java.lang.Double)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'orderId' field.
+   * @return Unique id of the order.
+   */
+  public java.lang.Integer getOrderId() {
+    return orderId;
+  }
+
+  /**
+   * Sets the value of the 'orderId' field.
+   * Unique id of the order.
+   * @param value the value to set.
+   */
+  public void setOrderId(java.lang.Integer value) {
+    this.orderId = value;
+  }
+
+  /**
+   * Gets the value of the 'itemId' field.
+   * @return Id of the ordered item.
+   */
+  public java.lang.String getItemId() {
+    return itemId;
+  }
+
+  /**
+   * Sets the value of the 'itemId' field.
+   * Id of the ordered item.
+   * @param value the value to set.
+   */
+  public void setItemId(java.lang.String value) {
+    this.itemId = value;
+  }
+
+  /**
+   * Gets the value of the 'userId' field.
+   * @return Id of the user who ordered the item.
+   */
+  public java.lang.String getUserId() {
+    return userId;
+  }
+
+  /**
+   * Sets the value of the 'userId' field.
+   * Id of the user who ordered the item.
+   * @param value the value to set.
+   */
+  public void setUserId(java.lang.String value) {
+    this.userId = value;
+  }
+
+  /**
+   * Gets the value of the 'quantity' field.
+   * @return How much was ordered.
+   */
+  public java.lang.Double getQuantity() {
+    return quantity;
+  }
+
+  /**
+   * Sets the value of the 'quantity' field.
+   * How much was ordered.
+   * @param value the value to set.
+   */
+  public void setQuantity(java.lang.Double value) {
+    this.quantity = value;
+  }
+
+  /**
+   * Creates a new Order RecordBuilder.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.eventhubs.models.Order.Builder newBuilder() {
+    return new com.acme.example.eventhubs.models.Order.Builder();
+  }
+
+  /**
+   * Creates a new Order RecordBuilder by copying an existing Builder.
+   * @param other The existing builder to copy.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.eventhubs.models.Order.Builder newBuilder(com.acme.example.eventhubs.models.Order.Builder other) {
+    return new com.acme.example.eventhubs.models.Order.Builder(other);
+  }
+
+  /**
+   * Creates a new Order RecordBuilder by copying an existing Order instance.
+   * @param other The existing instance to copy.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.eventhubs.models.Order.Builder newBuilder(com.acme.example.eventhubs.models.Order other) {
+    return new com.acme.example.eventhubs.models.Order.Builder(other);
+  }
+
+  /**
+   * RecordBuilder for Order instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Order>
+    implements org.apache.avro.data.RecordBuilder<Order> {
+
+    /** Unique id of the order. */
+    private int orderId;
+    /** Id of the ordered item. */
+    private java.lang.String itemId;
+    /** Id of the user who ordered the item. */
+    private java.lang.String userId;
+    /** How much was ordered. */
+    private double quantity;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(SCHEMA$);
+    }
+
+    /**
+     * Creates a Builder by copying an existing Builder.
+     * @param other The existing Builder to copy.
+     */
+    private Builder(com.acme.example.eventhubs.models.Order.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.orderId)) {
+        this.orderId = data().deepCopy(fields()[0].schema(), other.orderId);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.itemId)) {
+        this.itemId = data().deepCopy(fields()[1].schema(), other.itemId);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.userId)) {
+        this.userId = data().deepCopy(fields()[2].schema(), other.userId);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.quantity)) {
+        this.quantity = data().deepCopy(fields()[3].schema(), other.quantity);
+        fieldSetFlags()[3] = true;
+      }
+    }
+
+    /**
+     * Creates a Builder by copying an existing Order instance
+     * @param other The existing instance to copy.
+     */
+    private Builder(com.acme.example.eventhubs.models.Order other) {
+            super(SCHEMA$);
+      if (isValidValue(fields()[0], other.orderId)) {
+        this.orderId = data().deepCopy(fields()[0].schema(), other.orderId);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.itemId)) {
+        this.itemId = data().deepCopy(fields()[1].schema(), other.itemId);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.userId)) {
+        this.userId = data().deepCopy(fields()[2].schema(), other.userId);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.quantity)) {
+        this.quantity = data().deepCopy(fields()[3].schema(), other.quantity);
+        fieldSetFlags()[3] = true;
+      }
+    }
+
+    /**
+      * Gets the value of the 'orderId' field.
+      * Unique id of the order.
+      * @return The value.
+      */
+    public java.lang.Integer getOrderId() {
+      return orderId;
+    }
+
+    /**
+      * Sets the value of the 'orderId' field.
+      * Unique id of the order.
+      * @param value The value of 'orderId'.
+      * @return This builder.
+      */
+    public com.acme.example.eventhubs.models.Order.Builder setOrderId(int value) {
+      validate(fields()[0], value);
+      this.orderId = value;
+      fieldSetFlags()[0] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'orderId' field has been set.
+      * Unique id of the order.
+      * @return True if the 'orderId' field has been set, false otherwise.
+      */
+    public boolean hasOrderId() {
+      return fieldSetFlags()[0];
+    }
+
+
+    /**
+      * Clears the value of the 'orderId' field.
+      * Unique id of the order.
+      * @return This builder.
+      */
+    public com.acme.example.eventhubs.models.Order.Builder clearOrderId() {
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @return The value.
+      */
+    public java.lang.String getItemId() {
+      return itemId;
+    }
+
+    /**
+      * Sets the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @param value The value of 'itemId'.
+      * @return This builder.
+      */
+    public com.acme.example.eventhubs.models.Order.Builder setItemId(java.lang.String value) {
+      validate(fields()[1], value);
+      this.itemId = value;
+      fieldSetFlags()[1] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'itemId' field has been set.
+      * Id of the ordered item.
+      * @return True if the 'itemId' field has been set, false otherwise.
+      */
+    public boolean hasItemId() {
+      return fieldSetFlags()[1];
+    }
+
+
+    /**
+      * Clears the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @return This builder.
+      */
+    public com.acme.example.eventhubs.models.Order.Builder clearItemId() {
+      itemId = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @return The value.
+      */
+    public java.lang.String getUserId() {
+      return userId;
+    }
+
+    /**
+      * Sets the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @param value The value of 'userId'.
+      * @return This builder.
+      */
+    public com.acme.example.eventhubs.models.Order.Builder setUserId(java.lang.String value) {
+      validate(fields()[2], value);
+      this.userId = value;
+      fieldSetFlags()[2] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'userId' field has been set.
+      * Id of the user who ordered the item.
+      * @return True if the 'userId' field has been set, false otherwise.
+      */
+    public boolean hasUserId() {
+      return fieldSetFlags()[2];
+    }
+
+
+    /**
+      * Clears the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @return This builder.
+      */
+    public com.acme.example.eventhubs.models.Order.Builder clearUserId() {
+      userId = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'quantity' field.
+      * How much was ordered.
+      * @return The value.
+      */
+    public java.lang.Double getQuantity() {
+      return quantity;
+    }
+
+    /**
+      * Sets the value of the 'quantity' field.
+      * How much was ordered.
+      * @param value The value of 'quantity'.
+      * @return This builder.
+      */
+    public com.acme.example.eventhubs.models.Order.Builder setQuantity(double value) {
+      validate(fields()[3], value);
+      this.quantity = value;
+      fieldSetFlags()[3] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'quantity' field has been set.
+      * How much was ordered.
+      * @return True if the 'quantity' field has been set, false otherwise.
+      */
+    public boolean hasQuantity() {
+      return fieldSetFlags()[3];
+    }
+
+
+    /**
+      * Clears the value of the 'quantity' field.
+      * How much was ordered.
+      * @return This builder.
+      */
+    public com.acme.example.eventhubs.models.Order.Builder clearQuantity() {
+      fieldSetFlags()[3] = false;
+      return this;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Order build() {
+      try {
+        Order record = new Order();
+        record.orderId = fieldSetFlags()[0] ? this.orderId : (java.lang.Integer) defaultValue(fields()[0]);
+        record.itemId = fieldSetFlags()[1] ? this.itemId : (java.lang.String) defaultValue(fields()[1]);
+        record.userId = fieldSetFlags()[2] ? this.userId : (java.lang.String) defaultValue(fields()[2]);
+        record.quantity = fieldSetFlags()[3] ? this.quantity : (java.lang.Double) defaultValue(fields()[3]);
+        return record;
+      } catch (java.lang.Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumWriter<Order>
+    WRITER$ = (org.apache.avro.io.DatumWriter<Order>)MODEL$.createDatumWriter(SCHEMA$);
+
+  @Override public void writeExternal(java.io.ObjectOutput out)
+    throws java.io.IOException {
+    WRITER$.write(this, SpecificData.getEncoder(out));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumReader<Order>
+    READER$ = (org.apache.avro.io.DatumReader<Order>)MODEL$.createDatumReader(SCHEMA$);
+
+  @Override public void readExternal(java.io.ObjectInput in)
+    throws java.io.IOException {
+    READER$.read(this, SpecificData.getDecoder(in));
+  }
+
+}
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/kafka-log.yaml b/jbang/azure-eventhubs-kafka-azure-schema-registry/kafka-log.yaml
new file mode 100644
index 0000000..ff76d38
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/kafka-log.yaml
@@ -0,0 +1,36 @@
+# camel-k: dependency=mvn:org.apache.camel.kamelets:camel-kamelets-utils:3.20.1.1
+# camel-k: dependency=mvn:com.acme.example:azure-identity:0.1
+# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1
+# camel-k: dependency=mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.4
+# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
+
+- beans:
+  - name: defaultAzureCredential
+    type: "#class:com.acme.example.azure.DefaultAzureCredentialWrapper"
+  - name: order
+    type: "#class:com.acme.example.eventhubs.models.Order"
+
+- route:
+    id: "kafka-to-log"
+    from:
+      uri: "kafka:{{topic}}"
+      parameters:
+        autoOffsetReset: earliest
+        brokers: "{{bootstrap.servers}}"
+        saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password={{connectionstring}};'
+        saslMechanism: PLAIN
+        securityProtocol: SASL_SSL
+        valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
+        groupId: 'my-consumer-group'
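+        # Additional consumer properties picked up by the Azure Schema Registry Avro deserializer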
+        additionalProperties.schema.registry.url: '{{schema.registry.url}}'
+        additionalProperties.schema.group: avro
+        additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
+        additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):com.acme.example.eventhubs.models.Order'
+        additionalProperties.specific.avro.reader: '#valueAs(boolean):true'
+      steps:
+        - to:
+            uri: "kamelet:log-sink"
+            parameters:
+              showStreams: true
+              showHeaders: true
+              multiline: true
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf b/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
new file mode 100644
index 0000000..d0da4a2
--- /dev/null
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+# We strongly recommend using the required_providers block to set the
+# Azure Provider source and version being used
+terraform {
+  required_providers {
+    azurerm = {
+      source  = "hashicorp/azurerm"
+      version = "=3.75.0"
+    }
+  }
+}
+
+# Configure the Microsoft Azure Provider
+provider "azurerm" {
+  features {}
+}
+
+# Resource group.
+resource "azurerm_resource_group" "example" {
+  name     = "example-rg"
+  location = "West Europe"
+}
+
+# Event Hubs namespace.
+resource "azurerm_eventhub_namespace" "example" {
+  name                = "example-namespace"
+  location            = azurerm_resource_group.example.location
+  resource_group_name = azurerm_resource_group.example.name
+  sku                 = "Standard"
+  capacity            = 1
+}
+
+# Eventhub (topic)
+resource "azurerm_eventhub" "example" {
+  name                = "my-topic"
+  namespace_name      = azurerm_eventhub_namespace.example.name
+  resource_group_name = azurerm_resource_group.example.name
+  partition_count     = 1
+  message_retention   = 1
+}
+
+# Read-Write policy to create a connection string.
+resource "azurerm_eventhub_authorization_rule" "example" {
+  name                = "rw_policy"
+  namespace_name      = azurerm_eventhub_namespace.example.name
+  eventhub_name       = azurerm_eventhub.example.name
+  resource_group_name = azurerm_resource_group.example.name
+  listen              = true
+  send                = true
+  manage              = false
+}
+
+# Schema group to utilize Azure Schema Registry.
+resource "azurerm_eventhub_namespace_schema_group" "example" {
+  name                 = "avro"
+  namespace_id         = azurerm_eventhub_namespace.example.id
+  schema_compatibility = "Forward"
+  schema_type          = "Avro"
+}
\ No newline at end of file