You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by sanjeet rath <ra...@gmail.com> on 2020/09/09 16:14:07 UTC

Modifying putElasticSearchHTTP processor to use AWS IAM role based awscredentialprovide service for access

Hi ,

We are using *AWS managed ElasticSearch* and our *nifi is hosted in EC2*.
I have a use case of building a custom processor on top of
putElasticSearchHTTP, where it will use aws IAM based role
awscredentialprovider service to connect AWS ElasticSearch.
This will be similar to PUTSQS where we are using IAM role based
awscredentialprovider service to connect SQS and its working fine.

But there is no awscredentailprovider controller service is available
in putElasticSearchHTTP.

So my plan is adding a *awscredentailprovider controller service to
putElasticSearchHTTP* , where i will use bellow code  to connect to
elasticsearch.

Is my approach correct ? Could you provide any better thought on this ?

public class AmazonElasticsearchServiceSample { private static String
serviceName = "es"; private static String region = "us-west-1"; private
static String aesEndpoint = "https://domain.us-west-1.es.amazonaws.com";
private static String payload = "{ \"type\": \"s3\", \"settings\": {
\"bucket\": \"your-bucket\", \"region\": \"us-west-1\", \"role_arn\": \"
arn:aws:iam::123456789012:role/TheServiceRole\" } }"; private static String
snapshotPath = "/_snapshot/my-snapshot-repo"; private static String
sampleDocument = "{" + "\"title\":\"Walk the Line\"," + "\"director\":\"James
Mangold\"," + "\"year\":\"2005\"}"; private static String indexingPath =
"/my-index/_doc"; static final AWSCredentialsProvider credentialsProvider =
new DefaultAWSCredentialsProviderChain(); public static void main(String[]
args) throws IOException { RestClient esClient = esClient(serviceName,
region); // Register a snapshot repository HttpEntity entity = new
NStringEntity(payload, ContentType.APPLICATION_JSON); Request request = new
Request("PUT", snapshotPath); request.setEntity(entity); //
request.addParameter(name, value); // optional parameters Response response
= esClient.performRequest(request);
System.out.println(response.toString()); // Index a document entity = new
NStringEntity(sampleDocument, ContentType.APPLICATION_JSON); String id = "1";
request = new Request("PUT", indexingPath + "/" + id);
request.setEntity(entity); // Using a String instead of an HttpEntity sets
Content-Type to application/json automatically. //
request.setJsonEntity(sampleDocument); response =
esClient.performRequest(request); System.out.println(response.toString()); }
public static RestClient esClient(String serviceName, String region) {
AWS4Signer signer = new AWS4Signer(); signer.setServiceName(serviceName);
signer.setRegionName(region); HttpRequestInterceptor interceptor = new
AWSRequestSigningApacheInterceptor(serviceName, signer,
credentialsProvider); return
RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(hacb
-> hacb.addInterceptorLast(interceptor)).build(); }
https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-request-signing.html



Regards,
Sanjeet

-- 
Sanjeet Kumar Rath,
mob- +91 8777577470

Re: Modifying putElasticSearchHTTP processor to use AWS IAM role based awscredentialprovide service for access

Posted by Mike Thomsen <mi...@gmail.com>.
You would probably be better off implementing your own controller
service with the same interface.

On Wed, Sep 9, 2020 at 10:13 PM sanjeet rath <ra...@gmail.com> wrote:
>
> Thank you MIke for the quick reply.
> I was really struggling with this functionality.
> i have gone through the code ,what i understood is i should use the "nifi-elastic-search-restapi-processor" project.
>
> In it the JsonQueryelasticSearch processor, it uses the "Client Service" Controller service. and i need to modify this controler. service to use AWS shared code which i shared with you in the trailed mail chain.
>
> Is my understanding is correct ?
>
> Regards,
> Sanjeet
>
>
>
> On Thu, Sep 10, 2020 at 3:18 AM Mike Thomsen <mi...@gmail.com> wrote:
>>
>> Sanjeet,
>>
>> As provided, this won't integrate well with the existing NiFi
>> processors. You would need to implement it as a controller service
>> object and update the processors to use it. Also, if you want to use
>> processors based on the official Elasticsearch client API, the ones
>> under the "REST API bundle" are the best fit because they already use
>> controller services that use the official Elastic clients.
>>
>> Thanks,
>>
>> Mike
>>
>> On Wed, Sep 9, 2020 at 12:14 PM sanjeet rath <ra...@gmail.com> wrote:
>> >
>> > Hi ,
>> >
>> > We are using AWS managed ElasticSearch and our nifi is hosted in EC2.
>> > I have a use case of building a custom processor on top of putElasticSearchHTTP, where it will use aws IAM based role awscredentialprovider service to connect AWS ElasticSearch.
>> > This will be similar to PUTSQS where we are using IAM role based awscredentialprovider service to connect SQS and its working fine.
>> >
>> > But there is no awscredentailprovider controller service is available in putElasticSearchHTTP.
>> >
>> > So my plan is adding a awscredentailprovider controller service to putElasticSearchHTTP , where i will use bellow code  to connect to elasticsearch.
>> >
>> > Is my approach correct ? Could you provide any better thought on this ?
>> >
>> > public class AmazonElasticsearchServiceSample { private static String serviceName = "es"; private static String region = "us-west-1"; private static String aesEndpoint = "https://domain.us-west-1.es.amazonaws.com"; private static String payload = "{ \"type\": \"s3\", \"settings\": { \"bucket\": \"your-bucket\", \"region\": \"us-west-1\", \"role_arn\": \"arn:aws:iam::123456789012:role/TheServiceRole\" } }"; private static String snapshotPath = "/_snapshot/my-snapshot-repo"; private static String sampleDocument = "{" + "\"title\":\"Walk the Line\"," + "\"director\":\"James Mangold\"," + "\"year\":\"2005\"}"; private static String indexingPath = "/my-index/_doc"; static final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); public static void main(String[] args) throws IOException { RestClient esClient = esClient(serviceName, region); // Register a snapshot repository HttpEntity entity = new NStringEntity(payload, ContentType.APPLICATION_JSON); Request request = new Request("PUT", snapshotPath); request.setEntity(entity); // request.addParameter(name, value); // optional parameters Response response = esClient.performRequest(request); System.out.println(response.toString()); // Index a document entity = new NStringEntity(sampleDocument, ContentType.APPLICATION_JSON); String id = "1"; request = new Request("PUT", indexingPath + "/" + id); request.setEntity(entity); // Using a String instead of an HttpEntity sets Content-Type to application/json automatically. // request.setJsonEntity(sampleDocument); response = esClient.performRequest(request); System.out.println(response.toString()); }
>> > public static RestClient esClient(String serviceName, String region) { AWS4Signer signer = new AWS4Signer(); signer.setServiceName(serviceName); signer.setRegionName(region); HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(serviceName, signer, credentialsProvider); return RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(hacb -> hacb.addInterceptorLast(interceptor)).build(); }
>> > https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-request-signing.html
>> >
>> >
>> >
>> > Regards,
>> > Sanjeet
>> >
>> > --
>> > Sanjeet Kumar Rath,
>> > mob- +91 8777577470
>> >
>
>
>
> --
> Sanjeet Kumar Rath,
> mob- +91 8777577470
>

Re: Modifying putElasticSearchHTTP processor to use AWS IAM role based awscredentialprovide service for access

Posted by sanjeet rath <ra...@gmail.com>.
Thank you MIke for the quick reply.
I was really struggling with this functionality.
i have gone through the code ,what i understood is i should use the
"nifi-elastic-search-restapi-processor" project.

In it the JsonQueryelasticSearch processor, it uses the "Client
Service" Controller service. and i need to modify this controler. service
to use AWS shared code which i shared with you in the trailed mail chain.

Is my understanding is correct ?

Regards,
Sanjeet



On Thu, Sep 10, 2020 at 3:18 AM Mike Thomsen <mi...@gmail.com> wrote:

> Sanjeet,
>
> As provided, this won't integrate well with the existing NiFi
> processors. You would need to implement it as a controller service
> object and update the processors to use it. Also, if you want to use
> processors based on the official Elasticsearch client API, the ones
> under the "REST API bundle" are the best fit because they already use
> controller services that use the official Elastic clients.
>
> Thanks,
>
> Mike
>
> On Wed, Sep 9, 2020 at 12:14 PM sanjeet rath <ra...@gmail.com>
> wrote:
> >
> > Hi ,
> >
> > We are using AWS managed ElasticSearch and our nifi is hosted in EC2.
> > I have a use case of building a custom processor on top of
> putElasticSearchHTTP, where it will use aws IAM based role
> awscredentialprovider service to connect AWS ElasticSearch.
> > This will be similar to PUTSQS where we are using IAM role based
> awscredentialprovider service to connect SQS and its working fine.
> >
> > But there is no awscredentailprovider controller service is available in
> putElasticSearchHTTP.
> >
> > So my plan is adding a awscredentailprovider controller service to
> putElasticSearchHTTP , where i will use bellow code  to connect to
> elasticsearch.
> >
> > Is my approach correct ? Could you provide any better thought on this ?
> >
> > public class AmazonElasticsearchServiceSample { private static String
> serviceName = "es"; private static String region = "us-west-1"; private
> static String aesEndpoint = "https://domain.us-west-1.es.amazonaws.com";
> private static String payload = "{ \"type\": \"s3\", \"settings\": {
> \"bucket\": \"your-bucket\", \"region\": \"us-west-1\", \"role_arn\":
> \"arn:aws:iam::123456789012:role/TheServiceRole\" } }"; private static
> String snapshotPath = "/_snapshot/my-snapshot-repo"; private static String
> sampleDocument = "{" + "\"title\":\"Walk the Line\"," +
> "\"director\":\"James Mangold\"," + "\"year\":\"2005\"}"; private static
> String indexingPath = "/my-index/_doc"; static final AWSCredentialsProvider
> credentialsProvider = new DefaultAWSCredentialsProviderChain(); public
> static void main(String[] args) throws IOException { RestClient esClient =
> esClient(serviceName, region); // Register a snapshot repository HttpEntity
> entity = new NStringEntity(payload, ContentType.APPLICATION_JSON); Request
> request = new Request("PUT", snapshotPath); request.setEntity(entity); //
> request.addParameter(name, value); // optional parameters Response response
> = esClient.performRequest(request);
> System.out.println(response.toString()); // Index a document entity = new
> NStringEntity(sampleDocument, ContentType.APPLICATION_JSON); String id =
> "1"; request = new Request("PUT", indexingPath + "/" + id);
> request.setEntity(entity); // Using a String instead of an HttpEntity sets
> Content-Type to application/json automatically. //
> request.setJsonEntity(sampleDocument); response =
> esClient.performRequest(request); System.out.println(response.toString()); }
> > public static RestClient esClient(String serviceName, String region) {
> AWS4Signer signer = new AWS4Signer(); signer.setServiceName(serviceName);
> signer.setRegionName(region); HttpRequestInterceptor interceptor = new
> AWSRequestSigningApacheInterceptor(serviceName, signer,
> credentialsProvider); return
> RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(hacb
> -> hacb.addInterceptorLast(interceptor)).build(); }
> >
> https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-request-signing.html
> >
> >
> >
> > Regards,
> > Sanjeet
> >
> > --
> > Sanjeet Kumar Rath,
> > mob- +91 8777577470
> >
>


-- 
Sanjeet Kumar Rath,
mob- +91 8777577470

Re: Modifying putElasticSearchHTTP processor to use AWS IAM role based awscredentialprovide service for access

Posted by Mike Thomsen <mi...@gmail.com>.
Sanjeet,

As provided, this won't integrate well with the existing NiFi
processors. You would need to implement it as a controller service
object and update the processors to use it. Also, if you want to use
processors based on the official Elasticsearch client API, the ones
under the "REST API bundle" are the best fit because they already use
controller services that use the official Elastic clients.

Thanks,

Mike

On Wed, Sep 9, 2020 at 12:14 PM sanjeet rath <ra...@gmail.com> wrote:
>
> Hi ,
>
> We are using AWS managed ElasticSearch and our nifi is hosted in EC2.
> I have a use case of building a custom processor on top of putElasticSearchHTTP, where it will use aws IAM based role awscredentialprovider service to connect AWS ElasticSearch.
> This will be similar to PUTSQS where we are using IAM role based awscredentialprovider service to connect SQS and its working fine.
>
> But there is no awscredentailprovider controller service is available in putElasticSearchHTTP.
>
> So my plan is adding a awscredentailprovider controller service to putElasticSearchHTTP , where i will use bellow code  to connect to elasticsearch.
>
> Is my approach correct ? Could you provide any better thought on this ?
>
> public class AmazonElasticsearchServiceSample { private static String serviceName = "es"; private static String region = "us-west-1"; private static String aesEndpoint = "https://domain.us-west-1.es.amazonaws.com"; private static String payload = "{ \"type\": \"s3\", \"settings\": { \"bucket\": \"your-bucket\", \"region\": \"us-west-1\", \"role_arn\": \"arn:aws:iam::123456789012:role/TheServiceRole\" } }"; private static String snapshotPath = "/_snapshot/my-snapshot-repo"; private static String sampleDocument = "{" + "\"title\":\"Walk the Line\"," + "\"director\":\"James Mangold\"," + "\"year\":\"2005\"}"; private static String indexingPath = "/my-index/_doc"; static final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); public static void main(String[] args) throws IOException { RestClient esClient = esClient(serviceName, region); // Register a snapshot repository HttpEntity entity = new NStringEntity(payload, ContentType.APPLICATION_JSON); Request request = new Request("PUT", snapshotPath); request.setEntity(entity); // request.addParameter(name, value); // optional parameters Response response = esClient.performRequest(request); System.out.println(response.toString()); // Index a document entity = new NStringEntity(sampleDocument, ContentType.APPLICATION_JSON); String id = "1"; request = new Request("PUT", indexingPath + "/" + id); request.setEntity(entity); // Using a String instead of an HttpEntity sets Content-Type to application/json automatically. // request.setJsonEntity(sampleDocument); response = esClient.performRequest(request); System.out.println(response.toString()); }
> public static RestClient esClient(String serviceName, String region) { AWS4Signer signer = new AWS4Signer(); signer.setServiceName(serviceName); signer.setRegionName(region); HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(serviceName, signer, credentialsProvider); return RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(hacb -> hacb.addInterceptorLast(interceptor)).build(); }
> https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-request-signing.html
>
>
>
> Regards,
> Sanjeet
>
> --
> Sanjeet Kumar Rath,
> mob- +91 8777577470
>