You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "jiang7chengzitc (Jira)" <ji...@apache.org> on 2020/11/18 14:58:00 UTC

[jira] [Created] (FLINK-20224) add username&password to provide a credential for es rest client

jiang7chengzitc created FLINK-20224:
---------------------------------------

             Summary: add username&password to provide a credential for es rest client
                 Key: FLINK-20224
                 URL: https://issues.apache.org/jira/browse/FLINK-20224
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / ElasticSearch
    Affects Versions: 1.11.0
            Reporter: jiang7chengzitc
             Fix For: 1.11.3


hello,

The Flink Elasticsearch connector uses the Java High Level REST Client to process index, delete, get, update, and other requests. However, some ES clusters (version 6 and higher) require security credentials to connect. We could therefore add username and password options that are used to build the security credentials for connecting to such a cluster.

for example:
{code:java}
// Code placeholder
org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink

/**
 * Excerpt of the sink-function factory method. The {@code ......} lines mark code that is
 * omitted from this snippet (including where {@code builder} is created and where the
 * result is returned).
 *
 * <p>The visible part installs a {@code DefaultRestClientFactory} on {@code builder},
 * built from the REST settings looked up in {@code sinkOptions}: max retry timeout, path
 * prefix, and the proposed {@code USERNAME} / {@code PASSWORD} entries.
 */
@Override
protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
      List<Host> hosts,
      ActionRequestFailureHandler failureHandler,
      Map<SinkOption, String> sinkOptions,
      ElasticsearchUpsertSinkFunction upsertSinkFunction) {
  ......
  builder.setRestClientFactory(
   new DefaultRestClientFactory(
      // Max retry timeout: parsed to an Integer when the option is present, otherwise null.
      Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
         .map(Integer::valueOf)
         .orElse(null),
      sinkOptions.get(REST_PATH_PREFIX),
      // Proposed additions: credentials are passed through as looked up from the options;
      // the factory accepts null for both (its constructor parameters are @Nullable).
      sinkOptions.get(USERNAME),
      sinkOptions.get(PASSWORD)));
  ......
}

/**
 * {@code RestClientFactory} that applies the sink's optional REST settings to a
 * {@code RestClientBuilder}: max retry timeout, path prefix and, when both a username and
 * a password are supplied, basic-auth credentials.
 *
 * <p>All settings are fixed at construction time. Any of them may be {@code null}, in
 * which case the corresponding builder setting is left untouched.
 */
@VisibleForTesting
static class DefaultRestClientFactory implements RestClientFactory {

   // Assigned once in the constructor and used by equals/hashCode, hence final.
   @Nullable private final Integer maxRetryTimeout;
   @Nullable private final String pathPrefix;
   @Nullable private final String username;
   @Nullable private final String password;

   /**
    * Creates a factory with the given optional settings.
    *
    * @param maxRetryTimeout max retry timeout in milliseconds, or {@code null} to leave unset
    * @param pathPrefix path prefix for the REST client, or {@code null} to leave unset
    * @param username user name for the credentials, or {@code null} for none
    * @param password password for the credentials, or {@code null} for none
    */
   public DefaultRestClientFactory(
         @Nullable Integer maxRetryTimeout,
         @Nullable String pathPrefix,
         @Nullable String username,
         @Nullable String password) {
      this.maxRetryTimeout = maxRetryTimeout;
      this.pathPrefix = pathPrefix;
      this.username = username;
      this.password = password;
   }

   /**
    * Applies every non-null setting to the given builder.
    *
    * <p>Credentials are installed only when both {@code username} and {@code password}
    * are non-null; if only one of them is set, no credentials are configured.
    *
    * @param restClientBuilder the builder to configure
    */
   @Override
   public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
      if (maxRetryTimeout != null) {
         restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
      }
      if (pathPrefix != null) {
         restClientBuilder.setPathPrefix(pathPrefix);
      }
      if (username != null && password != null) {
         // Register the username/password for any auth scope and hand the provider
         // to the underlying HTTP client as its default credentials provider.
         final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
         credentialsProvider.setCredentials(
            AuthScope.ANY,
            new UsernamePasswordCredentials(username, password));
         restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
      }
   }

   /**
    * Two factories are equal when they are of the same class and all four settings
    * (max retry timeout, path prefix, username, password) are equal.
    */
   @Override
   public boolean equals(Object o) {
      if (this == o) {
         return true;
      }
      if (o == null || getClass() != o.getClass()) {
         return false;
      }
      DefaultRestClientFactory that = (DefaultRestClientFactory) o;
      return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) &&
         Objects.equals(pathPrefix, that.pathPrefix) &&
         Objects.equals(username, that.username) &&
         Objects.equals(password, that.password);
   }

   /** Hash over the same four settings that {@link #equals(Object)} compares. */
   @Override
   public int hashCode() {
      return Objects.hash(
         maxRetryTimeout,
         pathPrefix,
         username,
         password);
   }
}{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)