You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2024/03/08 13:56:31 UTC

(camel) 01/03: CAMEL-20543 - Camel-AWS-Bedrock-Agent: Support more operations on the producer side

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch CAMEL-20543
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 84e8e92762b48ace7e93b9e25f4fa0d9c1b3bcdf
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Mar 8 14:23:38 2024 +0100

    CAMEL-20543 - Camel-AWS-Bedrock-Agent: Support more operations on the producer side
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 .../aws2/bedrock/agent/BedrockAgentOperations.java |  4 +-
 .../aws2/bedrock/agent/BedrockAgentProducer.java   | 64 ++++++++++++++++++++--
 .../agent/integration/BedrockAgentProducerIT.java  | 21 ++++++-
 3 files changed, 81 insertions(+), 8 deletions(-)

diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
index 45b2b2cbff8..57710e6951e 100644
--- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
+++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
@@ -18,5 +18,7 @@ package org.apache.camel.component.aws2.bedrock.agent;
 
 public enum BedrockAgentOperations {
 
-    startIngestionJob
+    startIngestionJob,
+
+    listIngestionJobs
 }
diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
index ba479bd3d0a..de54bc34ed8 100644
--- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
+++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
@@ -27,9 +27,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.services.bedrockagent.BedrockAgentClient;
+import software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsRequest;
+import software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsResponse;
 import software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobRequest;
 import software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobResponse;
-import software.amazon.awssdk.services.bedrockagentruntime.model.*;
 
 /**
  * A Producer which sends messages to the Amazon Bedrock Agent Service <a href="http://aws.amazon.com/bedrock/">AWS
@@ -50,6 +51,9 @@ public class BedrockAgentProducer extends DefaultProducer {
             case startIngestionJob:
                 startIngestionJob(getEndpoint().getBedrockAgentClient(), exchange);
                 break;
+            case listIngestionJobs:
+                listIngestionJobs(getEndpoint().getBedrockAgentClient(), exchange);
+                break;
             default:
                 throw new IllegalArgumentException("Unsupported operation");
         }
@@ -95,7 +99,7 @@ public class BedrockAgentProducer extends DefaultProducer {
                     throw ase;
                 }
                 Message message = getMessageForResponse(exchange);
-                prepareResponse(result, message);
+                prepareIngestionJobResponse(result, message);
             }
         } else {
             String knowledgeBaseId;
@@ -123,14 +127,66 @@ public class BedrockAgentProducer extends DefaultProducer {
             builder.dataSourceId(dataSourceId);
             StartIngestionJobResponse output = bedrockAgentClient.startIngestionJob(builder.build());
             Message message = getMessageForResponse(exchange);
-            prepareResponse(output, message);
+            prepareIngestionJobResponse(output, message);
+        }
+    }
+
+
+    private void listIngestionJobs(BedrockAgentClient bedrockAgentClient, Exchange exchange)
+            throws InvalidPayloadException {
+        if (getConfiguration().isPojoRequest()) {
+            Object payload = exchange.getMessage().getMandatoryBody();
+            if (payload instanceof ListIngestionJobsRequest) {
+                ListIngestionJobsResponse result;
+                try {
+                    result = bedrockAgentClient.listIngestionJobs((ListIngestionJobsRequest) payload);
+                } catch (AwsServiceException ase) {
+                    LOG.trace("Start Ingestion Job command returned the error code {}", ase.awsErrorDetails().errorCode());
+                    throw ase;
+                }
+                Message message = getMessageForResponse(exchange);
+                prepareListIngestionJobsResponse(result, message);
+            }
+        } else {
+            String knowledgeBaseId;
+            String dataSourceId;
+            ListIngestionJobsRequest.Builder builder = ListIngestionJobsRequest.builder();
+            if (ObjectHelper.isEmpty(getConfiguration().getKnowledgeBaseId())) {
+                if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID))) {
+                    knowledgeBaseId = exchange.getIn().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, String.class);
+                } else {
+                    throw new IllegalArgumentException("KnowledgeBaseId must be specified");
+                }
+            } else {
+                knowledgeBaseId = getConfiguration().getKnowledgeBaseId();
+            }
+            if (ObjectHelper.isEmpty(getConfiguration().getDataSourceId())) {
+                if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.DATASOURCE_ID))) {
+                    dataSourceId = exchange.getIn().getHeader(BedrockAgentConstants.DATASOURCE_ID, String.class);
+                } else {
+                    throw new IllegalArgumentException("DataSourceId must be specified");
+                }
+            } else {
+                dataSourceId = getConfiguration().getDataSourceId();
+            }
+            builder.knowledgeBaseId(knowledgeBaseId);
+            builder.dataSourceId(dataSourceId);
+            ListIngestionJobsResponse output = bedrockAgentClient.listIngestionJobs(builder.build());
+            Message message = getMessageForResponse(exchange);
+            prepareListIngestionJobsResponse(output, message);
         }
     }
 
-    private void prepareResponse(StartIngestionJobResponse result, Message message) {
+    private void prepareIngestionJobResponse(StartIngestionJobResponse result, Message message) {
         message.setBody(result.ingestionJob().ingestionJobId());
     }
 
+    private void prepareListIngestionJobsResponse(ListIngestionJobsResponse result, Message message) {
+        if (result.hasIngestionJobSummaries()) {
+            message.setBody(result.ingestionJobSummaries());
+        }
+    }
+
     public static Message getMessageForResponse(final Exchange exchange) {
         return exchange.getMessage();
     }
diff --git a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
index cf370902ac7..7d75c128ebc 100644
--- a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
+++ b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
@@ -29,10 +29,10 @@ import org.junit.jupiter.api.condition.EnabledIfSystemProperties;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 
 // Must be manually tested. Provide your own accessKey and secretKey using -Daws.manual.access.key and -Daws.manual.secret.key
-@EnabledIfSystemProperties({
+/*@EnabledIfSystemProperties({
         @EnabledIfSystemProperty(named = "aws.manual.access.key", matches = ".*", disabledReason = "Access key not provided"),
         @EnabledIfSystemProperty(named = "aws.manual.secret.key", matches = ".*", disabledReason = "Secret key not provided")
-})
+})*/
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 class BedrockAgentProducerIT extends CamelTestSupport {
 
@@ -54,13 +54,28 @@ class BedrockAgentProducerIT extends CamelTestSupport {
         MockEndpoint.assertIsSatisfied(context);
     }
 
+    @Test
+    public void testListIngestionJobs() throws InterruptedException {
+
+        result.expectedMessageCount(1);
+        final Exchange result = template.send("direct:list_ingestion_jobs", exchange -> {
+            exchange.getMessage().setHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, "QOZ68KOXTS");
+            exchange.getMessage().setHeader(BedrockAgentConstants.DATASOURCE_ID, "9V85PTUEAH");
+        });
+
+        MockEndpoint.assertIsSatisfied(context);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             @Override
             public void configure() {
                 from("direct:start_ingestion")
-                        .to("aws-bedrock-agent:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}}&region=us-east-1&operation=startIngestionJob&knowledgeBaseId=QOZ68KOXTS")
+                        .to("aws-bedrock-agent:label?useDefaultCredentialsProvider=true&region=us-east-1&operation=startIngestionJob")
+                        .to(result);
+                from("direct:list_ingestion_jobs")
+                        .to("aws-bedrock-agent:label?useDefaultCredentialsProvider=true&region=us-east-1&operation=listIngestionJobs")
                         .log("${body}")
                         .to(result);
             }