Posted to issues@flink.apache.org by cjolif <gi...@git.apache.org> on 2018/01/26 21:56:00 UTC

[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

GitHub user cjolif opened a pull request:

    https://github.com/apache/flink/pull/5374

    [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (TransportClient) and 6.x (RestHighLevelClient) support

    
     
    
    ## What is the purpose of the change
    
    *The purpose of this PR is to add Elasticsearch 6.x support on top of the RestHighLevelClient. Indeed, TransportClient is now deprecated and will be removed in 8.x. In addition, hosted versions of Elasticsearch often forbid the use of TransportClient.*
    
    ## Brief change log
    
    * First, a set of changes is borrowed from #4675:
      * Add a createRequestIndex method in ElasticsearchApiCallBridge
      * Add the flink-connector-elasticsearch5.3 project
      * Add a BulkProcessorIndexer in connector-elasticsearch5.3 to convert ActionRequest to DocWriteRequest
    * Then, on top of these changes and of being able to create a RestHighLevelClient instead of a TransportClient:
      * Add a createClient method in ElasticsearchApiCallBridge. As TransportClient and RestClient have only the AutoCloseable interface in common, this is what the method returns.
      * Make ElasticsearchSinkBase agnostic to whether it uses a TransportClient or a RestClient, by adding a createBulkProcessorBuilder method to ElasticsearchApiCallBridge that ElasticsearchSinkBase calls. Implement this method in all bridges.
      * Add the flink-connector-elasticsearch6 project, which uses the REST client, while all the other connectors still use TransportClient.
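The bridge refactoring described in the change log can be sketched as follows. This is a minimal, dependency-free illustration under stated assumptions, not the connector's actual code: `ApiCallBridge`, `SinkBase`, `FakeTransportClient` and `FakeRestClient` are hypothetical stand-ins for `ElasticsearchApiCallBridge`, `ElasticsearchSinkBase`, `TransportClient` and the REST client, and the bulk-processor builder is reduced to a plain `Consumer<String>`. It shows why returning `AutoCloseable` is enough: the sink base only hands the client back to its own bridge and closes it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for TransportClient (used by the 5.x bridges).
class FakeTransportClient implements AutoCloseable {
    final List<String> sent = new ArrayList<>();
    boolean closed;
    @Override public void close() { closed = true; }
}

// Hypothetical stand-in for the REST client (used by the 6.x bridge).
class FakeRestClient implements AutoCloseable {
    final List<String> sent = new ArrayList<>();
    boolean closed;
    @Override public void close() { closed = true; }
}

// Hypothetical stand-in for ElasticsearchApiCallBridge: the only type both
// clients share here is AutoCloseable, so that is what createClient returns.
abstract class ApiCallBridge {
    abstract AutoCloseable createClient();
    // Each bridge builds a "bulk processor" for its own concrete client type.
    abstract Consumer<String> createBulkProcessor(AutoCloseable client);
}

class TransportBridge extends ApiCallBridge {
    @Override AutoCloseable createClient() { return new FakeTransportClient(); }
    @Override Consumer<String> createBulkProcessor(AutoCloseable client) {
        FakeTransportClient c = (FakeTransportClient) client;
        return doc -> c.sent.add("transport:" + doc);
    }
}

class RestBridge extends ApiCallBridge {
    @Override AutoCloseable createClient() { return new FakeRestClient(); }
    @Override Consumer<String> createBulkProcessor(AutoCloseable client) {
        FakeRestClient c = (FakeRestClient) client;
        return doc -> c.sent.add("rest:" + doc);
    }
}

// Hypothetical stand-in for ElasticsearchSinkBase: never sees the concrete client type.
class SinkBase {
    private final ApiCallBridge bridge;
    private AutoCloseable client;
    private Consumer<String> bulkProcessor;

    SinkBase(ApiCallBridge bridge) { this.bridge = bridge; }

    void open() {
        client = bridge.createClient();
        bulkProcessor = bridge.createBulkProcessor(client);
    }

    void invoke(String doc) { bulkProcessor.accept(doc); }

    AutoCloseable client() { return client; }

    void close() {
        try {
            client.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Swapping `new RestBridge()` for `new TransportBridge()` changes the client without touching `SinkBase`, which is the point of moving client creation and bulk-processor construction into the bridge.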
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    * The Elasticsearch test base has also been reworked slightly so that it can be reused to test the REST-client-based implementation.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): yes, if you use the elasticsearch6 project, which adds dependencies on Elasticsearch 6
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`:  no
      - The serializers:  no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? docs & javadocs


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cjolif/flink es53-es6

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5374
    
----
commit b6a2396b31ade29071c65efa72df9f8f1fab9af4
Author: zjureel <zj...@...>
Date:   2017-09-15T03:51:35Z

    [FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add createRequestIndex method

commit 1e5b21a331dfaed50844e89986c313f5fc40bdbe
Author: zjureel <zj...@...>
Date:   2017-09-15T03:55:16Z

    [FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and later versions

commit 5a6e840c316095dd4f65f559405b19dcda7a1ca0
Author: zjureel <zj...@...>
Date:   2017-09-15T04:42:44Z

    [FLINK-7386] add test case for ES53

commit 574818f0f56f6a2b644e271a05a0796d90598aef
Author: zjureel <zj...@...>
Date:   2017-09-15T05:33:43Z

    [FLINK-7386] add document for ES5.3

commit 14168825507ad98c49a63be8ceab23dc539ff977
Author: Christophe Jolif <cj...@...>
Date:   2018-01-25T21:31:57Z

    [FLINK-8101] Elasticsearch 6.X REST support

----


---

[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

Posted by cjolif <gi...@git.apache.org>.
Github user cjolif commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5374#discussion_r165186422
  
    --- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java ---
    @@ -0,0 +1,57 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.elasticsearch53;
    +
    +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    +
    +import org.elasticsearch.action.ActionRequest;
    +import org.elasticsearch.action.DocWriteRequest;
    +import org.elasticsearch.action.bulk.BulkProcessor;
    +
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +/**
    + * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
    + * {@link ActionRequest ActionRequests} will be converted to {@link DocWriteRequest}
    + * and will be buffered before sending a bulk request to the Elasticsearch cluster.
    + */
    +public class BulkProcessorIndexer implements RequestIndexer {
    +
    +	private final BulkProcessor bulkProcessor;
    +	private final boolean flushOnCheckpoint;
    +	private final AtomicLong numPendingRequestsRef;
    +
    +	public BulkProcessorIndexer(BulkProcessor bulkProcessor,
    +								boolean flushOnCheckpoint,
    +								AtomicLong numPendingRequests) {
    +		this.bulkProcessor = bulkProcessor;
    +		this.flushOnCheckpoint = flushOnCheckpoint;
    +		this.numPendingRequestsRef = numPendingRequests;
    +	}
    +
    +	@Override
    +	public void add(ActionRequest... actionRequests) {
    +		for (ActionRequest actionRequest : actionRequests) {
    +			if (flushOnCheckpoint) {
    +				numPendingRequestsRef.getAndIncrement();
    +			}
    +			this.bulkProcessor.add((DocWriteRequest) actionRequest);
    --- End diff --
    
    This code actually comes from the commit I brought into this PR from @zjureel's original PR. That said, I think the answer is definitely yes in the case that matters for Flink. Indeed:
    
    * The `ActionRequest` values here come from the user's implementation of the `ElasticsearchSinkFunction.process` method, which should create `ActionRequest`s and add them to the indexer.
    * The idea here is not to create any arbitrary `ActionRequest`, but indexing requests.
    * The way to create an `ActionRequest` for indexing in Elasticsearch is to use `org.elasticsearch.action.index.IndexRequest`.
    * Starting with Elasticsearch 5.3, `IndexRequest` implements `DocWriteRequest`; before 5.3 it did not.
    
    See: 
    
    ![image](https://user-images.githubusercontent.com/623171/35646706-5723ab78-06d0-11e8-8d50-5b4545047a1f.png)
    
    vs
    
    ![image](https://user-images.githubusercontent.com/623171/35646719-63d7f1b2-06d0-11e8-8308-c330b3c11dad.png)
    
    So the only case where this would not be a `DocWriteRequest` is if someone's `ElasticsearchSinkFunction` created something other than an index request, and I don't really see why anyone would do that.
    
    That said, this raises the question of why the API was not typed against `IndexRequest` instead of `ActionRequest` from the start, as that would avoid such questions and force the user to provide an `IndexRequest`.
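Typing the indexer against the write-request type, as suggested above, would move the check from runtime to compile time. A minimal sketch under stated assumptions: `ActionRequest`, `DocWriteRequest`, `IndexRequest` and `SearchRequest` below are simplified stand-ins that only mirror the Elasticsearch 5.3+ hierarchy (they are not the real classes), and `TypedRequestIndexer` is hypothetical, not part of the Flink API. The intersection bound admits only requests that are both an `ActionRequest` and a `DocWriteRequest`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins mirroring the Elasticsearch 5.3+ hierarchy (not the real classes).
abstract class ActionRequest {}
interface DocWriteRequest {}
class IndexRequest extends ActionRequest implements DocWriteRequest {}
class SearchRequest extends ActionRequest {} // an ActionRequest that is NOT a DocWriteRequest

// Hypothetical indexer: the signature itself rejects non-write requests at compile time.
class TypedRequestIndexer {
    final List<DocWriteRequest> buffered = new ArrayList<>();

    @SafeVarargs
    final <R extends ActionRequest & DocWriteRequest> void add(R... requests) {
        for (R request : requests) {
            buffered.add(request); // no cast needed: R is statically a DocWriteRequest
        }
    }
}
```

With this signature, `indexer.add(new IndexRequest())` compiles, while `indexer.add(new SearchRequest())` is rejected by the compiler, so no `ClassCastException` can reach the bulk processor.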
    
    In any case, there is little choice: starting with 5.3, Elasticsearch's `BulkProcessor` no longer accepts a plain `ActionRequest`, only a `DocWriteRequest` such as `IndexRequest`.
    
    Do you have a suggestion for handling this better? I can obviously add documentation stating that, starting with 5.3, the sink function MUST create `DocWriteRequest`s, but would that be enough for you?
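Short of changing the signature, one option is to fail fast with a descriptive message instead of a bare `ClassCastException`, and to validate before incrementing the pending-request counter, so that the counter only reflects requests actually handed to the bulk processor. Note that in Elasticsearch 5.3+ update and delete requests also implement `DocWriteRequest`, so such a check admits them too. This is a hedged sketch, not the PR's code: the types are simplified stand-ins for the Elasticsearch classes, `CheckedBulkProcessorIndexer` is hypothetical, and a `List` stands in for the real `BulkProcessor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins mirroring the Elasticsearch 5.3+ types (not the real classes).
abstract class ActionRequest {}
interface DocWriteRequest {}
class IndexRequest extends ActionRequest implements DocWriteRequest {}
class SearchRequest extends ActionRequest {}

// Hypothetical variant of BulkProcessorIndexer; the List stands in for the BulkProcessor.
class CheckedBulkProcessorIndexer {
    final List<DocWriteRequest> bulkProcessor = new ArrayList<>();
    private final boolean flushOnCheckpoint;
    private final AtomicLong numPendingRequests;

    CheckedBulkProcessorIndexer(boolean flushOnCheckpoint, AtomicLong numPendingRequests) {
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.numPendingRequests = numPendingRequests;
    }

    void add(ActionRequest... actionRequests) {
        for (ActionRequest actionRequest : actionRequests) {
            // Validate first, so the pending counter is only incremented for
            // requests that are actually handed to the bulk processor.
            if (!(actionRequest instanceof DocWriteRequest)) {
                throw new IllegalArgumentException(
                    "Elasticsearch 5.3+ bulk requests only accept DocWriteRequest, got: "
                        + actionRequest.getClass().getName());
            }
            if (flushOnCheckpoint) {
                numPendingRequests.getAndIncrement();
            }
            bulkProcessor.add((DocWriteRequest) actionRequest);
        }
    }
}
```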



---

[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5374
  
    @cjolif sorry, I haven't had the chance yet.
    
    But I agree that we should try getting this in for 1.5 (I saw your reply on the 1.5 release discussion thread in the mailing lists). I'll try to get back to this as soon as possible.


---

[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

Posted by cjolif <gi...@git.apache.org>.
Github user cjolif closed the pull request at:

    https://github.com/apache/flink/pull/5374


---

[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

Posted by cjolif <gi...@git.apache.org>.
Github user cjolif commented on the issue:

    https://github.com/apache/flink/pull/5374
  
    @tzulitai sure. done.


---

[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

Posted by cjolif <gi...@git.apache.org>.
Github user cjolif commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5374#discussion_r164992369
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -32,7 +32,6 @@
     import org.elasticsearch.action.bulk.BulkProcessor;
     import org.elasticsearch.action.bulk.BulkRequest;
     import org.elasticsearch.action.bulk.BulkResponse;
    -import org.elasticsearch.client.Client;
    --- End diff --
    
    fixed.


---

[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

Posted by yew1eb <gi...@git.apache.org>.
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5374#discussion_r164961589
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -32,7 +32,6 @@
     import org.elasticsearch.action.bulk.BulkProcessor;
     import org.elasticsearch.action.bulk.BulkRequest;
     import org.elasticsearch.action.bulk.BulkResponse;
    -import org.elasticsearch.client.Client;
    --- End diff --
    
    This import should not be deleted; it is still used by `{@link Client}`.


---

[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5374
  
    @cjolif since we have now reached a conclusion on where the Elasticsearch connector should be improved in the future, could you maybe close this PR? I assume a new PR will be opened that subsumes this one.


---

[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

Posted by yew1eb <gi...@git.apache.org>.
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5374#discussion_r165035180
  
    --- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.elasticsearch53;
    +
    +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
    +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
    +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    +import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.elasticsearch.action.bulk.BackoffPolicy;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkProcessor;
    +import org.elasticsearch.client.Client;
    +import org.elasticsearch.client.transport.TransportClient;
    +import org.elasticsearch.common.network.NetworkModule;
    +import org.elasticsearch.common.settings.Settings;
    +import org.elasticsearch.common.transport.TransportAddress;
    +import org.elasticsearch.common.unit.TimeValue;
    +import org.elasticsearch.transport.Netty3Plugin;
    +import org.elasticsearch.transport.client.PreBuiltTransportClient;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.net.InetSocketAddress;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +/**
    + * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.3 and later versions.
    + */
    +public class Elasticsearch53ApiCallBridge extends ElasticsearchApiCallBridge {
    +
    +	private static final long serialVersionUID = -5222683870097809633L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch53ApiCallBridge.class);
    +
    +	/**
    +	 * User-provided transport addresses.
    +	 *
    +	 * <p>We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x.
    +	 */
    +	private final List<InetSocketAddress> transportAddresses;
    +
    +	Elasticsearch53ApiCallBridge(List<InetSocketAddress> transportAddresses) {
    +		Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
    +		this.transportAddresses = transportAddresses;
    +	}
    +
    +	@Override
    +	public AutoCloseable createClient(Map<String, String> clientConfig) {
    +		Settings settings = Settings.builder().put(clientConfig)
    +			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
    +			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
    +			.build();
    +
    +		TransportClient transportClient = new PreBuiltTransportClient(settings);
    +		for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
    +			transportClient.addTransportAddress(transport);
    +		}
    +
    +		// verify that we actually are connected to a cluster
    +		if (transportClient.connectedNodes().isEmpty()) {
    +			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
    +		}
    +
    +		if (LOG.isInfoEnabled()) {
    --- End diff --
    
    Checking `LOG.isInfoEnabled()` is unnecessary, because the `info()` method of the `Logger` already performs the `isInfoEnabled()` check itself.
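To illustrate the reviewer's point: with SLF4J, `LOG.info(...)` already checks whether INFO is enabled, so an explicit guard only pays off when building the message is expensive, and SLF4J's `{}` placeholders already defer the string formatting (the arguments themselves are still evaluated). The sketch below shows the same deferral using the JDK's `java.util.logging`, chosen only as a dependency-free stand-in for SLF4J: the `Supplier` overload of `Logger.info` is not evaluated when INFO is disabled. `LazyLoggingDemo` and its counter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.logging.Logger;

class LazyLoggingDemo {
    // Counts how often the (potentially expensive) message is actually built.
    static final AtomicInteger messagesBuilt = new AtomicInteger();

    static String describeClient() {
        messagesBuilt.incrementAndGet();
        return "Created Elasticsearch transport client";
    }

    static void logClientCreated(Logger log) {
        // No isLoggable() guard: Logger.info(Supplier) checks the level itself
        // and only calls the supplier when INFO is enabled.
        Supplier<String> message = LazyLoggingDemo::describeClient;
        log.info(message);
    }
}
```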


---

[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

Posted by yew1eb <gi...@git.apache.org>.
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5374#discussion_r165032079
  
    --- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java ---
    @@ -0,0 +1,57 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.elasticsearch53;
    +
    +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    +
    +import org.elasticsearch.action.ActionRequest;
    +import org.elasticsearch.action.DocWriteRequest;
    +import org.elasticsearch.action.bulk.BulkProcessor;
    +
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +/**
    + * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
    + * {@link ActionRequest ActionRequests} will be converted to {@link DocWriteRequest}
    + * and will be buffered before sending a bulk request to the Elasticsearch cluster.
    + */
    +public class BulkProcessorIndexer implements RequestIndexer {
    +
    +	private final BulkProcessor bulkProcessor;
    +	private final boolean flushOnCheckpoint;
    +	private final AtomicLong numPendingRequestsRef;
    +
    +	public BulkProcessorIndexer(BulkProcessor bulkProcessor,
    +								boolean flushOnCheckpoint,
    +								AtomicLong numPendingRequests) {
    +		this.bulkProcessor = bulkProcessor;
    +		this.flushOnCheckpoint = flushOnCheckpoint;
    +		this.numPendingRequestsRef = numPendingRequests;
    +	}
    +
    +	@Override
    +	public void add(ActionRequest... actionRequests) {
    +		for (ActionRequest actionRequest : actionRequests) {
    +			if (flushOnCheckpoint) {
    +				numPendingRequestsRef.getAndIncrement();
    +			}
    +			this.bulkProcessor.add((DocWriteRequest) actionRequest);
    --- End diff --
    
    Are you sure that `ActionRequest` type can be cast to `DocWriteRequest` type?


---

[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

Posted by cjolif <gi...@git.apache.org>.
Github user cjolif commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5374#discussion_r165109918
  
    --- Diff: flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.elasticsearch53;
    +
    +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
    +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
    +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    +import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.elasticsearch.action.bulk.BackoffPolicy;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkProcessor;
    +import org.elasticsearch.client.Client;
    +import org.elasticsearch.client.transport.TransportClient;
    +import org.elasticsearch.common.network.NetworkModule;
    +import org.elasticsearch.common.settings.Settings;
    +import org.elasticsearch.common.transport.TransportAddress;
    +import org.elasticsearch.common.unit.TimeValue;
    +import org.elasticsearch.transport.Netty3Plugin;
    +import org.elasticsearch.transport.client.PreBuiltTransportClient;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.net.InetSocketAddress;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +/**
    + * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.3 and later versions.
    + */
    +public class Elasticsearch53ApiCallBridge extends ElasticsearchApiCallBridge {
    +
    +	private static final long serialVersionUID = -5222683870097809633L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch53ApiCallBridge.class);
    +
    +	/**
    +	 * User-provided transport addresses.
    +	 *
    +	 * <p>We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x.
    +	 */
    +	private final List<InetSocketAddress> transportAddresses;
    +
    +	Elasticsearch53ApiCallBridge(List<InetSocketAddress> transportAddresses) {
    +		Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
    +		this.transportAddresses = transportAddresses;
    +	}
    +
    +	@Override
    +	public AutoCloseable createClient(Map<String, String> clientConfig) {
    +		Settings settings = Settings.builder().put(clientConfig)
    +			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
    +			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
    +			.build();
    +
    +		TransportClient transportClient = new PreBuiltTransportClient(settings);
    +		for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
    +			transportClient.addTransportAddress(transport);
    +		}
    +
    +		// verify that we actually are connected to a cluster
    +		if (transportClient.connectedNodes().isEmpty()) {
    +			throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
    +		}
    +
    +		if (LOG.isInfoEnabled()) {
    --- End diff --
    
    I think this was just copied from the practice in the other pre-existing bridges ;) But it should be fixed now.


---

[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

Posted by cjolif <gi...@git.apache.org>.
Github user cjolif commented on the issue:

    https://github.com/apache/flink/pull/5374
  
    Thanks @tzulitai. Let me know if you have any questions.


---

[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

Posted by cjolif <gi...@git.apache.org>.
Github user cjolif commented on the issue:

    https://github.com/apache/flink/pull/5374
  
    @tzulitai did you have a chance to look at this? If you have any questions, please let me know.


---

[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

Posted by jdewald <gi...@git.apache.org>.
Github user jdewald commented on the issue:

    https://github.com/apache/flink/pull/5374
  
    We were testing the Flink integration with the Elasticsearch High Level REST (HLR) client in order to upgrade to ES6, and ran into issues with the handling of bulk rejection retries. In short, with the HLR backend swapped in, retries are not performed correctly, and due to synchronization issues it is not actually safe to simply call `add` on the `RequestIndexer` that the Flink sink passes to the `ActionRequestFailureHandler`.
    
    I have filed a bug report, which has been accepted on the Elasticsearch side, asking for retries to be performed automatically: https://github.com/elastic/elasticsearch/issues/28885
    
    One additional note: once the retries from the `BackoffPolicy` have been used up, using the `RetryRejectedExecutionFailureHandler` appears to cause a deadlock, so this may need to be fixed on the `ElasticsearchSink` side. This should be confirmed independently, though, in case the issue is specific to my test setup. (TL;DR: the `RetryRejectedExecutionFailureHandler` is not safe to use with the HLR client, and I am not certain it is safe with the `TransportClient`-based sink either, in the cases where it actually triggers.)
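The retry behaviour under discussion can be illustrated with a small bounded exponential-backoff loop. This is a generic, dependency-free sketch, not Elasticsearch's `BackoffPolicy` implementation nor Flink's failure handler: `RetryWithBackoff`, its parameters, and the doubling schedule are assumptions made for illustration. The property it highlights is that once the delays are used up, the failure is returned to the caller instead of being retried indefinitely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

class RetryWithBackoff {
    /** Bounded exponential schedule: initial, 2x, 4x, ... with maxRetries entries (ms). */
    static List<Long> delays(long initialDelayMillis, int maxRetries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int i = 0; i < maxRetries; i++) {
            result.add(delay);
            delay *= 2;
        }
        return result;
    }

    /**
     * Calls attempt once, then once more after each backoff delay.
     * Returns true on the first success, false once all retries are used up.
     */
    static boolean run(BooleanSupplier attempt, long initialDelayMillis, int maxRetries) {
        if (attempt.getAsBoolean()) {
            return true;
        }
        for (long delay : delays(initialDelayMillis, maxRetries)) {
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
            if (attempt.getAsBoolean()) {
                return true;
            }
        }
        return false; // retries exhausted: surface the failure to the caller
    }
}
```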


---

[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5374
  
    @cjolif
    Thanks a lot for the contribution! I’ll add reviewing the PR to my backlog.
    I will try to take a look at it by the end of this week, after some 1.4.1 blockers that I’m still busy with.


---

[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

Posted by cjolif <gi...@git.apache.org>.
Github user cjolif commented on the issue:

    https://github.com/apache/flink/pull/5374
  
    Good news: the issue reported by @jdewald is now fixed in Elasticsearch master, though I'm not sure which Elasticsearch version will contain the fix.


---