You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2022/03/18 13:11:15 UTC

[camel-quarkus] 05/08: Fix #3592 add some ReflectiveClassBuildItem for camel-kafka (#3594)

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch camel-main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 1c06e87c21b81e51e0a26162553e363a11db97bb
Author: Amos Feng <zh...@gmail.com>
AuthorDate: Fri Mar 11 17:06:30 2022 +0800

    Fix #3592 add some ReflectiveClassBuildItem for camel-kafka (#3594)
---
 .../component/kafka/deployment/KafkaProcessor.java | 26 ++++++++++
 .../quarkus/test/support/kafka/InjectKafka.java    | 27 ++++++++++
 .../test/support/kafka/KafkaTestResource.java      |  7 +++
 integration-tests/kafka/pom.xml                    |  4 ++
 .../kafka/it/CamelKafkaHealthCheckIT.java          | 23 +++++++++
 .../kafka/it/CamelKafkaHealthCheckTest.java        | 59 ++++++++++++++++++++++
 .../kafka/it/KafkaHealthCheckProfile.java          | 29 +++++++++++
 7 files changed, 175 insertions(+)

diff --git a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
index 90e2945..270a7a1 100644
--- a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
+++ b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.quarkus.component.kafka.deployment;
 
+import java.util.Collection;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
 import io.quarkus.deployment.Capabilities;
@@ -24,19 +26,28 @@ import io.quarkus.deployment.Capability;
 import io.quarkus.deployment.IsNormal;
 import io.quarkus.deployment.annotations.BuildProducer;
 import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
 import io.quarkus.deployment.builditem.DevServicesLauncherConfigResultBuildItem;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
 import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
 import io.quarkus.deployment.dev.devservices.GlobalDevServicesConfig;
 import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig;
 import org.apache.camel.quarkus.component.kafka.KafkaClientFactoryProducer;
 import org.eclipse.microprofile.config.Config;
 import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.jandex.ClassInfo;
+import org.jboss.jandex.DotName;
+import org.jboss.jandex.IndexView;
 
 class KafkaProcessor {
     private static final String FEATURE = "camel-kafka";
     private static final String CAMEL_KAFKA_BROKERS = "camel.component.kafka.brokers";
     private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+    private static final DotName[] KAFKA_CLIENTS_TYPES = {
+            DotName.createSimple("org.apache.kafka.clients.producer.Producer"),
+            DotName.createSimple("org.apache.kafka.clients.consumer.Consumer")
+    };
 
     @BuildStep
     FeatureBuildItem feature() {
@@ -68,4 +79,19 @@ class KafkaProcessor {
             }
         }
     }
+
+    @BuildStep
+    public void reflectiveClasses(CombinedIndexBuildItem combinedIndex,
+            BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
+        IndexView index = combinedIndex.getIndex();
+
+        Stream.of(KAFKA_CLIENTS_TYPES)
+                .map(index::getAllKnownImplementors)
+                .flatMap(Collection::stream)
+                .map(ClassInfo::toString)
+                .forEach(name -> reflectiveClass.produce(new ReflectiveClassBuildItem(false, true, name)));
+
+        reflectiveClass
+                .produce(new ReflectiveClassBuildItem(false, true, "org.apache.kafka.clients.producer.internals.Sender"));
+    }
 }
diff --git a/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/InjectKafka.java b/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/InjectKafka.java
new file mode 100644
index 0000000..02dca6a
--- /dev/null
+++ b/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/InjectKafka.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.test.support.kafka;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ ElementType.ANNOTATION_TYPE, ElementType.METHOD, ElementType.FIELD })
+@Retention(RetentionPolicy.RUNTIME)
+public @interface InjectKafka {
+}
diff --git a/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/KafkaTestResource.java b/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/KafkaTestResource.java
index dce93d4..9c81c69 100644
--- a/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/KafkaTestResource.java
+++ b/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/KafkaTestResource.java
@@ -63,4 +63,11 @@ public class KafkaTestResource implements QuarkusTestResourceLifecycleManager {
             }
         }
     }
+
+    @Override
+    public void inject(TestInjector testInjector) {
+        testInjector.injectIntoFields(container,
+                new TestInjector.AnnotatedAndMatchesType(InjectKafka.class, KafkaContainer.class));
+    }
+
 }
diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index e9e3e24..d298715 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -57,6 +57,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-microprofile-health</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-integration-tests-support-kafka</artifactId>
         </dependency>
         <dependency>
diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckIT.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckIT.java
new file mode 100644
index 0000000..2961643
--- /dev/null
+++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckIT.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.it;
+
+import io.quarkus.test.junit.NativeImageTest;
+
+@NativeImageTest
+public class CamelKafkaHealthCheckIT extends CamelKafkaHealthCheckTest {
+}
diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckTest.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckTest.java
new file mode 100644
index 0000000..27bd629
--- /dev/null
+++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.it;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.TestProfile;
+import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+import org.apache.camel.quarkus.test.support.kafka.InjectKafka;
+import org.apache.camel.quarkus.test.support.kafka.KafkaTestResource;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@QuarkusTest
+@QuarkusTestResource(KafkaTestResource.class)
+@TestProfile(KafkaHealthCheckProfile.class)
+public class CamelKafkaHealthCheckTest {
+
+    @InjectKafka
+    KafkaContainer container;
+
+    @Test
+    void testHealthCheck() {
+        RestAssured.when().get("/q/health").then()
+                .contentType(ContentType.JSON)
+                .header("Content-Type", containsString("charset=UTF-8"))
+                .body("status", is("UP"));
+
+        // stop the kafka container to test health-check DOWN
+        container.stop();
+
+        RestAssured.when().get("/q/health").then()
+                .contentType(ContentType.JSON)
+                .header("Content-Type", containsString("charset=UTF-8"))
+                .body("status", is("DOWN"),
+                        "checks.find { it.name == 'camel-kafka' }.status", is("DOWN"),
+                        "checks.find { it.name == 'camel-kafka' }.data.topic", notNullValue(),
+                        "checks.find { it.name == 'camel-kafka' }.data.'client.id'", notNullValue());
+    }
+}
diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/KafkaHealthCheckProfile.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/KafkaHealthCheckProfile.java
new file mode 100644
index 0000000..620c5dc
--- /dev/null
+++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/KafkaHealthCheckProfile.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.it;
+
+import java.util.Map;
+
+import io.quarkus.test.junit.QuarkusTestProfile;
+
+public class KafkaHealthCheckProfile implements QuarkusTestProfile {
+    @Override
+    public Map<String, String> getConfigOverrides() {
+        // force shutdown
+        return Map.of("camel.main.shutdownTimeout", "10");
+    }
+}