Posted to commits@camel.apache.org by ac...@apache.org on 2023/10/10 11:50:02 UTC

[camel-kamelets-examples] branch main updated (32beaca -> 44ea3b9)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git


    from 32beaca  Add camel-k bind example
     new 5b0c70a  Added an example of Apicurio Registry Usage with Kafka through YAML
     new 44ea3b9  Added an example of Apicurio Registry Usage with Kafka through YAML - Externalize Schema Registry URL

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 jbang/kafka-apicurio-schema-registry/README.md     |  81 ++++++
 .../application.properties                         |   3 +
 .../kafka-log.yaml}                                |  22 +-
 .../kafka-producer}/pom.xml                        |  31 +--
 .../kafka-producer}/src/main/avro/order.avsc       |   9 +-
 .../main/java/com/acme/example/kafka}/Produce.java |  35 +--
 .../java/com/acme/example/kafka}/models/Order.java | 292 +++++++++++++++++----
 7 files changed, 372 insertions(+), 101 deletions(-)
 create mode 100644 jbang/kafka-apicurio-schema-registry/README.md
 create mode 100644 jbang/kafka-apicurio-schema-registry/application.properties
 copy jbang/{slack-source/slack-source.yaml => kafka-apicurio-schema-registry/kafka-log.yaml} (60%)
 copy jbang/{azure-eventhubs-kafka-azure-schema-registry/azure-identity => kafka-apicurio-schema-registry/kafka-producer}/pom.xml (66%)
 copy jbang/{azure-eventhubs-kafka-azure-schema-registry/azure-identity => kafka-apicurio-schema-registry/kafka-producer}/src/main/avro/order.avsc (76%)
 copy jbang/{azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs => kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka}/Produce.java (64%)
 copy jbang/{azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs => kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka}/models/Order.java (58%)


[camel-kamelets-examples] 01/02: Added an example of Apicurio Registry Usage with Kafka through YAML

Posted by ac...@apache.org.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit 5b0c70a46c280a17d8b775a0d35e5e3a00b6a8f2
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Oct 10 13:46:33 2023 +0200

    Added an example of Apicurio Registry Usage with Kafka through YAML
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 jbang/kafka-apicurio-schema-registry/README.md     |  81 +++
 .../application.properties                         |   2 +
 .../kafka-apicurio-schema-registry/kafka-log.yaml  |  42 ++
 .../kafka-producer/pom.xml                         |  78 +++
 .../kafka-producer/src/main/avro/order.avsc        |  33 +
 .../main/java/com/acme/example/kafka/Produce.java  |  70 +++
 .../java/com/acme/example/kafka/models/Order.java  | 673 +++++++++++++++++++++
 7 files changed, 979 insertions(+)

diff --git a/jbang/kafka-apicurio-schema-registry/README.md b/jbang/kafka-apicurio-schema-registry/README.md
new file mode 100644
index 0000000..5e60ee0
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/README.md
@@ -0,0 +1,81 @@
+# Example: consuming from Kafka with Apicurio Schema Registry and Avro
+
+You'll need a running Kafka instance and a running Apicurio Registry instance.
+
+## Kafka instance
+
+You can run Kafka from a plain Kafka archive or set it up with an Ansible role.
+
+## Apicurio Registry
+
+```bash
+docker run -it -p 8080:8080 apicurio/apicurio-registry-mem:2.4.12.Final
+```
+
+This will run an Apicurio Registry instance with in-memory persistence.
+
+## Configure the applications
+
+In `application.properties` set the Kafka instance address and the Apicurio schema registry URL.
+
+## Produce to Kafka
+
+Run [`Produce.java`](./kafka-producer/src/main/java/com/acme/example/kafka/Produce.java) from the `kafka-producer` directory to produce a message to Kafka.
+
+```bash
+mvn compile exec:java -Dexec.mainClass="com.acme.example.kafka.Produce"
+```
+
+## Consume from Kafka
+
+To consume messages using a Camel route, first install the `kafka-producer` Maven project, which provides the `Order` class:
+```bash
+cd kafka-producer
+mvn clean install
+```
+then run:
+```bash
+camel run kafka-log.yaml
+```
+
+You should see output similar to the following:
+
+```bash
+2023-10-10 13:44:09.810  INFO 71770 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (started:2)
+2023-10-10 13:44:09.810  INFO 71770 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-to-log (kafka://my-topic)
+2023-10-10 13:44:09.810  INFO 71770 --- [           main] el.impl.engine.AbstractCamelContext :     Started log-sink-1 (kamelet://source)
+2023-10-10 13:44:09.810  INFO 71770 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.0.1 (kafka-log) started in 187ms (build:0ms init:0ms start:187ms)
+2023-10-10 13:44:10.018  WARN 71770 --- [sumer[my-topic]] fka.clients.consumer.ConsumerConfig : These configurations '[apicurio.registry.avroDatumProvider, apicurio.registry.url]' were supplied but are not used yet.
+2023-10-10 13:44:10.019  INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.4.0
+2023-10-10 13:44:10.019  INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 2e1947d240607d53
+2023-10-10 13:44:10.019  INFO 71770 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1696938250018
+2023-10-10 13:44:10.023  INFO 71770 --- [sumer[my-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2023-10-10 13:44:10.023  INFO 71770 --- [sumer[my-topic]] l.component.kafka.KafkaFetchRecords : Subscribing my-topic-Thread 0 to topic my-topic
+2023-10-10 13:44:10.024  INFO 71770 --- [sumer[my-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Subscribed to topic(s): my-topic
+2023-10-10 13:44:10.254  INFO 71770 --- [sumer[my-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Resetting the last seen epoch of partition my-topic-0 to 0 since the associated topicId changed from null to PP5gKKwZTTOwYYvKftvhgA
+2023-10-10 13:44:10.256  INFO 71770 --- [sumer[my-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Cluster ID: LGe3ByI8SLSis9Sm9zcCVg
+2023-10-10 13:44:10.257  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2023-10-10 13:44:10.263  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] (Re-)joining group
+2023-10-10 13:44:10.276  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Request joining group due to: need to re-join with the given member-id: consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033
+2023-10-10 13:44:10.278  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2023-10-10 13:44:10.278  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] (Re-)joining group
+2023-10-10 13:44:10.283  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Successfully joined group with generation Generation{generationId=19, memberId='consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033', protocol='range'}
+2023-10-10 13:44:10.285  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Finished assignment for group at generation 19: {consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033=Assignment(partitions=[my-topic-0])}
+2023-10-10 13:44:10.292  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Successfully synced group in generation Generation{generationId=19, memberId='consumer-my-consumer-group-1-88145d04-879c-4cd9-9f5a-53a2c6778033', protocol='range'}
+2023-10-10 13:44:10.294  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Notifying assignor about the new Assignment(partitions=[my-topic-0])
+2023-10-10 13:44:10.298  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Adding newly assigned partitions: my-topic-0
+2023-10-10 13:44:10.314  INFO 71770 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-consumer-group-1, groupId=my-consumer-group] Setting offset for partition my-topic-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}
+```
+
+After a message has been produced to Kafka, you should see:
+
+```bash
+2023-10-10 13:44:10.519  INFO 71770 --- [sumer[my-topic]] log-sink                            : Exchange[
+  ExchangePattern: InOnly
+  Headers: {apicurio.value.encoding=[B@474baada, apicurio.value.globalId=[B@28e32105, CamelMessageTimestamp=1696938203819, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = apicurio.value.globalId, value = [0, 0, 0, 0, 0, 0, 0, 3]), RecordHeader(key = apicurio.value.encoding, value = [66, 73, 78, 65, 82, 89])], isReadOnly = false), kafka.KEY=key, kafka.OFFSET=11, kafka.PARTITION=0, kafka.TIMESTAMP=1696938203819, kafka.TOPIC=my-topic}
+  BodyType: org.apache.avro.generic.GenericData.Record
+  Body: {"orderId": 1, "itemId": "item", "userId": "user", "quantity": 3.0, "description": "A really nice item I do love"}
+]
+```
+
+
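The `kafka.HEADERS` entry in the sample README output above prints the two Apicurio record headers as raw byte arrays. As a minimal sketch, assuming the global ID is an 8-byte big-endian long and the encoding is ASCII text (which is what the byte values shown suggest), they can be decoded with the JDK alone. The class and method names are hypothetical and not part of the example project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ApicurioHeaderDecode {

    // Interprets an 8-byte header value as a big-endian long
    // (big-endian is ByteBuffer's default byte order).
    public static long decodeGlobalId(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    // Interprets a header value as ASCII text.
    public static String decodeEncoding(byte[] value) {
        return new String(value, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // Byte values copied from the kafka.HEADERS line in the log output.
        byte[] globalId = {0, 0, 0, 0, 0, 0, 0, 3};
        byte[] encoding = {66, 73, 78, 65, 82, 89};

        System.out.println("globalId = " + decodeGlobalId(globalId)); // globalId = 3
        System.out.println("encoding = " + decodeEncoding(encoding)); // encoding = BINARY
    }
}
```

Under that assumption, the consumed record refers to the schema registered under global ID 3.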
diff --git a/jbang/kafka-apicurio-schema-registry/application.properties b/jbang/kafka-apicurio-schema-registry/application.properties
new file mode 100644
index 0000000..d491e63
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/application.properties
@@ -0,0 +1,2 @@
+bootstrap.servers=localhost:9092
+topic=my-topic
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-log.yaml b/jbang/kafka-apicurio-schema-registry/kafka-log.yaml
new file mode 100644
index 0000000..9ca2187
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-log.yaml
@@ -0,0 +1,42 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# camel-k: dependency=mvn:com.acme.example:kafka-apicurio-producer:0.1
+# camel-k: dependency=mvn:io.apicurio:apicurio-registry-serdes-avro-serde:2.4.12.Final
+
+- beans:
+  - name: order
+    type: "#class:com.acme.example.kafka.models.Order"
+
+- route:
+    id: "kafka-to-log"
+    from:
+      uri: "kafka:{{topic}}"
+      parameters:
+        autoOffsetReset: earliest
+        brokers: "{{bootstrap.servers}}"
+        groupId: 'my-consumer-group'
+        valueDeserializer: 'io.apicurio.registry.serde.avro.AvroKafkaDeserializer'
+        additionalProperties.apicurio.registry.url: 'http://localhost:8080/apis/registry/v2'
+        additionalProperties.apicurio.registry.avro-datum-provider: 'io.apicurio.registry.serde.avro.ReflectAvroDatumProvider'
+      steps:
+        - to:
+            uri: "kamelet:log-sink"
+            parameters:
+              showStreams: true
+              showHeaders: true
+              multiline: true
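The two `additionalProperties.*` parameters in the route above hand settings to the Kafka client for which the Camel Kafka component has no dedicated option. The sketch below only illustrates the idea of stripping that prefix to obtain the client property names. It is not Camel's actual implementation, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AdditionalPropertiesSketch {

    static final String PREFIX = "additionalProperties.";

    // Returns only the prefixed entries, with the prefix removed from each key.
    public static Map<String, String> clientProperties(Map<String, String> endpointParameters) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : endpointParameters.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX)) {
                result.put(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new LinkedHashMap<>();
        parameters.put("groupId", "my-consumer-group");
        parameters.put("additionalProperties.apicurio.registry.url",
                "http://localhost:8080/apis/registry/v2");

        // Only the prefixed entry is kept; groupId is a regular endpoint option.
        System.out.println(clientProperties(parameters));
    }
}
```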
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml b/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml
new file mode 100644
index 0000000..fb16f3d
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/pom.xml
@@ -0,0 +1,78 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.acme.example</groupId>
+  <artifactId>kafka-apicurio-producer</artifactId>
+  <packaging>jar</packaging>
+  <version>0.1</version>
+  <name>kafka-apicurio-prod</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+
+    <dependency>
+      <groupId>io.apicurio</groupId>
+      <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+      <version>2.4.12.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>3.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.11.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>2.0.7</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>2.0.7</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.11.0</version>
+        <configuration>
+          <source>17</source>
+          <target>17</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.11.3</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>src/main/avro</sourceDirectory>
+              <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
+              <stringType>String</stringType>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc
new file mode 100644
index 0000000..e41753e
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/avro/order.avsc
@@ -0,0 +1,33 @@
+{
+  "doc": "Fact schema of an order",
+  "fields": [
+    {
+      "doc": "Unique id of the order.",
+      "name": "orderId",
+      "type": "int"
+    },
+    {
+      "doc": "Id of the ordered item.",
+      "name": "itemId",
+      "type": "string"
+    },
+    {
+      "doc": "Id of the user who ordered the item.",
+      "name": "userId",
+      "type": "string"
+    },
+    {
+      "doc": "How much was ordered.",
+      "name": "quantity",
+      "type": "double"
+    },
+    {
+      "doc": "Description of item.",
+      "name": "description",
+      "type": "string"
+    }
+  ],
+  "name": "Order",
+  "namespace": "com.acme.example.kafka.models",
+  "type": "record"
+}
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java
new file mode 100644
index 0000000..856c903
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.acme.example.kafka;
+
+import com.acme.example.kafka.models.Order;
+import io.apicurio.registry.serde.SerdeConfig;
+import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
+import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import io.apicurio.registry.serde.avro.ReflectAvroDatumProvider;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class Produce {
+
+    private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
+    public static final String DEFAULT_PROPERTIES_PATH = "../application.properties";
+
+    public static void main(String[] args) throws IOException {
+        String propertiesPath = DEFAULT_PROPERTIES_PATH;
+        if (args.length >= 1) {
+            propertiesPath = args[0];
+        }
+
+        Properties properties = new Properties();
+        properties.load(Files.newInputStream(Paths.get(propertiesPath)));
+
+        properties.put(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
+        properties.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class);
+
+        properties.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
+
+        try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) {
+            Order order = new Order(1, "item", "user", 3.0, "A really nice item I do love");
+            String topic = properties.getProperty("topic");
+            ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order);
+            RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS);
+            System.out.println("Sent record with offset " + result.offset());
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+}
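`Produce.java` above hard-codes the registry address in `REGISTRY_URL`. One possible way to make it configurable, sketched here under the assumption that a key named `registry.url` is added to the properties file (that key name is hypothetical and does not appear in this commit), is to read it from the loaded `Properties` and fall back to the current constant. The class and method names are likewise hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class RegistryUrlSketch {

    // Same value as the REGISTRY_URL constant in Produce.java.
    static final String DEFAULT_REGISTRY_URL = "http://localhost:8080/apis/registry/v2";

    // "registry.url" is an assumed key name, used for illustration only.
    public static String registryUrl(Properties properties) {
        return properties.getProperty("registry.url", DEFAULT_REGISTRY_URL);
    }

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        // Stands in for application.properties as added by this commit.
        properties.load(new StringReader("bootstrap.servers=localhost:9092\ntopic=my-topic\n"));

        // No registry.url entry is present, so the default is returned.
        System.out.println(registryUrl(properties));
    }
}
```

The returned value could then be passed to `properties.put(SerdeConfig.REGISTRY_URL, ...)` in place of the constant.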
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java
new file mode 100644
index 0000000..c559447
--- /dev/null
+++ b/jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java
@@ -0,0 +1,673 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package com.acme.example.kafka.models;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.SchemaStore;
+
+/** Fact schema of an order */
+@org.apache.avro.specific.AvroGenerated
+public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  private static final long serialVersionUID = -8676937297320921983L;
+
+
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.acme.example.kafka.models\",\"doc\":\"Fact schema of an order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"int\",\"doc\":\"Unique id of the order.\"},{\"name\":\"itemId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Id of the ordered item.\"},{\"name\":\"userId\",\"type\":{\"type\":\"string\",\"avro.jav [...]
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+  private static final SpecificData MODEL$ = new SpecificData();
+
+  private static final BinaryMessageEncoder<Order> ENCODER =
+      new BinaryMessageEncoder<>(MODEL$, SCHEMA$);
+
+  private static final BinaryMessageDecoder<Order> DECODER =
+      new BinaryMessageDecoder<>(MODEL$, SCHEMA$);
+
+  /**
+   * Return the BinaryMessageEncoder instance used by this class.
+   * @return the message encoder used by this class
+   */
+  public static BinaryMessageEncoder<Order> getEncoder() {
+    return ENCODER;
+  }
+
+  /**
+   * Return the BinaryMessageDecoder instance used by this class.
+   * @return the message decoder used by this class
+   */
+  public static BinaryMessageDecoder<Order> getDecoder() {
+    return DECODER;
+  }
+
+  /**
+   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
+   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+   * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore
+   */
+  public static BinaryMessageDecoder<Order> createDecoder(SchemaStore resolver) {
+    return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver);
+  }
+
+  /**
+   * Serializes this Order to a ByteBuffer.
+   * @return a buffer holding the serialized data for this instance
+   * @throws java.io.IOException if this instance could not be serialized
+   */
+  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+    return ENCODER.encode(this);
+  }
+
+  /**
+   * Deserializes a Order from a ByteBuffer.
+   * @param b a byte buffer holding serialized data for an instance of this class
+   * @return a Order instance decoded from the given buffer
+   * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class
+   */
+  public static Order fromByteBuffer(
+      java.nio.ByteBuffer b) throws java.io.IOException {
+    return DECODER.decode(b);
+  }
+
+  /** Unique id of the order. */
+  private int orderId;
+  /** Id of the ordered item. */
+  private java.lang.String itemId;
+  /** Id of the user who ordered the item. */
+  private java.lang.String userId;
+  /** How much was ordered. */
+  private double quantity;
+  /** Description of item. */
+  private java.lang.String description;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use <code>newBuilder()</code>.
+   */
+  public Order() {}
+
+  /**
+   * All-args constructor.
+   * @param orderId Unique id of the order.
+   * @param itemId Id of the ordered item.
+   * @param userId Id of the user who ordered the item.
+   * @param quantity How much was ordered.
+   * @param description Description of item.
+   */
+  public Order(java.lang.Integer orderId, java.lang.String itemId, java.lang.String userId, java.lang.Double quantity, java.lang.String description) {
+    this.orderId = orderId;
+    this.itemId = itemId;
+    this.userId = userId;
+    this.quantity = quantity;
+    this.description = description;
+  }
+
+  @Override
+  public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; }
+
+  @Override
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+
+  // Used by DatumWriter.  Applications should not call.
+  @Override
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return orderId;
+    case 1: return itemId;
+    case 2: return userId;
+    case 3: return quantity;
+    case 4: return description;
+    default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
+    }
+  }
+
+  // Used by DatumReader.  Applications should not call.
+  @Override
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: orderId = (java.lang.Integer)value$; break;
+    case 1: itemId = value$ != null ? value$.toString() : null; break;
+    case 2: userId = value$ != null ? value$.toString() : null; break;
+    case 3: quantity = (java.lang.Double)value$; break;
+    case 4: description = value$ != null ? value$.toString() : null; break;
+    default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
+    }
+  }
+
+  /**
+   * Gets the value of the 'orderId' field.
+   * @return Unique id of the order.
+   */
+  public int getOrderId() {
+    return orderId;
+  }
+
+
+  /**
+   * Sets the value of the 'orderId' field.
+   * Unique id of the order.
+   * @param value the value to set.
+   */
+  public void setOrderId(int value) {
+    this.orderId = value;
+  }
+
+  /**
+   * Gets the value of the 'itemId' field.
+   * @return Id of the ordered item.
+   */
+  public java.lang.String getItemId() {
+    return itemId;
+  }
+
+
+  /**
+   * Sets the value of the 'itemId' field.
+   * Id of the ordered item.
+   * @param value the value to set.
+   */
+  public void setItemId(java.lang.String value) {
+    this.itemId = value;
+  }
+
+  /**
+   * Gets the value of the 'userId' field.
+   * @return Id of the user who ordered the item.
+   */
+  public java.lang.String getUserId() {
+    return userId;
+  }
+
+
+  /**
+   * Sets the value of the 'userId' field.
+   * Id of the user who ordered the item.
+   * @param value the value to set.
+   */
+  public void setUserId(java.lang.String value) {
+    this.userId = value;
+  }
+
+  /**
+   * Gets the value of the 'quantity' field.
+   * @return How much was ordered.
+   */
+  public double getQuantity() {
+    return quantity;
+  }
+
+
+  /**
+   * Sets the value of the 'quantity' field.
+   * How much was ordered.
+   * @param value the value to set.
+   */
+  public void setQuantity(double value) {
+    this.quantity = value;
+  }
+
+  /**
+   * Gets the value of the 'description' field.
+   * @return Description of item.
+   */
+  public java.lang.String getDescription() {
+    return description;
+  }
+
+
+  /**
+   * Sets the value of the 'description' field.
+   * Description of item.
+   * @param value the value to set.
+   */
+  public void setDescription(java.lang.String value) {
+    this.description = value;
+  }
+
+  /**
+   * Creates a new Order RecordBuilder.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.kafka.models.Order.Builder newBuilder() {
+    return new com.acme.example.kafka.models.Order.Builder();
+  }
+
+  /**
+   * Creates a new Order RecordBuilder by copying an existing Builder.
+   * @param other The existing builder to copy.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.kafka.models.Order.Builder newBuilder(com.acme.example.kafka.models.Order.Builder other) {
+    if (other == null) {
+      return new com.acme.example.kafka.models.Order.Builder();
+    } else {
+      return new com.acme.example.kafka.models.Order.Builder(other);
+    }
+  }
+
+  /**
+   * Creates a new Order RecordBuilder by copying an existing Order instance.
+   * @param other The existing instance to copy.
+   * @return A new Order RecordBuilder
+   */
+  public static com.acme.example.kafka.models.Order.Builder newBuilder(com.acme.example.kafka.models.Order other) {
+    if (other == null) {
+      return new com.acme.example.kafka.models.Order.Builder();
+    } else {
+      return new com.acme.example.kafka.models.Order.Builder(other);
+    }
+  }
+
+  /**
+   * RecordBuilder for Order instances.
+   */
+  @org.apache.avro.specific.AvroGenerated
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Order>
+    implements org.apache.avro.data.RecordBuilder<Order> {
+
+    /** Unique id of the order. */
+    private int orderId;
+    /** Id of the ordered item. */
+    private java.lang.String itemId;
+    /** Id of the user who ordered the item. */
+    private java.lang.String userId;
+    /** How much was ordered. */
+    private double quantity;
+    /** Description of item. */
+    private java.lang.String description;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(SCHEMA$, MODEL$);
+    }
+
+    /**
+     * Creates a Builder by copying an existing Builder.
+     * @param other The existing Builder to copy.
+     */
+    private Builder(com.acme.example.kafka.models.Order.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.orderId)) {
+        this.orderId = data().deepCopy(fields()[0].schema(), other.orderId);
+        fieldSetFlags()[0] = other.fieldSetFlags()[0];
+      }
+      if (isValidValue(fields()[1], other.itemId)) {
+        this.itemId = data().deepCopy(fields()[1].schema(), other.itemId);
+        fieldSetFlags()[1] = other.fieldSetFlags()[1];
+      }
+      if (isValidValue(fields()[2], other.userId)) {
+        this.userId = data().deepCopy(fields()[2].schema(), other.userId);
+        fieldSetFlags()[2] = other.fieldSetFlags()[2];
+      }
+      if (isValidValue(fields()[3], other.quantity)) {
+        this.quantity = data().deepCopy(fields()[3].schema(), other.quantity);
+        fieldSetFlags()[3] = other.fieldSetFlags()[3];
+      }
+      if (isValidValue(fields()[4], other.description)) {
+        this.description = data().deepCopy(fields()[4].schema(), other.description);
+        fieldSetFlags()[4] = other.fieldSetFlags()[4];
+      }
+    }
+
+    /**
+     * Creates a Builder by copying an existing Order instance
+     * @param other The existing instance to copy.
+     */
+    private Builder(com.acme.example.kafka.models.Order other) {
+      super(SCHEMA$, MODEL$);
+      if (isValidValue(fields()[0], other.orderId)) {
+        this.orderId = data().deepCopy(fields()[0].schema(), other.orderId);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.itemId)) {
+        this.itemId = data().deepCopy(fields()[1].schema(), other.itemId);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.userId)) {
+        this.userId = data().deepCopy(fields()[2].schema(), other.userId);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.quantity)) {
+        this.quantity = data().deepCopy(fields()[3].schema(), other.quantity);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.description)) {
+        this.description = data().deepCopy(fields()[4].schema(), other.description);
+        fieldSetFlags()[4] = true;
+      }
+    }
+
+    /**
+      * Gets the value of the 'orderId' field.
+      * Unique id of the order.
+      * @return The value.
+      */
+    public int getOrderId() {
+      return orderId;
+    }
+
+
+    /**
+      * Sets the value of the 'orderId' field.
+      * Unique id of the order.
+      * @param value The value of 'orderId'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setOrderId(int value) {
+      validate(fields()[0], value);
+      this.orderId = value;
+      fieldSetFlags()[0] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'orderId' field has been set.
+      * Unique id of the order.
+      * @return True if the 'orderId' field has been set, false otherwise.
+      */
+    public boolean hasOrderId() {
+      return fieldSetFlags()[0];
+    }
+
+
+    /**
+      * Clears the value of the 'orderId' field.
+      * Unique id of the order.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearOrderId() {
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @return The value.
+      */
+    public java.lang.String getItemId() {
+      return itemId;
+    }
+
+
+    /**
+      * Sets the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @param value The value of 'itemId'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setItemId(java.lang.String value) {
+      validate(fields()[1], value);
+      this.itemId = value;
+      fieldSetFlags()[1] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'itemId' field has been set.
+      * Id of the ordered item.
+      * @return True if the 'itemId' field has been set, false otherwise.
+      */
+    public boolean hasItemId() {
+      return fieldSetFlags()[1];
+    }
+
+
+    /**
+      * Clears the value of the 'itemId' field.
+      * Id of the ordered item.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearItemId() {
+      itemId = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @return The value.
+      */
+    public java.lang.String getUserId() {
+      return userId;
+    }
+
+
+    /**
+      * Sets the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @param value The value of 'userId'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setUserId(java.lang.String value) {
+      validate(fields()[2], value);
+      this.userId = value;
+      fieldSetFlags()[2] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'userId' field has been set.
+      * Id of the user who ordered the item.
+      * @return True if the 'userId' field has been set, false otherwise.
+      */
+    public boolean hasUserId() {
+      return fieldSetFlags()[2];
+    }
+
+
+    /**
+      * Clears the value of the 'userId' field.
+      * Id of the user who ordered the item.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearUserId() {
+      userId = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'quantity' field.
+      * How much was ordered.
+      * @return The value.
+      */
+    public double getQuantity() {
+      return quantity;
+    }
+
+
+    /**
+      * Sets the value of the 'quantity' field.
+      * How much was ordered.
+      * @param value The value of 'quantity'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setQuantity(double value) {
+      validate(fields()[3], value);
+      this.quantity = value;
+      fieldSetFlags()[3] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'quantity' field has been set.
+      * How much was ordered.
+      * @return True if the 'quantity' field has been set, false otherwise.
+      */
+    public boolean hasQuantity() {
+      return fieldSetFlags()[3];
+    }
+
+
+    /**
+      * Clears the value of the 'quantity' field.
+      * How much was ordered.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearQuantity() {
+      fieldSetFlags()[3] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'description' field.
+      * Description of item.
+      * @return The value.
+      */
+    public java.lang.String getDescription() {
+      return description;
+    }
+
+
+    /**
+      * Sets the value of the 'description' field.
+      * Description of item.
+      * @param value The value of 'description'.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder setDescription(java.lang.String value) {
+      validate(fields()[4], value);
+      this.description = value;
+      fieldSetFlags()[4] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'description' field has been set.
+      * Description of item.
+      * @return True if the 'description' field has been set, false otherwise.
+      */
+    public boolean hasDescription() {
+      return fieldSetFlags()[4];
+    }
+
+
+    /**
+      * Clears the value of the 'description' field.
+      * Description of item.
+      * @return This builder.
+      */
+    public com.acme.example.kafka.models.Order.Builder clearDescription() {
+      description = null;
+      fieldSetFlags()[4] = false;
+      return this;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Order build() {
+      try {
+        Order record = new Order();
+        record.orderId = fieldSetFlags()[0] ? this.orderId : (java.lang.Integer) defaultValue(fields()[0]);
+        record.itemId = fieldSetFlags()[1] ? this.itemId : (java.lang.String) defaultValue(fields()[1]);
+        record.userId = fieldSetFlags()[2] ? this.userId : (java.lang.String) defaultValue(fields()[2]);
+        record.quantity = fieldSetFlags()[3] ? this.quantity : (java.lang.Double) defaultValue(fields()[3]);
+        record.description = fieldSetFlags()[4] ? this.description : (java.lang.String) defaultValue(fields()[4]);
+        return record;
+      } catch (org.apache.avro.AvroMissingFieldException e) {
+        throw e;
+      } catch (java.lang.Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumWriter<Order>
+    WRITER$ = (org.apache.avro.io.DatumWriter<Order>)MODEL$.createDatumWriter(SCHEMA$);
+
+  @Override public void writeExternal(java.io.ObjectOutput out)
+    throws java.io.IOException {
+    WRITER$.write(this, SpecificData.getEncoder(out));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumReader<Order>
+    READER$ = (org.apache.avro.io.DatumReader<Order>)MODEL$.createDatumReader(SCHEMA$);
+
+  @Override public void readExternal(java.io.ObjectInput in)
+    throws java.io.IOException {
+    READER$.read(this, SpecificData.getDecoder(in));
+  }
+
+  @Override protected boolean hasCustomCoders() { return true; }
+
+  @Override public void customEncode(org.apache.avro.io.Encoder out)
+    throws java.io.IOException
+  {
+    out.writeInt(this.orderId);
+
+    out.writeString(this.itemId);
+
+    out.writeString(this.userId);
+
+    out.writeDouble(this.quantity);
+
+    out.writeString(this.description);
+
+  }
+
+  @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in)
+    throws java.io.IOException
+  {
+    org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
+    if (fieldOrder == null) {
+      this.orderId = in.readInt();
+
+      this.itemId = in.readString();
+
+      this.userId = in.readString();
+
+      this.quantity = in.readDouble();
+
+      this.description = in.readString();
+
+    } else {
+      for (int i = 0; i < 5; i++) {
+        switch (fieldOrder[i].pos()) {
+        case 0:
+          this.orderId = in.readInt();
+          break;
+
+        case 1:
+          this.itemId = in.readString();
+          break;
+
+        case 2:
+          this.userId = in.readString();
+          break;
+
+        case 3:
+          this.quantity = in.readDouble();
+          break;
+
+        case 4:
+          this.description = in.readString();
+          break;
+
+        default:
+          throw new java.io.IOException("Corrupt ResolvingDecoder.");
+        }
+      }
+    }
+  }
+}
+
+
+
+
+
+
+
+
+
+


[camel-kamelets-examples] 02/02: Added an example of Apicurio Registry Usage with Kafka through YAML - Externalize Schema Registry URL

Posted by ac...@apache.org.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit 44ea3b9d17c34f047b3b370e61cb1b390e1216d4
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Oct 10 13:49:28 2023 +0200

    Added an example of Apicurio Registry Usage with Kafka through YAML - Externalize Schema Registry URL
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 jbang/kafka-apicurio-schema-registry/application.properties | 1 +
 jbang/kafka-apicurio-schema-registry/kafka-log.yaml         | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
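
Note on the change below: the registry URL moves out of kafka-log.yaml and into application.properties, and the YAML refers to it through the '{{schema.registry.url}}' placeholder, the same way it already refers to '{{bootstrap.servers}}'. The following is a minimal sketch of that kind of substitution, assuming a plain key lookup. The class PlaceholderSketch and its methods are hypothetical names used only for illustration; this is not Camel's actual property placeholder implementation, which supports more features than shown here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Camel: illustrates {{key}} substitution only.
final class PlaceholderSketch {

    // Matches {{some.key}} and captures the key between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^{}]+)\\}\\}");

    // Parses properties-file content (key=value lines) from a string.
    public static Properties load(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Replaces every {{key}} in text with its value; fails on an unknown key.
    public static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String key = m.group(1).trim();
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("No property for placeholder: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value literal.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Same three entries as application.properties after this commit.
        Properties props = load(
            "bootstrap.servers=localhost:9092\n"
            + "topic=my-topic\n"
            + "schema.registry.url=http://localhost:8080/apis/registry/v2\n");

        System.out.println(resolve("{{schema.registry.url}}", props));
        System.out.println(resolve("{{bootstrap.servers}}", props));
    }
}
```

With the value externalized, pointing the example at a different registry should only require editing application.properties, leaving the route definition untouched.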

diff --git a/jbang/kafka-apicurio-schema-registry/application.properties b/jbang/kafka-apicurio-schema-registry/application.properties
index d491e63..8202fae 100644
--- a/jbang/kafka-apicurio-schema-registry/application.properties
+++ b/jbang/kafka-apicurio-schema-registry/application.properties
@@ -1,2 +1,3 @@
 bootstrap.servers=localhost:9092
 topic=my-topic
+schema.registry.url=http://localhost:8080/apis/registry/v2
diff --git a/jbang/kafka-apicurio-schema-registry/kafka-log.yaml b/jbang/kafka-apicurio-schema-registry/kafka-log.yaml
index 9ca2187..58fb7d2 100644
--- a/jbang/kafka-apicurio-schema-registry/kafka-log.yaml
+++ b/jbang/kafka-apicurio-schema-registry/kafka-log.yaml
@@ -31,7 +31,7 @@
         brokers: "{{bootstrap.servers}}"
         groupId: 'my-consumer-group'
         valueDeserializer: 'io.apicurio.registry.serde.avro.AvroKafkaDeserializer'
-        additionalProperties.apicurio.registry.url: 'http://localhost:8080/apis/registry/v2'
+        additionalProperties.apicurio.registry.url: '{{schema.registry.url}}'
         additionalProperties.apicurio.registry.avro-datum-provider: 'io.apicurio.registry.serde.avro.ReflectAvroDatumProvider'
       steps:
         - to: