You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/10/21 11:56:21 UTC

[camel] branch main updated (5db583b1cc8 -> c0401ffab6a)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 5db583b1cc8 Sync deps
     new 0f74cd588db camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis
     new bc74f6b58c1 camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis
     new 6b87a31bbb4 camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis
     new c0401ffab6a camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 components/camel-aws/camel-aws2-kinesis/pom.xml    |  4 +
 .../component/aws2/kinesis/Kinesis2Component.java  |  2 -
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 19 +++++
 .../aws2/kinesis/Kinesis2ConsumerHealthCheck.java} | 33 ++++----
 ...nesis2ConsumerHealthCheckProfileCredsTest.java} | 22 +++---
 ...inesis2ConsumerHealthCheckStaticCredsTest.java} | 24 +++---
 .../Kinesis2ConsumerHealthCustomClientTest.java}   | 20 ++---
 .../KinesisComponentVerifierExtensionTest.java     | 91 ----------------------
 8 files changed, 73 insertions(+), 142 deletions(-)
 copy components/camel-aws/{camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheck.java => camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java} (71%)
 copy components/camel-aws/{camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCustomClientTest.java => camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java} (81%)
 copy components/camel-aws/{camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckProfileCredsTest.java => camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java} (81%)
 copy components/camel-aws/{camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCustomClientTest.java => camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java} (85%)
 delete mode 100644 components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentVerifierExtensionTest.java


[camel] 04/04: camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c0401ffab6af85fece582f8be23fe16d5938c1f3
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Oct 21 13:54:43 2022 +0200

    camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 .../aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java   | 6 +++---
 .../aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java    | 6 +++---
 .../aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java        | 6 +++---
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java
index 027815735d9..30c8e6696a3 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.health.HealthCheck;
@@ -32,9 +35,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
 import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
 
 public class Kinesis2ConsumerHealthCheckProfileCredsTest extends CamelTestSupport {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java
index c7a8df6823f..3e6e7e8e376 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.health.HealthCheck;
@@ -32,9 +35,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
 import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
 
 public class Kinesis2ConsumerHealthCheckStaticCredsTest extends CamelTestSupport {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java
index 0d0bd9fd6ab..d39aff7173d 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.health.HealthCheck;
@@ -32,9 +35,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
 import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
 
 public class Kinesis2ConsumerHealthCustomClientTest extends CamelTestSupport {


[camel] 02/04: camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bc74f6b58c1672ede174efcd816fbbbd6a4360eb
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Oct 21 13:31:15 2022 +0200

    camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 .../camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java    | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java
index 1a420defb94..985704d7d20 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import java.util.Map;
+
 import org.apache.camel.health.HealthCheckResultBuilder;
 import org.apache.camel.impl.health.AbstractHealthCheck;
 import org.apache.camel.util.ObjectHelper;
@@ -26,8 +28,6 @@ import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
 
-import java.util.Map;
-
 public class Kinesis2ConsumerHealthCheck extends AbstractHealthCheck {
 
     private final Kinesis2Consumer kinesis2Consumer;
@@ -85,5 +85,4 @@ public class Kinesis2ConsumerHealthCheck extends AbstractHealthCheck {
 
     }
 
-
 }


[camel] 03/04: camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6b87a31bbb4fa9a71a2c5a3f168d603673dce5a3
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Oct 21 13:53:03 2022 +0200

    camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 ...inesis2ConsumerHealthCheckProfileCredsTest.java | 106 ++++++++++++++++++++
 ...Kinesis2ConsumerHealthCheckStaticCredsTest.java | 107 +++++++++++++++++++++
 .../Kinesis2ConsumerHealthCustomClientTest.java    | 106 ++++++++++++++++++++
 3 files changed, 319 insertions(+)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java
new file mode 100644
index 00000000000..027815735d9
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+/** Health check test for an aws2-kinesis consumer route that uses the default credentials provider. */
+public class Kinesis2ConsumerHealthCheckProfileCredsTest extends CamelTestSupport {
+
+    // NOTE(review): an S3 test service backs this Kinesis test - confirm this is intended
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createS3Service();
+
+    private static final Logger LOG = LoggerFactory.getLogger(Kinesis2ConsumerHealthCheckProfileCredsTest.class);
+
+    CamelContext context;
+
+    /**
+     * Adds an aws2-kinesis component with a test client and registers what the ids context/routes/consumers resolve to.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+        Kinesis2Component component = new Kinesis2Component(context);
+        component.getConfiguration().setAmazonKinesisClient(AWSSDKClientUtils.newKinesisClient());
+        component.init();
+        context.addComponent("aws2-kinesis", component);
+
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        for (String id : new String[] { "context", "routes", "consumers" }) {
+            registry.register(registry.resolveById(id));
+        }
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    /** Single consumer route with region "l" and useDefaultCredentialsProvider=true. */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("aws2-kinesis://stream?region=l&useDefaultCredentialsProvider=true")
+                        .startupOrder(2).log("${body}").routeId("test-health-it");
+            }
+        };
+    }
+
+    /** Expects liveness UP, then (within 20s) readiness DOWN, a Kinesis consumer check and a "region" message. */
+    @Test
+    public void testConnectivity() {
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // health-check readiness should be down
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN));
+            boolean containsKinesis2HealthCheck = res2.stream()
+                    .anyMatch(result -> result.getCheck().getId().startsWith("aws2-kinesis-consumer"));
+            boolean hasRegionMessage = res2.stream()
+                    .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region")));
+            Assertions.assertTrue(down, "readiness check");
+            Assertions.assertTrue(containsKinesis2HealthCheck, "aws2-kinesis check");
+            Assertions.assertTrue(hasRegionMessage, "aws2-kinesis check error message");
+        });
+    }
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java
new file mode 100644
index 00000000000..c7a8df6823f
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+/** Health check test for an aws2-kinesis consumer route that passes accessKey/secretKey on the endpoint URI. */
+public class Kinesis2ConsumerHealthCheckStaticCredsTest extends CamelTestSupport {
+
+    // NOTE(review): an S3 test service backs this Kinesis test - confirm this is intended
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createS3Service();
+
+    private static final Logger LOG = LoggerFactory.getLogger(Kinesis2ConsumerHealthCheckStaticCredsTest.class);
+
+    CamelContext context;
+
+    /**
+     * Adds an aws2-kinesis component with a test client and registers what the ids context/routes/consumers resolve to.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+        Kinesis2Component component = new Kinesis2Component(context);
+        component.getConfiguration().setAmazonKinesisClient(AWSSDKClientUtils.newKinesisClient());
+        component.init();
+        context.addComponent("aws2-kinesis", component);
+
+        // install health check manually (yes a bit cumbersome)
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        for (String id : new String[] { "context", "routes", "consumers" }) {
+            registry.register(registry.resolveById(id));
+        }
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    /** Single consumer route with region "l" and the static credentials accessKey "k" / secretKey "l". */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("aws2-kinesis://stream?region=l&secretKey=l&accessKey=k")
+                        .startupOrder(2).log("${body}").routeId("test-health-it");
+            }
+        };
+    }
+
+    /** Expects liveness UP, then (within 20s) readiness DOWN, a Kinesis consumer check and a "region" message. */
+    @Test
+    public void testConnectivity() {
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // health-check readiness should be down
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN));
+            boolean containsKinesis2HealthCheck = res2.stream()
+                    .anyMatch(result -> result.getCheck().getId().startsWith("aws2-kinesis-consumer"));
+            boolean hasRegionMessage = res2.stream()
+                    .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region")));
+            Assertions.assertTrue(down, "readiness check");
+            Assertions.assertTrue(containsKinesis2HealthCheck, "aws2-kinesis check");
+            Assertions.assertTrue(hasRegionMessage, "aws2-kinesis check error message");
+        });
+    }
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java
new file mode 100644
index 00000000000..0d0bd9fd6ab
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+/** Health check test for an aws2-kinesis consumer route that relies on the client set on the component. */
+public class Kinesis2ConsumerHealthCustomClientTest extends CamelTestSupport {
+
+    // NOTE(review): an S3 test service backs this Kinesis test - confirm this is intended
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createS3Service();
+
+    private static final Logger LOG = LoggerFactory.getLogger(Kinesis2ConsumerHealthCustomClientTest.class);
+
+    CamelContext context;
+
+    /**
+     * Adds an aws2-kinesis component with a test client and registers what the ids context/routes/consumers resolve to.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        context = super.createCamelContext();
+        context.getPropertiesComponent().setLocation("ref:prop");
+        Kinesis2Component component = new Kinesis2Component(context);
+        component.getConfiguration().setAmazonKinesisClient(AWSSDKClientUtils.newKinesisClient());
+        component.init();
+        context.addComponent("aws2-kinesis", component);
+
+        HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
+        registry.setCamelContext(context);
+        for (String id : new String[] { "context", "routes", "consumers" }) {
+            registry.register(registry.resolveById(id));
+        }
+        context.setExtension(HealthCheckRegistry.class, registry);
+
+        return context;
+    }
+
+    /** Single consumer route whose endpoint URI carries no region or credential options. */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("aws2-kinesis://stream")
+                        .startupOrder(2).log("${body}").routeId("test-health-it");
+            }
+        };
+    }
+
+    /** Expects liveness UP, then (within 20s) readiness DOWN, a Kinesis consumer check and no "region" message. */
+    @Test
+    public void testConnectivity() {
+        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
+        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
+        Assertions.assertTrue(up, "liveness check");
+
+        // health-check readiness should be down
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
+            boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN));
+            boolean containsKinesis2HealthCheck = res2.stream()
+                    .anyMatch(result -> result.getCheck().getId().startsWith("aws2-kinesis-consumer"));
+            boolean hasRegionMessage = res2.stream()
+                    .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region")));
+            Assertions.assertTrue(down, "readiness check");
+            Assertions.assertTrue(containsKinesis2HealthCheck, "aws2-kinesis check");
+            Assertions.assertFalse(hasRegionMessage, "aws2-kinesis check error message");
+        });
+    }
+}


[camel] 01/04: camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0f74cd588db47bb3342987d282b424cdb90bb4ea
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Oct 21 13:28:17 2022 +0200

    camel-health - Add health checks for components that has extension for connectivity verification - AWS Kinesis
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 components/camel-aws/camel-aws2-kinesis/pom.xml    |  4 +
 .../component/aws2/kinesis/Kinesis2Component.java  |  2 -
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 19 +++++
 .../aws2/kinesis/Kinesis2ConsumerHealthCheck.java  | 89 +++++++++++++++++++++
 .../KinesisComponentVerifierExtensionTest.java     | 91 ----------------------
 5 files changed, 112 insertions(+), 93 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/pom.xml b/components/camel-aws/camel-aws2-kinesis/pom.xml
index 8e9002e8acb..02635dffa8e 100644
--- a/components/camel-aws/camel-aws2-kinesis/pom.xml
+++ b/components/camel-aws/camel-aws2-kinesis/pom.xml
@@ -55,6 +55,10 @@
             <artifactId>apache-client</artifactId>
             <version>${aws-java-sdk2-version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-health</artifactId>
+        </dependency>
 
         <!-- for testing -->
         <dependency>
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
index 6d8fd79379f..1bdf0e6ff9f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
@@ -36,8 +36,6 @@ public class Kinesis2Component extends DefaultComponent {
 
     public Kinesis2Component(CamelContext context) {
         super(context);
-
-        registerExtension(new Kinesis2ComponentVerifierExtension());
     }
 
     @Override
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 2ec8a11410a..85104c473b4 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -24,6 +24,8 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.WritableHealthCheckRepository;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
@@ -50,6 +52,9 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     private boolean isShardClosed;
     private ResumeStrategy resumeStrategy;
 
+    private WritableHealthCheckRepository healthCheckRepository;
+    private Kinesis2ConsumerHealthCheck consumerHealthCheck;
+
     public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
@@ -236,8 +241,22 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     protected void doStart() throws Exception {
         super.doStart();
 
+        healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
+                getEndpoint().getCamelContext(),
+                "components",
+                WritableHealthCheckRepository.class);
+
+        if (healthCheckRepository != null) {
+            consumerHealthCheck = new Kinesis2ConsumerHealthCheck(this, getRouteId());
+            healthCheckRepository.addHealthCheck(consumerHealthCheck);
+        }
+
         if (resumeStrategy != null) {
             resumeStrategy.loadCache();
         }
     }
+
+    protected Kinesis2Configuration getConfiguration() {
+        return getEndpoint().getConfiguration();
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java
new file mode 100644
index 00000000000..1a420defb94
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis;
+
+import org.apache.camel.health.HealthCheckResultBuilder;
+import org.apache.camel.impl.health.AbstractHealthCheck;
+import org.apache.camel.util.ObjectHelper;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
+
+import java.util.Map;
+
+/** Readiness health check that probes AWS Kinesis connectivity on behalf of one {@link Kinesis2Consumer}. */
+public class Kinesis2ConsumerHealthCheck extends AbstractHealthCheck {
+    private final Kinesis2Consumer kinesis2Consumer;
+    private final String routeId;
+
+    public Kinesis2ConsumerHealthCheck(Kinesis2Consumer kinesis2Consumer, String routeId) {
+        super("camel", "aws2-kinesis-consumer-" + routeId);
+        this.kinesis2Consumer = kinesis2Consumer;
+        this.routeId = routeId;
+    }
+
+    @Override
+    public boolean isLiveness() {
+        // this health check is only readiness
+        return false;
+    }
+
+    /**
+     * Reports UP when {@code listStreams()} succeeds; DOWN when the region is unsupported or any step throws.
+     * A client set on the configuration is preferred and left open; a client built here is always closed.
+     */
+    @Override
+    protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) {
+        KinesisClient ownedClient = null;
+        try {
+            Kinesis2Configuration configuration = kinesis2Consumer.getConfiguration();
+            String region = configuration.getRegion();
+            // a pre-built client may be configured without a region, so validate the region only when one is set
+            if (ObjectHelper.isNotEmpty(region) && !KinesisClient.serviceMetadata().regions().contains(Region.of(region))) {
+                builder.message("The service is not supported in this region");
+                builder.down();
+                return;
+            }
+            KinesisClient client = configuration.getAmazonKinesisClient();
+            if (client == null) {
+                KinesisClientBuilder clientBuilder = KinesisClient.builder().region(Region.of(region));
+                if (!configuration.isUseDefaultCredentialsProvider()) {
+                    clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(
+                            AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey())));
+                }
+                ownedClient = clientBuilder.build();
+                client = ownedClient;
+            }
+            client.listStreams();
+            builder.up();
+        } catch (SdkClientException e) {
+            builder.message(e.getMessage());
+            builder.error(e);
+            builder.down();
+        } catch (Exception e) {
+            builder.error(e);
+            builder.down();
+        } finally {
+            if (ownedClient != null) {
+                ownedClient.close();
+            }
+        }
+    }
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentVerifierExtensionTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentVerifierExtensionTest.java
deleted file mode 100644
index 4c6aac74959..00000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentVerifierExtensionTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.Component;
-import org.apache.camel.component.extension.ComponentVerifierExtension;
-import org.apache.camel.test.junit5.CamelTestSupport;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class KinesisComponentVerifierExtensionTest extends CamelTestSupport {
-
-    // *************************************************
-    // Tests (parameters)
-    // *************************************************
-    @Override
-    public boolean isUseRouteBuilder() {
-        return false;
-    }
-
-    @Test
-    public void testParameters() {
-        Component component = context().getComponent("aws2-kinesis");
-
-        ComponentVerifierExtension verifier
-                = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
-
-        Map<String, Object> parameters = new HashMap<>();
-        parameters.put("secretKey", "l");
-        parameters.put("accessKey", "k");
-        parameters.put("region", "l");
-        parameters.put("streamName", "test");
-
-        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
-
-        assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
-    }
-
-    @Test
-    public void testConnectivity() {
-        Component component = context().getComponent("aws2-kinesis");
-        ComponentVerifierExtension verifier
-                = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
-
-        Map<String, Object> parameters = new HashMap<>();
-        parameters.put("secretKey", "l");
-        parameters.put("accessKey", "k");
-        parameters.put("region", "US_EAST_1");
-        parameters.put("streamName", "test");
-
-        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
-
-        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
-    }
-
-    @Test
-    public void testConnectivityAndRegion() {
-        Component component = context().getComponent("aws2-kinesis");
-        ComponentVerifierExtension verifier
-                = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
-
-        Map<String, Object> parameters = new HashMap<>();
-        parameters.put("secretKey", "l");
-        parameters.put("accessKey", "k");
-        parameters.put("region", "l");
-        parameters.put("streamName", "test");
-
-        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
-
-        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
-    }
-
-}