Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2016/06/29 14:26:30 UTC

[GitHub] flink pull request #2183: [FLINK-4123] Cassandra sink checks for exceptions ...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/2183

    [FLINK-4123] Cassandra sink checks for exceptions in ack phase

    This PR prevents the Cassandra Sink from locking up during the acknowledgement phase.
    
    We now check for exceptions in the ack phase. Should an exception have occurred, the submission is retried on the next notifyCheckpointComplete.
    
    In addition, separate sendValues() calls are now properly isolated. Previously, the callbacks of an aborted checkpoint could interfere with the exception field and response counter of other checkpoints.
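    For illustration, the pattern described above boils down to something like the following standalone sketch (simplified and with made-up names; it is not the connector code itself):

        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.atomic.AtomicInteger;
        import java.util.concurrent.atomic.AtomicReference;

        // Sketch of the ack-phase pattern: per-call counter and per-call failure slot,
        // so callbacks of one checkpoint cannot interfere with another checkpoint.
        public class AckPhaseSketch {

            static boolean sendValues(Iterable<String> values) throws InterruptedException {
                final AtomicInteger confirmed = new AtomicInteger(0);
                final AtomicReference<Throwable> failure = new AtomicReference<>();
                int sent = 0;

                for (String value : values) {
                    sent++;
                    CompletableFuture
                        .runAsync(() -> { /* asynchronously write one value to Cassandra */ })
                        .whenComplete((ignored, error) -> {
                            if (error != null) {
                                failure.compareAndSet(null, error); // remember the first failure
                            } else {
                                confirmed.incrementAndGet();
                            }
                        });
                }

                while (confirmed.get() < sent) {
                    if (failure.get() != null) {
                        // do not hang: report failure, the commit is retried on the next notification
                        return false;
                    }
                    Thread.sleep(10);
                }
                return true;
            }
        }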

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 4123_cass_hang

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2183
    
----
commit 0ec0220e794e6b71053cc1061b2b70ee2865016a
Author: zentol <ch...@apache.org>
Date:   2016-06-29T14:21:21Z

    [FLINK-4123] Cassandra sink checks for exceptions in ack phase

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2183: [FLINK-4123] Cassandra sink checks for exceptions in ack ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2183
  
    @tillrohrmann I've added the wait-notify construct.
    
    We can't add a test since we can't reliably cause a callback to throw an exception.



[GitHub] flink issue #2183: [FLINK-4123] Cassandra sink checks for exceptions in ack ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2183
  
    @tillrohrmann test added.



[GitHub] flink pull request #2183: [FLINK-4123] Cassandra sink checks for exceptions ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2183#discussion_r69153660
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---
    @@ -110,11 +92,25 @@ public void close() throws Exception {
     	}
     
     	@Override
    -	protected void sendValues(Iterable<IN> values, long timestamp) throws Exception {
    -		//verify that no query failed until now
    -		if (exception != null) {
    -			throw new Exception(exception);
    -		}
    +	protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception {
    +		int updatesSent = 0;
    +		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
    +
    +		final AtomicContainer<Throwable> exception = new AtomicContainer<>();
    --- End diff --
    
    The Java library provides an `AtomicReference` implementation which we could use instead of `AtomicContainer`.
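    
    For example, a drop-in sketch with the same get/set usage (assumed, not the final code):
    
        import java.util.concurrent.atomic.AtomicReference;
    
        final AtomicReference<Throwable> exception = new AtomicReference<>();
        // onFailure():        exception.set(throwable);
        // in the ack loop:    if (exception.get() != null) { ... }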



[GitHub] flink issue #2183: [FLINK-4123] Cassandra sink checks for exceptions in ack ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2183
  
    If our components are not testable, then this is a good indicator that we can still improve our design. But actually I think that it should be doable in this case. Can't we mock the `Session` such that it stores all `ResultSetFuture`s? We can then manually let one of the futures fail and let the others complete normally. That way, we could test the newly implemented behaviour.
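    
    A hypothetical sketch of that setup (names and wiring are assumptions, not the test that was eventually committed): the mocked `Session` hands out futures backed by `SettableFuture`s the test keeps, so one query can be failed manually while the others complete.
    
        import static org.mockito.Mockito.*;
    
        import com.datastax.driver.core.ResultSet;
        import com.datastax.driver.core.ResultSetFuture;
        import com.datastax.driver.core.Session;
        import com.datastax.driver.core.Statement;
        import com.google.common.util.concurrent.SettableFuture;
        import org.junit.Test;
    
        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.Executor;
    
        public class CassandraSinkAckTestSketch {
    
            @Test
            public void failedQueryMustNotHangSendValues() throws Exception {
                Session session = mock(Session.class);
                final List<SettableFuture<ResultSet>> queries = new ArrayList<>();
    
                // Every executeAsync() call returns a mock future that delegates listener
                // registration and result retrieval to a SettableFuture the test controls.
                when(session.executeAsync(any(Statement.class))).thenAnswer(invocation -> {
                    final SettableFuture<ResultSet> backing = SettableFuture.create();
                    queries.add(backing);
    
                    ResultSetFuture future = mock(ResultSetFuture.class);
                    doAnswer(inv -> {
                        backing.addListener(
                            (Runnable) inv.getArguments()[0],
                            (Executor) inv.getArguments()[1]);
                        return null;
                    }).when(future).addListener(any(Runnable.class), any(Executor.class));
                    when(future.get()).thenAnswer(inv -> backing.get());
                    return future;
                });
    
                // ... build the sink with the mocked session, call sendValues() on another
                // thread, then fail exactly one query and complete the rest:
                // queries.get(0).setException(new RuntimeException("artificial failure"));
                // queries.get(i).set(mock(ResultSet.class));
                // finally assert that sendValues() returns false instead of blocking.
            }
        }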



[GitHub] flink issue #2183: [FLINK-4123] Cassandra sink checks for exceptions in ack ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2183
  
    I have one final remark concerning the busy wait loop. I think it's better to replace it with a wait-notify construct. Maybe we could also add a test which ensures that the `sendValues` loop is exited in case of an exception.
    
    After addressing this comment, I think the PR is good to be merged.



[GitHub] flink issue #2183: [FLINK-4123] Cassandra sink checks for exceptions in ack ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2183
  
    Changes look good to me. Thanks for your contribution @zentol. Will be merging this PR.



[GitHub] flink pull request #2183: [FLINK-4123] Cassandra sink checks for exceptions ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2183#discussion_r69153181
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---
    @@ -49,14 +49,8 @@
     	private final String insertQuery;
    --- End diff --
    
    The `CassandraTupleWriteAheadSink` and the `GenericWriteAheadSink` don't define a `serialVersionUID`.
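    
    For reference, the usual fix is a one-line field in each serializable class (the concrete value is arbitrary, it just has to stay stable across versions):
    
        private static final long serialVersionUID = 1L;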



[GitHub] flink issue #2183: [FLINK-4123] Cassandra sink checks for exceptions in ack ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2183
  
    I think the change is good to merge.



[GitHub] flink pull request #2183: [FLINK-4123] Cassandra sink checks for exceptions ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2183#discussion_r69163543
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---
    @@ -110,11 +92,25 @@ public void close() throws Exception {
     	}
     
     	@Override
    -	protected void sendValues(Iterable<IN> values, long timestamp) throws Exception {
    -		//verify that no query failed until now
    -		if (exception != null) {
    -			throw new Exception(exception);
    -		}
    +	protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception {
    +		int updatesSent = 0;
    +		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
    +
    +		final AtomicContainer<Throwable> exception = new AtomicContainer<>();
    +
    +		FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
    +			@Override
    +			public void onSuccess(ResultSet resultSet) {
    +				updatesConfirmed.incrementAndGet();
    +			}
    +
    +			@Override
    +			public void onFailure(Throwable throwable) {
    +				exception.set(throwable);
    --- End diff --
    
    I don't think it matters too much, as long as _some_ exception is noticed. The first exception would probably be the most reasonable approach, though.



[GitHub] flink pull request #2183: [FLINK-4123] Cassandra sink checks for exceptions ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2183#discussion_r69153934
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---
    @@ -110,11 +92,25 @@ public void close() throws Exception {
     	}
     
     	@Override
    -	protected void sendValues(Iterable<IN> values, long timestamp) throws Exception {
    -		//verify that no query failed until now
    -		if (exception != null) {
    -			throw new Exception(exception);
    -		}
    +	protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception {
    +		int updatesSent = 0;
    +		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
    +
    +		final AtomicContainer<Throwable> exception = new AtomicContainer<>();
    +
    +		FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
    +			@Override
    +			public void onSuccess(ResultSet resultSet) {
    +				updatesConfirmed.incrementAndGet();
    +			}
    +
    +			@Override
    +			public void onFailure(Throwable throwable) {
    +				exception.set(throwable);
    --- End diff --
    
    Do we always want to report the last set exception or the first exception which occurred? In the latter case we would have to use the `AtomicReference#compareAndSet(null, throwable)` method. 
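    
    A sketch of the first-failure variant (assuming the field becomes an `AtomicReference<Throwable>` as suggested above):
    
        @Override
        public void onFailure(Throwable throwable) {
            // compareAndSet only succeeds for the first failure; later ones are ignored
            exception.compareAndSet(null, throwable);
        }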



[GitHub] flink pull request #2183: [FLINK-4123] Cassandra sink checks for exceptions ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2183#discussion_r69440065
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---
    @@ -132,11 +132,15 @@ protected void sendValues(Iterable<IN> values, long timestamp) throws Exception
     		}
     		try {
     			while (updatesSent != updatesConfirmed.get()) {
    +				if (exception.get() != null) { // verify that no query failed until now
    +					LOG.warn("Sending a value failed.", exception.get());
    +					break;
    +				}
     				Thread.sleep(100);
     			}
    --- End diff --
    
    Could we try to replace the busy waiting loop with a wait-notify construct? That would decrease latency and wouldn't burn unnecessary CPU cycles. We could sync on the `updatesConfirmed` variable.
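    
    For illustration, a sketch of what that could look like (assumed code, not the final change; it synchronizes on `updatesConfirmed` as suggested and assumes `exception` is an `AtomicReference<Throwable>`):
    
        // callback side: record the outcome and wake up the thread waiting in sendValues()
        @Override
        public void onSuccess(ResultSet resultSet) {
            synchronized (updatesConfirmed) {
                updatesConfirmed.incrementAndGet();
                updatesConfirmed.notifyAll();
            }
        }
    
        @Override
        public void onFailure(Throwable throwable) {
            synchronized (updatesConfirmed) {
                exception.compareAndSet(null, throwable);
                updatesConfirmed.notifyAll();
            }
        }
    
        // waiting side in sendValues(): block until everything is confirmed or a query failed
        synchronized (updatesConfirmed) {
            while (updatesSent != updatesConfirmed.get() && exception.get() == null) {
                updatesConfirmed.wait();
            }
        }
        if (exception.get() != null) {
            LOG.warn("Sending a value failed.", exception.get());
            return false;
        }
        return true;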



[GitHub] flink issue #2183: [FLINK-4123] Cassandra sink checks for exceptions in ack ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2183
  
    @tillrohrmann Addressed your comments.



[GitHub] flink pull request #2183: [FLINK-4123] Cassandra sink checks for exceptions ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2183

