You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2024/03/08 13:56:31 UTC
(camel) 01/03: CAMEL-20543 - Camel-AWS-Bedrock-Agent: Support more operations on the producer side
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch CAMEL-20543
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 84e8e92762b48ace7e93b9e25f4fa0d9c1b3bcdf
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Mar 8 14:23:38 2024 +0100
CAMEL-20543 - Camel-AWS-Bedrock-Agent: Support more operations on the producer side
Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
.../aws2/bedrock/agent/BedrockAgentOperations.java | 4 +-
.../aws2/bedrock/agent/BedrockAgentProducer.java | 64 ++++++++++++++++++++--
.../agent/integration/BedrockAgentProducerIT.java | 21 ++++++-
3 files changed, 81 insertions(+), 8 deletions(-)
diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
index 45b2b2cbff8..57710e6951e 100644
--- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
+++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
@@ -18,5 +18,7 @@ package org.apache.camel.component.aws2.bedrock.agent;
public enum BedrockAgentOperations {
- startIngestionJob
+ startIngestionJob,
+
+ listIngestionJobs
}
diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
index ba479bd3d0a..de54bc34ed8 100644
--- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
+++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
@@ -27,9 +27,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.bedrockagent.BedrockAgentClient;
+import software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsRequest;
+import software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsResponse;
import software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobRequest;
import software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobResponse;
-import software.amazon.awssdk.services.bedrockagentruntime.model.*;
/**
* A Producer which sends messages to the Amazon Bedrock Agent Service <a href="http://aws.amazon.com/bedrock/">AWS
@@ -50,6 +51,9 @@ public class BedrockAgentProducer extends DefaultProducer {
case startIngestionJob:
startIngestionJob(getEndpoint().getBedrockAgentClient(), exchange);
break;
+ case listIngestionJobs:
+ listIngestionJobs(getEndpoint().getBedrockAgentClient(), exchange);
+ break;
default:
throw new IllegalArgumentException("Unsupported operation");
}
@@ -95,7 +99,7 @@ public class BedrockAgentProducer extends DefaultProducer {
throw ase;
}
Message message = getMessageForResponse(exchange);
- prepareResponse(result, message);
+ prepareIngestionJobResponse(result, message);
}
} else {
String knowledgeBaseId;
@@ -123,14 +127,66 @@ public class BedrockAgentProducer extends DefaultProducer {
builder.dataSourceId(dataSourceId);
StartIngestionJobResponse output = bedrockAgentClient.startIngestionJob(builder.build());
Message message = getMessageForResponse(exchange);
- prepareResponse(output, message);
+ prepareIngestionJobResponse(output, message);
+ }
+ }
+
+
+ private void listIngestionJobs(BedrockAgentClient bedrockAgentClient, Exchange exchange)
+ throws InvalidPayloadException {
+ if (getConfiguration().isPojoRequest()) {
+ Object payload = exchange.getMessage().getMandatoryBody();
+ if (payload instanceof ListIngestionJobsRequest) {
+ ListIngestionJobsResponse result;
+ try {
+ result = bedrockAgentClient.listIngestionJobs((ListIngestionJobsRequest) payload);
+ } catch (AwsServiceException ase) {
+ LOG.trace("Start Ingestion Job command returned the error code {}", ase.awsErrorDetails().errorCode());
+ throw ase;
+ }
+ Message message = getMessageForResponse(exchange);
+ prepareListIngestionJobsResponse(result, message);
+ }
+ } else {
+ String knowledgeBaseId;
+ String dataSourceId;
+ ListIngestionJobsRequest.Builder builder = ListIngestionJobsRequest.builder();
+ if (ObjectHelper.isEmpty(getConfiguration().getKnowledgeBaseId())) {
+ if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID))) {
+ knowledgeBaseId = exchange.getIn().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, String.class);
+ } else {
+ throw new IllegalArgumentException("KnowledgeBaseId must be specified");
+ }
+ } else {
+ knowledgeBaseId = getConfiguration().getKnowledgeBaseId();
+ }
+ if (ObjectHelper.isEmpty(getConfiguration().getDataSourceId())) {
+ if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.DATASOURCE_ID))) {
+ dataSourceId = exchange.getIn().getHeader(BedrockAgentConstants.DATASOURCE_ID, String.class);
+ } else {
+ throw new IllegalArgumentException("DataSourceId must be specified");
+ }
+ } else {
+ dataSourceId = getConfiguration().getDataSourceId();
+ }
+ builder.knowledgeBaseId(knowledgeBaseId);
+ builder.dataSourceId(dataSourceId);
+ ListIngestionJobsResponse output = bedrockAgentClient.listIngestionJobs(builder.build());
+ Message message = getMessageForResponse(exchange);
+ prepareListIngestionJobsResponse(output, message);
}
}
- private void prepareResponse(StartIngestionJobResponse result, Message message) {
+ private void prepareIngestionJobResponse(StartIngestionJobResponse result, Message message) {
message.setBody(result.ingestionJob().ingestionJobId());
}
+ private void prepareListIngestionJobsResponse(ListIngestionJobsResponse result, Message message) {
+ if (result.hasIngestionJobSummaries()) {
+ message.setBody(result.ingestionJobSummaries());
+ }
+ }
+
public static Message getMessageForResponse(final Exchange exchange) {
return exchange.getMessage();
}
diff --git a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
index cf370902ac7..7d75c128ebc 100644
--- a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
+++ b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
@@ -29,10 +29,10 @@ import org.junit.jupiter.api.condition.EnabledIfSystemProperties;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
// Must be manually tested. Provide your own accessKey and secretKey using -Daws.manual.access.key and -Daws.manual.secret.key
-@EnabledIfSystemProperties({
+/*@EnabledIfSystemProperties({
@EnabledIfSystemProperty(named = "aws.manual.access.key", matches = ".*", disabledReason = "Access key not provided"),
@EnabledIfSystemProperty(named = "aws.manual.secret.key", matches = ".*", disabledReason = "Secret key not provided")
-})
+})*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class BedrockAgentProducerIT extends CamelTestSupport {
@@ -54,13 +54,28 @@ class BedrockAgentProducerIT extends CamelTestSupport {
MockEndpoint.assertIsSatisfied(context);
}
+ @Test
+ public void testListIngestionJobs() throws InterruptedException {
+
+ result.expectedMessageCount(1);
+ final Exchange result = template.send("direct:list_ingestion_jobs", exchange -> {
+ exchange.getMessage().setHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, "QOZ68KOXTS");
+ exchange.getMessage().setHeader(BedrockAgentConstants.DATASOURCE_ID, "9V85PTUEAH");
+ });
+
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("direct:start_ingestion")
- .to("aws-bedrock-agent:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}}&region=us-east-1&operation=startIngestionJob&knowledgeBaseId=QOZ68KOXTS")
+ .to("aws-bedrock-agent:label?useDefaultCredentialsProvider=true&region=us-east-1&operation=startIngestionJob")
+ .to(result);
+ from("direct:list_ingestion_jobs")
+ .to("aws-bedrock-agent:label?useDefaultCredentialsProvider=true&region=us-east-1&operation=listIngestionJobs")
.log("${body}")
.to(result);
}