Posted to issues@flink.apache.org by sbcd90 <gi...@git.apache.org> on 2016/05/04 04:27:31 UTC

[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

GitHub user sbcd90 opened a pull request:

    https://github.com/apache/flink/pull/1962

    [FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-3857] Add reconnect attempt to Elasticsearch host")
    
    - [ ] Documentation
      - Documentation added based on the changes made.
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sbcd90/flink elasticSearchRetryIssue

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1962
    
----
commit 62990c984f0d2eca3ba89ed9c2d22c469f16b136
Author: Subhobrata Dey <sb...@gmail.com>
Date:   2016-05-04T02:16:35Z

    [FLINK-3857][Streaming Connectors]Add reconnect attempt to Elasticsearch host

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r123150036
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -208,6 +222,13 @@ public void invoke(T value) throws Exception {
     		checkErrorAndRethrow();
     
     		elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
    +
    +		// if there is a connectivity failure, then retry
    +		if (failureThrowable.get() != null &&
    --- End diff --
    
    I'm wondering whether or not checking the exception type would be enough for verifying the connectivity failure.
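A minimal sketch of a cause-chain check, assuming that a lost connection ultimately surfaces as a JDK socket exception somewhere in the chain. `ConnectivityFailures` and `isConnectivityFailure` are hypothetical names, and only `java.net.ConnectException` and `java.net.NoRouteToHostException` are recognized. A real check would probably also need to match client-specific exceptions (for the Elasticsearch transport client, for example `NoNodeAvailableException`), which are left out here to keep the example dependency-free.

```java
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical helper: decides whether a failure looks like a lost connection. */
final class ConnectivityFailures {

    private ConnectivityFailures() {}

    /**
     * Walks the cause chain of {@code t} and returns true if any cause is a
     * connection-level exception. The identity set guards against cyclic cause chains.
     */
    static boolean isConnectivityFailure(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur instanceof ConnectException || cur instanceof NoRouteToHostException) {
                return true;
            }
        }
        return false;
    }
}
```

Inspecting the whole chain rather than only the top-level type matters because the failure may be wrapped one or more times before it reaches `failureThrowable`.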


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Hi @sbcd90, would you like to continue working on this PR?
    
    There's going to be a restructuring of the ES connectors (#3112), perhaps soon after the 1.2 release, and this PR will very likely need a rebase. I'd like to include this fix after the restructuring, so please let me know how you'd like to proceed with this contribution :)


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Thank you for picking this up again @sbcd90. I would wait until #3112 is merged before rebasing.


[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r67637714
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -166,30 +265,37 @@ public void open(Configuration configuration) {
     			transportClient.addTransportAddress(transport);
     		}
     
    -		// verify that we actually are connected to a cluster
    +		return transportClient;
    +	}
    +
    +	private boolean checkConnectionStatus(TransportClient transportClient) {
    --- End diff --
    
    Making this method static would be better, since it is a utility function that will be called frequently.


[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r66388796
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -153,9 +165,90 @@ public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress>
     	 */
     	@Override
     	public void open(Configuration configuration) {
    +		connect();
    --- End diff --
    
    I think we should avoid side-effecting class fields within utility member functions, since it reduces the readability of the code. Could we return the created TransportClient instead, and have a separate utility function that checks whether the connection is lost?
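The split suggested here, combined with the static-helper suggestion above, can be sketched as follows. This is a hypothetical outline, not the PR's code: `ClusterClient` stands in for the Elasticsearch `TransportClient` (whose real `connectedNodes()` returns `DiscoveryNode` objects rather than strings), and `SinkSketch` and `ConnectionUtils` are illustrative names. The client is created and returned by one method, checked by a side-effect-free static method, and assigned to the field in exactly one place.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the Elasticsearch TransportClient. */
interface ClusterClient {
    List<String> connectedNodes();
    void close();
}

/** Static, side-effect-free connection check. */
final class ConnectionUtils {
    private ConnectionUtils() {}

    static boolean isConnected(ClusterClient client) {
        return client != null && !client.connectedNodes().isEmpty();
    }
}

class SinkSketch {
    private final Supplier<ClusterClient> clientFactory; // hypothetical: builds a configured client
    private ClusterClient client;                        // assigned only in open()

    SinkSketch(Supplier<ClusterClient> clientFactory) {
        this.clientFactory = clientFactory;
    }

    /** Returns a freshly created client instead of writing it into a field. */
    private ClusterClient createClient() {
        return clientFactory.get();
    }

    void open() {
        ClusterClient candidate = createClient();
        if (!ConnectionUtils.isConnected(candidate)) {
            if (candidate != null) {
                candidate.close();
            }
            throw new IllegalStateException("Client is not connected to any Elasticsearch nodes!");
        }
        client = candidate;
    }

    boolean hasClient() {
        return client != null;
    }
}
```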


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62172611
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
     		if (LOG.isInfoEnabled()) {
     			LOG.info("Created Elasticsearch TransportClient {}", client);
     		}
    +	}
    +
    +	@Override
    +	public void invoke(final T element) {
    +		ParameterTool params = ParameterTool.fromMap(userConfig);
    --- End diff --
    
    It's very inefficient to create a new `ParameterTool` instance for each incoming element.
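Following this suggestion, and the later remark that the retry check can be done once in the open method, the configuration can be parsed a single time in open() and cached in a primitive field. A sketch under stated assumptions: a plain `Map` replaces Flink's `ParameterTool` to keep it dependency-free, `RetryConfiguredSink` is a hypothetical name, and while `CONFIG_KEY_CONNECTION_RETRIES` is the constant name proposed elsewhere in this thread, its string value `"connection.retries"` is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sink skeleton: configuration is parsed once in open(), not per element. */
class RetryConfiguredSink {
    // Constant name as proposed in the review; the string value is illustrative only.
    static final String CONFIG_KEY_CONNECTION_RETRIES = "connection.retries";

    private final Map<String, String> userConfig;
    private int connectionRetries; // 0 disables reconnect attempts

    RetryConfiguredSink(Map<String, String> userConfig) {
        this.userConfig = new HashMap<>(userConfig);
    }

    /** Called once per task by the runtime: do the map lookups and parsing here. */
    void open() {
        String raw = userConfig.get(CONFIG_KEY_CONNECTION_RETRIES);
        connectionRetries = (raw == null) ? 0 : Integer.parseInt(raw.trim());
        if (connectionRetries < 0) {
            throw new IllegalArgumentException(CONFIG_KEY_CONNECTION_RETRIES + " must be >= 0");
        }
    }

    /** Hot path: reads a primitive field, with no map lookups or allocation per element. */
    boolean retriesEnabled() {
        return connectionRetries > 0;
    }

    int connectionRetries() {
        return connectionRetries;
    }
}
```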


[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r67636257
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -166,30 +265,37 @@ public void open(Configuration configuration) {
     			transportClient.addTransportAddress(transport);
     		}
     
    -		// verify that we actually are connected to a cluster
    +		return transportClient;
    +	}
    +
    +	private boolean checkConnectionStatus(TransportClient transportClient) {
    +		// Check if client is connected to any Elasticsearch nodes
     		ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
     		if (nodes.isEmpty()) {
    -			throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
    -		}
    +			LOG.error("Client is not connected to any Elasticsearch nodes!");
    --- End diff --
    
    Might WARN or DEBUG level be more suitable here than ERROR?


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Hi @tzulitai, thanks for the updates. I'll refactor the code and rebase the PR.


[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r123150100
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -234,4 +255,36 @@ private void checkErrorAndRethrow() {
     			throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
     		}
     	}
    +
    +	private void retry(T value) throws Exception {
    +		int retryCounter = 1;
    +
    +		while (retryCounter <= connectionRetries) {
    +			if (bulkProcessor != null) {
    +				bulkProcessor.close();
    +				bulkProcessor = null;
    +			}
    +
    +			if (client != null) {
    +				client.close();
    +			}
    +
    +			try {
    +				open(null);
    --- End diff --
    
    This open() call seems a bit odd to me. I don't think it's good practice to call it here, since it is essentially a lifecycle method invoked by the system.


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217126041
  
    The change is missing documentation updates & test cases.


[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r67636217
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -166,30 +265,37 @@ public void open(Configuration configuration) {
     			transportClient.addTransportAddress(transport);
     		}
     
    -		// verify that we actually are connected to a cluster
    +		return transportClient;
    +	}
    +
    +	private boolean checkConnectionStatus(TransportClient transportClient) {
    +		// Check if client is connected to any Elasticsearch nodes
     		ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
     		if (nodes.isEmpty()) {
    -			throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
    -		}
    +			LOG.error("Client is not connected to any Elasticsearch nodes!");
    +			return false;
    +		} else {
     
    -		client = transportClient;
    -
    -		if (LOG.isInfoEnabled()) {
    -			LOG.info("Created Elasticsearch TransportClient {}", client);
    +			if (LOG.isInfoEnabled()) {
    +				LOG.info("Created Elasticsearch TransportClient {}", client);
    +			}
    --- End diff --
    
    The log message here doesn't match the purpose of this method.


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Hello @tzulitai,
    
    I have rebased the changes. Can you please review?


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217125875
  
    Thank you for opening the pull request.
    
    I made some inline comments.
    I don't think the proposed changes fix the issue described in the JIRA.
    I would check in each `invoke()` call whether `hasFailure` is set. If it is, you can reconnect to Elasticsearch.
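A minimal sketch of this suggestion, assuming the bulk processor's failure callback runs on a different thread than `invoke()`. `hasFailure` and `failureThrowable` mirror the field names used in the sink; `FailureFlagSink`, `onBulkFailure`, and the `reconnect`/`indexer` callbacks are hypothetical stand-ins for closing and rebuilding the client and for handing the element to the bulk processor.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: the asynchronous bulk callback only records the failure;
 * invoke() checks the flag synchronously and reconnects before sending the next element.
 */
class FailureFlagSink<T> {
    private final AtomicBoolean hasFailure = new AtomicBoolean(false);
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
    private final Consumer<Throwable> reconnect; // hypothetical: closes and rebuilds the client
    private final Consumer<T> indexer;           // hypothetical: hands the element to the bulk processor

    FailureFlagSink(Consumer<Throwable> reconnect, Consumer<T> indexer) {
        this.reconnect = reconnect;
        this.indexer = indexer;
    }

    /** Called from the bulk processor's failure callback, possibly on another thread. */
    void onBulkFailure(Throwable cause) {
        failureThrowable.compareAndSet(null, cause); // keep the first recorded failure
        hasFailure.set(true);
    }

    void invoke(T element) {
        if (hasFailure.compareAndSet(true, false)) {
            // A real sink should first check that the cause is actually a connectivity problem.
            reconnect.accept(failureThrowable.getAndSet(null));
        }
        indexer.accept(element);
    }
}
```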


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62172571
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -86,6 +88,7 @@
     	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
     	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
     	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    +	public static final String CONFIG_NO_OF_CONN_RETRIES = "conn.retries";
    --- End diff --
    
    We usually don't abbreviate configuration keys. Can you rename it to "CONFIG_KEY_CONNECTION_RETRIES"?


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62174087
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
     		if (LOG.isInfoEnabled()) {
     			LOG.info("Created Elasticsearch TransportClient {}", client);
     		}
    +	}
    +
    +	@Override
    +	public void invoke(final T element) {
    +		ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    +			final Timer timer = new Timer(true);
    +			TimerTask task = new TimerTask() {
    +				@Override
    +				public void run() {
    +					// verify that we actually are connected to a cluster
    +					ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +					if (nodes.isEmpty()) {
    +						if (LOG.isInfoEnabled()) {
    +							LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
    +						}
    +						open(new Configuration());
    --- End diff --
    
    I would not recommend calling the open() method from invoke(). open() is a "lifecycle" method called by Flink, and you should assume it's only called once.
    However, you can move the (re)connect logic into a separate method.
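A sketch of moving the reconnect logic out of open(), under these assumptions: `IndexClient`, `ReconnectingSink`, and `reconnect()` are hypothetical names, the client factory stands in for building a new Elasticsearch client, and a fixed number of attempts is used without any backoff. open() stays a one-time lifecycle call, and invoke() still sends the current element after reconnecting, which also covers the concern raised in this thread that the element is otherwise never sent.

```java
import java.util.function.Supplier;

/** Hypothetical minimal client abstraction standing in for the Elasticsearch client. */
interface IndexClient {
    boolean isConnected();
    void index(String document);
    void close();
}

class ReconnectingSink {
    private final Supplier<IndexClient> clientFactory; // hypothetical client builder
    private final int maxReconnectAttempts;
    private IndexClient client;
    private int openCalls;

    ReconnectingSink(Supplier<IndexClient> clientFactory, int maxReconnectAttempts) {
        this.clientFactory = clientFactory;
        this.maxReconnectAttempts = maxReconnectAttempts;
    }

    /** Lifecycle method: called once by the runtime, never from invoke(). */
    void open() {
        openCalls++;
        client = clientFactory.get();
    }

    void invoke(String element) {
        if (!client.isConnected()) {
            reconnect();
        }
        // Sent after (re)connecting, so the element is not silently dropped.
        client.index(element);
    }

    /** The (re)connect logic lives in its own method instead of re-running open(). */
    private void reconnect() {
        for (int attempt = 1; attempt <= maxReconnectAttempts; attempt++) {
            client.close();
            client = clientFactory.get();
            if (client.isConnected()) {
                return;
            }
        }
        throw new IllegalStateException(
            "Could not reconnect to Elasticsearch after " + maxReconnectAttempts + " attempts");
    }

    int openCalls() {
        return openCalls;
    }
}
```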


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    I took a quick look at this. I am wondering if this actually needs an extra timer service for retries.
    
    Can this be solved without a timer? The failures could be detected in the `invoke(...)` method, and the retry done directly there (with some minimal backoff or so). 
    
    Triggering asynchronous timers is very complex and easily creates leaks, races, or leftover work / tasks at shutdown.
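A minimal sketch of the timer-free approach, assuming a bounded exponential backoff is acceptable. The retry runs on the task thread inside invoke(), so nothing keeps running after it returns, and an interrupt (for example when the task is cancelled) propagates as `InterruptedException` instead of being wrapped in a `RuntimeException` as in the Timer-based draft. `SyncRetry` and `retryWithBackoff` are hypothetical names; the `attempt` callback would wrap something like reconnecting and then checking for connected nodes.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper for synchronous retries with bounded exponential backoff. */
final class SyncRetry {

    private SyncRetry() {}

    /**
     * Calls {@code attempt} up to {@code maxAttempts} times on the calling thread,
     * sleeping between failed attempts. Returns true as soon as an attempt succeeds.
     */
    static boolean retryWithBackoff(BooleanSupplier attempt, int maxAttempts,
                                    long initialBackoffMs, long maxBackoffMs)
            throws InterruptedException {
        long backoff = initialBackoffMs;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (i < maxAttempts) {
                Thread.sleep(backoff);                         // interruption propagates to the caller
                backoff = Math.min(backoff * 2, maxBackoffMs); // double the wait, up to a cap
            }
        }
        return false;
    }
}
```

Because no background thread is involved, there is nothing to cancel or clean up in close(), which avoids the leak and shutdown problems mentioned above.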


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-216874916
  
    Hello @fhueske, the test failures are not caused by the changes made in this PR. I ran the JUnit tests for the Elasticsearch connector and all of them pass.
    Could you kindly have a look?


[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r67636398
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -153,9 +165,96 @@ public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress>
     	 */
     	@Override
     	public void open(Configuration configuration) {
    +		TransportClient transportClient = connect();
    +
    +		if (checkConnectionStatus(transportClient)) {
    +			client = transportClient;
    +		}
    --- End diff --
    
    Should we retry in open() if the client is not connected to any nodes?
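One way to answer this, sketched under assumptions: open() could create clients a bounded number of times with a fixed delay, closing each one that reports no connected nodes, and still fail with the original message once the attempts are exhausted, so the job does not start without a cluster. `OpenWithRetry` and `connectOrFail` are hypothetical names; the client type is left generic so the example runs without Elasticsearch on the classpath.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper for a bounded connection retry inside open(). */
final class OpenWithRetry {

    private OpenWithRetry() {}

    /**
     * Creates clients until one passes {@code isConnected} or the attempts run out.
     * Clients that fail the check are closed so nothing leaks between attempts.
     */
    static <C> C connectOrFail(Supplier<C> factory, Predicate<C> isConnected, Consumer<C> closer,
                               int attempts, long delayMs) throws InterruptedException {
        for (int i = 1; i <= attempts; i++) {
            C candidate = factory.get();
            if (isConnected.test(candidate)) {
                return candidate;
            }
            closer.accept(candidate);
            if (i < attempts) {
                Thread.sleep(delayMs);
            }
        }
        // Same failure mode as the original open(): refuse to start without a cluster.
        throw new IllegalStateException("Client is not connected to any Elasticsearch nodes!");
    }
}
```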


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62173042
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
     		if (LOG.isInfoEnabled()) {
     			LOG.info("Created Elasticsearch TransportClient {}", client);
     		}
    +	}
    +
    +	@Override
    +	public void invoke(final T element) {
    +		ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    +			final Timer timer = new Timer(true);
    +			TimerTask task = new TimerTask() {
    +				@Override
    +				public void run() {
    +					// verify that we actually are connected to a cluster
    +					ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +					if (nodes.isEmpty()) {
    +						if (LOG.isInfoEnabled()) {
    +							LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
    +						}
    +						open(new Configuration());
    +					} else {
    +						timer.cancel();
    +						intializeAndCallElasticSearchSinkFunction(element);
    +					}
    +				}
    +			};
    +
    +			timer.scheduleAtFixedRate(task, 0, 3000);
    +
    +			try {
    +				Thread.sleep(3000 * params.getInt(CONFIG_NO_OF_CONN_RETRIES));
    +			} catch (InterruptedException e) {
    +				throw new RuntimeException(e.getMessage());
    +			}
    +			timer.cancel();
    +			// verify that we actually are connected to a cluster
    +			ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +			if (nodes.isEmpty()) {
    +				throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
    +			}
    +		} else {
    +			intializeAndCallElasticSearchSinkFunction(element);
    +		}
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (bulkProcessor != null) {
    +			bulkProcessor.close();
    +			bulkProcessor = null;
    +		}
    +
    +		if (client != null) {
    +			client.close();
    +		}
    +
    +		if (hasFailure.get()) {
    +			Throwable cause = failureThrowable.get();
    +			if (cause != null) {
    +				throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
    +			} else {
    +				throw new RuntimeException("An error occured in ElasticsearchSink.");
    +			}
    +		}
     
    +	}
    +
    +	private void intializeAndCallElasticSearchSinkFunction(T element) {
    --- End diff --
    
    Where is the method initializing the ES sink?


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62172954
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
     		if (LOG.isInfoEnabled()) {
     			LOG.info("Created Elasticsearch TransportClient {}", client);
     		}
    +	}
    +
    +	@Override
    +	public void invoke(final T element) {
    +		ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    +			final Timer timer = new Timer(true);
    +			TimerTask task = new TimerTask() {
    +				@Override
    +				public void run() {
    +					// verify that we actually are connected to a cluster
    +					ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +					if (nodes.isEmpty()) {
    +						if (LOG.isInfoEnabled()) {
    +							LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
    +						}
    +						open(new Configuration());
    --- End diff --
    
    The passed `element` is never sent to ES in this case, right?


[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r123150004
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -208,6 +222,13 @@ public void invoke(T value) throws Exception {
     		checkErrorAndRethrow();
     
     		elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
    +
    +		// if there is a connectivity failure, then retry
    +		if (failureThrowable.get() != null &&
    +			client instanceof TransportClient &&
    --- End diff --
    
    Why exactly does the client need to be a `TransportClient`?


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Hi @sbcd90,
    
    I gave the changes a quick review and left some comments. Please let me know your opinion on them; I hope they'll help get you going.


[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r66388492
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -153,9 +165,90 @@ public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress>
     	 */
     	@Override
     	public void open(Configuration configuration) {
    +		connect();
    +
    +		params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
    +			this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
    +		}
    +
    +		buildBulkProcessorIndexer(client);
    +	}
    +
    +	@Override
    +	public void invoke(T element) {
    +		elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
    +
    +		if (hasFailure.get()) {
    --- End diff --
    
    Another problem with this implementation is that we're capturing the failure of a bulk operation, but only retrying for the currently processed element.
    
    Would it be simpler to check whether the Elasticsearch client is still connected to any nodes, via `client.connectedNodes().size()`, before calling `elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);`? If it is not, we retry establishing the connection before processing the element.
    
    Another approach that might be better is to let the BulkProcessor set a `lostConnection` flag if it encounters an ES connection error for a batch; we then simply check the flag in invoke() before doing anything else. We would still need a way to handle all records in that batch that failed because of the lost connection.
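A sketch of the flag-based approach, assuming requests can be modeled as strings and that the bulk failure callback has access to the requests of the failed batch. `LostConnectionAwareSink`, `onBulkConnectionFailure`, and the `reconnect`/`addToBulk` callbacks are hypothetical; in the connector the callback would be driven from the BulkProcessor listener. Re-submitting the whole batch gives at-least-once behavior, so some documents may be written twice unless the writes are idempotent (for example, indexing with explicit document IDs).

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: the bulk failure callback raises lostConnection and keeps the failed
 * batch, so invoke() can reconnect and re-submit the whole batch, not just the current element.
 */
class LostConnectionAwareSink {
    private final AtomicBoolean lostConnection = new AtomicBoolean(false);
    private final ConcurrentLinkedQueue<String> failedRequests = new ConcurrentLinkedQueue<>();
    private final Runnable reconnect;         // hypothetical: rebuilds client and bulk processor
    private final Consumer<String> addToBulk; // hypothetical: adds one request to the bulk processor

    LostConnectionAwareSink(Runnable reconnect, Consumer<String> addToBulk) {
        this.reconnect = reconnect;
        this.addToBulk = addToBulk;
    }

    /** Would be called from the bulk listener's failure callback with the failed batch. */
    void onBulkConnectionFailure(List<String> batch) {
        failedRequests.addAll(batch); // published before the flag is raised
        lostConnection.set(true);
    }

    void invoke(String request) {
        // Check the flag before doing anything else.
        if (lostConnection.compareAndSet(true, false)) {
            reconnect.run();
            String retry;
            while ((retry = failedRequests.poll()) != null) {
                addToBulk.accept(retry); // re-submit every record of the failed batch
            }
        }
        addToBulk.accept(request);
    }
}
```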


[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217298024
  
    Hello @rmetzger ,
    
    Thanks a lot for reviewing the PR.
    I have made all the changes you suggested in the inline comments and also added some documentation.
    Kindly have a look now.


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Hi @sbcd90,
    Sorry for the late reply; I'm currently busy with some other things. I'll be happy to review again within the next 2-3 days.



[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217373539
  
    How did you test the code you've implemented in this pull request?



[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62172911
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
     		if (LOG.isInfoEnabled()) {
     			LOG.info("Created Elasticsearch TransportClient {}", client);
     		}
    +	}
    +
    +	@Override
    +	public void invoke(final T element) {
    +		ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    --- End diff --
    
    This performs two hash map lookups for every element. The check can be done just once, in the `open()` method.
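    Here is a sketch of resolving the setting once. It assumes a plain `Map` in place of `ParameterTool`. The string value of `CONFIG_NO_OF_CONN_RETRIES` is hypothetical, because the diff only shows the constant's name.

```java
import java.util.Map;

class RetryConfiguredSink<T> {
    // Hypothetical key value: the diff only shows the constant's name.
    static final String CONFIG_NO_OF_CONN_RETRIES = "hypothetical.connection.retries";

    private final Map<String, String> userConfig;
    private int connectionRetries; // resolved once in open(); 0 disables retries

    RetryConfiguredSink(Map<String, String> userConfig) {
        this.userConfig = userConfig;
    }

    void open() {
        // The only map lookup, done once per task instead of once per element.
        String raw = userConfig.get(CONFIG_NO_OF_CONN_RETRIES);
        connectionRetries = (raw == null) ? 0 : Math.max(0, Integer.parseInt(raw.trim()));
    }

    int connectionRetries() {
        return connectionRetries;
    }

    void invoke(T element) {
        if (connectionRetries > 0) {
            // connection check / retry logic would go here (plain field read, no lookups)
        }
        // elasticsearchSinkFunction.process(element, ...) would follow
    }
}
```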



[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r67636370
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -166,30 +265,37 @@ public void open(Configuration configuration) {
     			transportClient.addTransportAddress(transport);
     		}
     
    -		// verify that we actually are connected to a cluster
    +		return transportClient;
    +	}
    +
    +	private boolean checkConnectionStatus(TransportClient transportClient) {
    --- End diff --
    
    `isConnected(TransportClient)` is a more suitable name for this method. Also, it can simply
    `return !ImmutableList.copyOf(transportClient.connectedNodes()).isEmpty();`
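    This is a minimal sketch of the suggested helper. `NodeListSource` is a hypothetical stand-in for `TransportClient`. The point it illustrates is that a method named `isConnected` has to negate the emptiness check.

```java
import java.util.Collection;

// Hypothetical stand-in for TransportClient's connectedNodes().
interface NodeListSource {
    Collection<?> connectedNodes();
}

final class ConnectionChecks {
    private ConnectionChecks() {}

    // true when the client holds a connection to at least one node
    static boolean isConnected(NodeListSource client) {
        return !client.connectedNodes().isEmpty();
    }
}
```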



[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r66385681
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -153,9 +165,90 @@ public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress>
     	 */
     	@Override
     	public void open(Configuration configuration) {
    +		connect();
    +
    +		params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
    +			this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
    +		}
    +
    +		buildBulkProcessorIndexer(client);
    +	}
    +
    +	@Override
    +	public void invoke(T element) {
    +		elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
    +
    +		if (hasFailure.get()) {
    --- End diff --
    
    Won't there be other causes of failure besides connection errors? Attempting to reconnect on every kind of failure doesn't seem right.
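    One way to restrict reconnects to connection problems is sketched below, under some assumptions. The classifier walks the cause chain and matches it against a caller-supplied list of exception types. Which types count as connection errors is left to the caller. With the Elasticsearch 2.x client, `NoNodeAvailableException` would likely be one of them.

```java
import java.util.Arrays;
import java.util.List;

final class FailureClassifier {
    private static final int MAX_CAUSE_DEPTH = 32; // guards against cyclic cause chains
    private final List<Class<? extends Throwable>> connectionErrorTypes;

    @SafeVarargs
    FailureClassifier(Class<? extends Throwable>... connectionErrorTypes) {
        this.connectionErrorTypes = Arrays.asList(connectionErrorTypes);
    }

    // true if the failure or any of its causes is one of the configured types
    boolean isConnectionError(Throwable failure) {
        Throwable t = failure;
        for (int depth = 0; t != null && depth < MAX_CAUSE_DEPTH; depth++, t = t.getCause()) {
            for (Class<? extends Throwable> type : connectionErrorTypes) {
                if (type.isInstance(t)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

    Any other failure would then keep the existing behaviour of failing the sink rather than triggering a reconnect.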



[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r66388852
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -153,9 +165,90 @@ public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress>
     	 */
     	@Override
     	public void open(Configuration configuration) {
    +		connect();
    +
    +		params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
    +			this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
    +		}
    +
    +		buildBulkProcessorIndexer(client);
    --- End diff --
    
    Same here. We should try to avoid modifying class fields as a side effect inside utility methods.
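    Here is a sketch of the suggested style. `FakeClient` and `FakeIndexer` are hypothetical placeholders for `TransportClient` and `BulkProcessor`. The helpers only build and return objects, and `open()` is the single place that assigns fields.

```java
// Hypothetical placeholders for TransportClient and BulkProcessor.
class FakeClient {
    final String name;
    FakeClient(String name) { this.name = name; }
}

class FakeIndexer {
    final FakeClient client;
    FakeIndexer(FakeClient client) { this.client = client; }
}

class SideEffectFreeSink {
    private FakeClient client;
    private FakeIndexer indexer;

    void open() {
        // Every field assignment happens here, visible in one place...
        client = createClient();
        indexer = buildBulkProcessorIndexer(client);
    }

    // ...while the helpers are static, so they cannot touch instance fields;
    // they only build and return objects.
    private static FakeClient createClient() {
        return new FakeClient("es-client");
    }

    private static FakeIndexer buildBulkProcessorIndexer(FakeClient client) {
        return new FakeIndexer(client);
    }

    FakeIndexer indexer() {
        return indexer;
    }
}
```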



[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r66385603
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -153,9 +165,90 @@ public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress>
     	 */
     	@Override
     	public void open(Configuration configuration) {
    +		connect();
    +
    +		params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_KEY_CONNECTION_RETRIES)) {
    +			this.connectionRetries = params.getInt(CONFIG_KEY_CONNECTION_RETRIES);
    +		}
    --- End diff --
    
    Shouldn't a default value be set if the user doesn't specify one? Otherwise there will be a null pointer exception in `invoke()`.
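    Here is a sketch of resolving the value with an explicit default, using a plain `Map` and a hypothetical key. The NPE concern applies if the field is a boxed `Integer` that stays `null`. A primitive `int` field defaults to 0. Flink's `ParameterTool` also provides `getInt(key, defaultValue)` for this.

```java
import java.util.Map;

final class RetryDefaults {
    // Hypothetical key name; 0 as the default ("do not retry") is an assumption.
    static final String KEY = "hypothetical.connection.retries";
    static final int DEFAULT_CONNECTION_RETRIES = 0;

    private RetryDefaults() {}

    // Returns a primitive int, so callers never unbox a null Integer.
    static int resolveRetries(Map<String, String> userConfig) {
        String raw = userConfig.get(KEY);
        return (raw == null) ? DEFAULT_CONNECTION_RETRIES : Integer.parseInt(raw.trim());
    }
}
```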



[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by HungUnicorn <gi...@git.apache.org>.
Github user HungUnicorn commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Using the sniffing feature of the transport client can achieve this.
    The client then connects to all existing nodes, and the list of connected nodes is updated every 5 seconds. This fits our case because we only have to specify one IP, and we get back a list of IPs that is updated periodically. It is enabled with
    `Settings settings = Settings.settingsBuilder().put(userConfig).put("client.transport.sniff", true).build();`
    
    Explanation:
    > The Transport client comes with a cluster sniffing feature which allows it to dynamically add new hosts and remove old ones. When sniffing is enabled the transport client will connect to the nodes in its internal node list, which is built via calls to addTransportAddress. After this, the client will call the internal cluster state API on those nodes to discover available data nodes. The internal node list of the client will be replaced with those data nodes only. This list is refreshed every five seconds by default. 
    
    Source:
    https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.3//transport-client.html



[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Thanks @HungUnicorn, that's useful info. I wonder, though, whether this config should be set by the user instead of internally by the connector.
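    If the connector did want a default, one middle ground would be to apply it only when the user has not set the key, so an explicit user choice always wins. The sketch below uses a plain `Map` in place of the Elasticsearch `Settings` builder, and `withDefault` is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;

final class ClientSettings {
    static final String SNIFF_KEY = "client.transport.sniff";

    private ClientSettings() {}

    // Returns a copy of the user's settings with key set to defaultValue only
    // if the user did not set it; the user's own map is left untouched.
    static Map<String, String> withDefault(Map<String, String> userConfig, String key, String defaultValue) {
        Map<String, String> merged = new HashMap<>(userConfig);
        merged.putIfAbsent(key, defaultValue);
        return merged;
    }
}
```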



[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Hello @tzulitai,
    
    I think the default value for an `int` in Java is 0.
    Checking whether the connection is lost and then retrying the connection is a good suggestion; I've made the change.
    I also separated the methods for connection creation and connection status checking.



[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r62173382
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -177,7 +180,72 @@ public void open(Configuration configuration) {
     		if (LOG.isInfoEnabled()) {
     			LOG.info("Created Elasticsearch TransportClient {}", client);
     		}
    +	}
    +
    +	@Override
    +	public void invoke(final T element) {
    +		ParameterTool params = ParameterTool.fromMap(userConfig);
    +
    +		if (params.has(CONFIG_NO_OF_CONN_RETRIES) && params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
    +			final Timer timer = new Timer(true);
    +			TimerTask task = new TimerTask() {
    +				@Override
    +				public void run() {
    +					// verify that we actually are connected to a cluster
    +					ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(((TransportClient) client).connectedNodes());
    +					if (nodes.isEmpty()) {
    +						if (LOG.isInfoEnabled()) {
    +							LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
    +						}
    +						open(new Configuration());
    --- End diff --
    
    The old client is not closed.
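    Here is a sketch of a reconnect that releases the old client before building a new one. `ClosableClient` and the factory are hypothetical stand-ins for `TransportClient` and the client construction in `open()`. The `try`/`finally` ensures a new client is still created if closing the old one throws.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for TransportClient.
interface ClosableClient {
    void close();
}

class Reconnector {
    private final Supplier<ClosableClient> factory;
    private ClosableClient client;

    Reconnector(Supplier<ClosableClient> factory) {
        this.factory = factory;
        this.client = factory.get();
    }

    void reconnect() {
        ClosableClient old = client;
        client = null;
        try {
            if (old != null) {
                old.close(); // release the stale client's resources first
            }
        } finally {
            client = factory.get(); // build the replacement even if close() failed
        }
    }

    ClosableClient current() {
        return client;
    }
}
```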




[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217342680
  
    Hello @rmetzger,
    
    Looking at the test case `ElasticsearchSinkITCase.testTransportClient`, I think that to test the reconnect scenario, `hasFailure` may need to be made `public` so that the test method can set it.
    Could you provide some suggestions?



[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Hi @sbcd90,
    I think that to address "add reconnect attempt" alone, it should be fine to check in `invoke()`, before processing the element, whether the transport client is still connected to any nodes, and to reconnect if the connection was lost.
    
    On the other hand, another problem arises if we add reconnect attempts to the ES sink: records that fail due to connection errors also need to be caught in the `BulkProcessor` `afterBulk()` callback and re-processed. I wonder whether we should solve both together to resolve this issue. @rmetzger, what's your opinion?
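    Here is a sketch of the second part. It keeps the requests of a batch that failed with a connection error so they can be re-added after reconnecting. The request type `R`, `afterBulkFailure` and `drainForRetry` are hypothetical. In the connector, this logic would sit in the `BulkProcessor.Listener` `afterBulk(long, BulkRequest, Throwable)` callback. The methods are `synchronized` because that callback may run on a different thread than `invoke()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class FailedBatchBuffer<R> {
    private final List<R> pending = new ArrayList<>();
    private final Predicate<Throwable> isConnectionError;

    FailedBatchBuffer(Predicate<Throwable> isConnectionError) {
        this.isConnectionError = isConnectionError;
    }

    // Returns true if the batch was queued for re-processing; false means the
    // failure is not connection-related and should be reported as before.
    synchronized boolean afterBulkFailure(List<R> requests, Throwable failure) {
        if (!isConnectionError.test(failure)) {
            return false;
        }
        pending.addAll(requests);
        return true;
    }

    // Called once the connection is re-established: hands back the queued
    // requests in their original order and empties the buffer.
    synchronized List<R> drainForRetry() {
        List<R> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```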



[GitHub] flink pull request #1962: [FLINK-3857][Streaming Connectors]Add reconnect at...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1962#discussion_r123149965
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -234,4 +255,36 @@ private void checkErrorAndRethrow() {
     			throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
     		}
     	}
    +
    +	private void retry(T value) throws Exception {
    +		int retryCounter = 1;
    +
    +		while (retryCounter <= connectionRetries) {
    +			if (bulkProcessor != null) {
    +				bulkProcessor.close();
    +				bulkProcessor = null;
    +			}
    +
    +			if (client != null) {
    +				client.close();
    +			}
    +
    +			try {
    +				open(null);
    +				elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
    +			} catch (Exception ex) {
    +				if (client instanceof TransportClient && !callBridge.isConnected(((TransportClient) client))) {
    +					TimeUnit.SECONDS.sleep(3);
    --- End diff --
    
    Should this be configurable?
    Also, could you explain a bit why you've chosen 3 seconds?
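    Here is a sketch of the retry loop with the backoff passed in (for example, from the user config) rather than hard-coded as `TimeUnit.SECONDS.sleep(3)`. `RetryWithBackoff` and its parameters are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

final class RetryWithBackoff {
    private RetryWithBackoff() {}

    // Makes one initial attempt plus up to maxRetries retries, sleeping
    // backoffMillis between attempts; rethrows the last failure if all fail.
    static <V> V call(Callable<V> attempt, int maxRetries, long backoffMillis) throws Exception {
        if (maxRetries < 0 || backoffMillis < 0) {
            throw new IllegalArgumentException("maxRetries and backoffMillis must be >= 0");
        }
        Exception last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e;
                if (i < maxRetries) {
                    TimeUnit.MILLISECONDS.sleep(backoffMillis); // configurable instead of a fixed 3 s
                }
            }
        }
        throw last;
    }
}
```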



[GitHub] flink pull request: [FLINK-3857][Streaming Connectors]Add reconnec...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the pull request:

    https://github.com/apache/flink/pull/1962#issuecomment-217540405
  
    Hello @rmetzger,
    
    I have now added a test case to `ElasticsearchSinkITCase.java`. Could you have a look?



[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the issue:

    https://github.com/apache/flink/pull/1962
  
    Hello @StephanEwen,
    
    I have removed the timer and now perform the retry logic directly, with a 3-second backoff. Please have a look.

