You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "jiang7chengzitc (Jira)" <ji...@apache.org> on 2020/11/18 14:58:00 UTC
[jira] [Created] (FLINK-20224) add username&password to provide a
credential for es rest client
jiang7chengzitc created FLINK-20224:
---------------------------------------
Summary: add username&password to provide a credential for es rest client
Key: FLINK-20224
URL: https://issues.apache.org/jira/browse/FLINK-20224
Project: Flink
Issue Type: Improvement
Components: Connectors / ElasticSearch
Affects Versions: 1.11.0
Reporter: jiang7chengzitc
Fix For: 1.11.3
hello,
Flink ElasticSearch Connector use Java High Level REST Client to process request for index, delete, get, update, etc. but some ES clusters (version 6 and higher) require security credentials to connect, So it can be considered to add username and password option to build security credentials, then connect to this ES cluster.
for example:
{code:java}
//代码占位符
org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink
@Override
protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
List<Host> hosts,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions,
ElasticsearchUpsertSinkFunction upsertSinkFunction) {
......
builder.setRestClientFactory(
new DefaultRestClientFactory(
Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
.map(Integer::valueOf)
.orElse(null),
sinkOptions.get(REST_PATH_PREFIX),
sinkOptions.get(USERNAME),
sinkOptions.get(PASSWORD)));
......
}
@VisibleForTesting
static class DefaultRestClientFactory implements RestClientFactory {
private Integer maxRetryTimeout;
private String pathPrefix;
private String username;
private String password;
public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix,@Nullable String username, @Nullable String password) {
this.maxRetryTimeout = maxRetryTimeout;
this.pathPrefix = pathPrefix;
this.username = username;
this.password = password;
}
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
if (maxRetryTimeout != null) {
restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
}
if (pathPrefix != null) {
restClientBuilder.setPathPrefix(pathPrefix);
}
// build credentialsProvider
if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultRestClientFactory that = (DefaultRestClientFactory) o;
return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) &&
Objects.equals(pathPrefix, that.pathPrefix) &&
Objects.equals(username, that.username) &&
Objects.equals(password, that.password);
}
@Override
public int hashCode() {
return Objects.hash(
maxRetryTimeout,
pathPrefix,
username,
password);
}
}{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)