Posted to issues@flink.apache.org by HilmiYildirim <gi...@git.apache.org> on 2015/10/09 14:33:37 UTC

[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

GitHub user HilmiYildirim opened a pull request:

    https://github.com/apache/flink/pull/1243

    [FLINK-2624]: RabbitMQSource now extends MessageAcknowledgingSourceBase.

    It now participates in checkpointing.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HilmiYildirim/flink AckRabbitMQ

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1243.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1243
    
----
commit 50b3683c5ad0449032278600e22324784b709549
Author: Hilmi Yildirim <hi...@dfki.de>
Date:   2015-10-09T12:30:07Z

    [FLINK-2624]: RabbitMQSource now extends MessageAcknowledgingSourceBase.
    It now participates in checkpointing.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-147625208
  
    Hi Hilmi, 
    thank you for your PR. We'll review it soon.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r46048074
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
    @@ -102,4 +116,18 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     	public void cancel() {
     		running = false;
     	}
    +
    +	@Override
    +	protected void acknowledgeIDs(List<Long> ids) {
    +		try {
    +			channel.basicAck(ids.get(ids.size() - 1), true);
    +		} catch (IOException e) {
    +			Log.error("Messages could not be acknowledged", e);
    --- End diff --
    
    If this fails, the checkpoint is incomplete. We should throw an exception here.
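A minimal sketch of the suggested change, not the code that was eventually merged. AckChannel and PropagatingAcker are hypothetical names; AckChannel stands in for the RabbitMQ Channel, whose basicAck(long deliveryTag, boolean multiple) also declares IOException. The failure is rethrown instead of logged, so a failed acknowledgement is not silently treated as done.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical stand-in for com.rabbitmq.client.Channel, reduced to the one call used here.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
}

class PropagatingAcker {
    private final AckChannel channel;

    PropagatingAcker(AckChannel channel) {
        this.channel = channel;
    }

    // Acknowledges every delivery tag up to and including the last one in the list.
    // A failed ack is rethrown so the caller sees it, instead of being logged and ignored.
    void acknowledgeIDs(List<Long> ids) {
        try {
            channel.basicAck(ids.get(ids.size() - 1), true);
        } catch (IOException e) {
            throw new UncheckedIOException("Messages could not be acknowledged", e);
        }
    }
}
```

Wrapping the IOException in an UncheckedIOException keeps the acknowledgeIDs(List<Long>) signature from the diff unchanged; declaring a checked exception would be the alternative if the base class allowed it.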


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-157522385
  
    As far as I know, RabbitMQ is written in Erlang and does not run on the JVM. We could provide a test that expects a RabbitMQ server to be running somewhere (e.g., locally).


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42249355
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
    @@ -90,10 +99,11 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     			delivery = consumer.nextDelivery();
     
     			OUT result = schema.deserialize(delivery.getBody());
    +			addId(ctx, delivery.getEnvelope().getDeliveryTag());
    --- End diff --
    
    There should be a lock scope around storing the ID and emitting the record. Have a look at the example code in the Javadocs of the MessageAcknowledingSourceBase class.
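The lock scope the reviewer asks for can be sketched as below, assuming that snapshots are taken while the same checkpoint lock is held (as the reviewers describe later in the thread). MiniSourceContext and LockScopedEmitter are hypothetical stand-ins; only getCheckpointLock() and collect(...) mirror methods of Flink's SourceContext. Emitting a record and remembering its delivery tag happen in one synchronized block, so a checkpoint never contains one without the other.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal stand-in for Flink's SourceContext (only the two methods used here).
interface MiniSourceContext<T> {
    Object getCheckpointLock();

    void collect(T element);
}

class LockScopedEmitter<T> {
    // Delivery tags emitted since the last snapshot; guarded by the checkpoint lock.
    private final List<Long> idsForCurrentCheckpoint = new ArrayList<>();

    void emit(MiniSourceContext<T> ctx, T record, long deliveryTag) {
        // Emitting the record and remembering its ID form one atomic step with respect
        // to snapshots, which are assumed to run under the same lock.
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(record);
            idsForCurrentCheckpoint.add(deliveryTag);
        }
    }

    // Assumed to be called while the checkpoint lock is held: hands over the IDs
    // emitted so far and starts a fresh batch for the next checkpoint.
    List<Long> snapshotIds() {
        List<Long> snapshot = new ArrayList<>(idsForCurrentCheckpoint);
        idsForCurrentCheckpoint.clear();
        return snapshot;
    }
}
```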


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-160988124
  
    Thanks for your contribution @HilmiYildirim. I've merged your commit and put some changes on top of it in 9215b72422d3e638fe950b61fa01f2e4e04981a0.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r46046784
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
    @@ -102,4 +116,18 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     	public void cancel() {
     		running = false;
     	}
    +
    +	@Override
    +	protected void acknowledgeIDs(List<Long> ids) {
    +		try {
    +			channel.basicAck(ids.get(ids.size() - 1), true);
    --- End diff --
    
    Since we acknowledge all ids <= the last stored id (the true flag), it is sufficient to store only the last id. 
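Because basicAck(tag, true) acknowledges every delivery tag up to and including tag on that channel, each checkpoint only needs to remember its highest tag. A minimal sketch, assuming a single channel whose delivery tags increase monotonically (which is how RabbitMQ numbers deliveries per channel); LastTagTracker is a hypothetical name. A checkpoint during which no message arrived records nothing, so there is never an empty list to index into.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class LastTagTracker {
    // Highest delivery tag seen since the last snapshot, or -1 if none.
    private long lastTag = -1;
    // Pending {checkpointId, lastTag} pairs, oldest first.
    private final Deque<long[]> pending = new ArrayDeque<>();

    void addId(long deliveryTag) {
        // Tags grow monotonically per channel, so the newest one is the largest.
        lastTag = deliveryTag;
    }

    void snapshot(long checkpointId) {
        // Only record a checkpoint if at least one message arrived since the last one.
        if (lastTag >= 0) {
            pending.addLast(new long[] {checkpointId, lastTag});
            lastTag = -1;
        }
    }

    // Returns the tag to pass to basicAck(tag, true), or -1 if nothing needs acknowledging.
    long tagToAck(long completedCheckpointId) {
        long tag = -1;
        while (!pending.isEmpty() && pending.peekFirst()[0] <= completedCheckpointId) {
            tag = pending.pollFirst()[1];
        }
        return tag;
    }
}
```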


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r46040790
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
    @@ -89,12 +99,16 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     		while (running) {
     			delivery = consumer.nextDelivery();
     
    -			OUT result = schema.deserialize(delivery.getBody());
    -			if (schema.isEndOfStream(result)) {
    -				break;
    +			synchronized (ctx.getCheckpointLock()) {
    +				OUT result = schema.deserialize(delivery.getBody());
    +				addId(ctx, delivery.getEnvelope().getDeliveryTag());
    --- End diff --
    
    This looks like it will grow the ID list without bound when checkpointing is disabled.
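One way to avoid the unbounded growth, sketched under assumptions: remember IDs only when checkpointing is enabled, and otherwise acknowledge each message right after it has been emitted. The checkpointingEnabled flag, AckOnlyChannel and AckPolicy are hypothetical; how the source learns whether checkpointing is on is left open here. Consuming in RabbitMQ's automatic-acknowledgement mode (the autoAck argument of basicConsume) would be an alternative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for com.rabbitmq.client.Channel, reduced to basicAck.
interface AckOnlyChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
}

class AckPolicy {
    // Hypothetical flag; assumed to be known before the first message is consumed.
    private final boolean checkpointingEnabled;
    private final AckOnlyChannel channel;
    private final List<Long> idsForCurrentCheckpoint = new ArrayList<>();

    AckPolicy(boolean checkpointingEnabled, AckOnlyChannel channel) {
        this.checkpointingEnabled = checkpointingEnabled;
        this.channel = channel;
    }

    // Called after a message has been emitted downstream.
    void onEmitted(long deliveryTag) {
        if (checkpointingEnabled) {
            // Defer the ack until the checkpoint containing this message completes.
            idsForCurrentCheckpoint.add(deliveryTag);
            return;
        }
        // No checkpoint will ever acknowledge this message, so acknowledge it now
        // and keep nothing in memory.
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            throw new UncheckedIOException("Message could not be acknowledged", e);
        }
    }

    int pendingCount() {
        return idsForCurrentCheckpoint.size();
    }
}
```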


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-148675785
  
    I would like to shepherd this PR.
    
    @HilmiYildirim, I do agree with your suggestion regarding the checkpoint log. Could you change that in the PR, please?
    Could you also do the logging that I have suggested?


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-155783584
  
    I would like to look over this in the next days and merge it if it looks good.
    
    @HilmiYildirim Do you know of any simple way to run an embedded RabbitMQ broker in order to write some tests for this?


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42249695
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java ---
    @@ -157,17 +179,35 @@ public void restoreState(SerializedCheckpointData[] state) throws Exception {
     
     	@Override
     	public void notifyCheckpointComplete(long checkpointId) throws Exception {
    -		for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
    -			Tuple2<Long, List<Id>> checkpoint = iter.next();
    -			long id = checkpoint.f0;
    -			
    -			if (id <= checkpointId) {
    -				acknowledgeIDs(checkpoint.f1);
    -				iter.remove();
    -			}
    -			else {
    -				break;
    +		if (!running) {
    --- End diff --
    
    Probably redundant


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by HilmiYildirim <gi...@git.apache.org>.
Github user HilmiYildirim commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-149248114
  
    Somehow there was an error in the push. I fixed it.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-157720650
  
    Okay. I'll try to review this and get it merged in the next days. Maybe I can even use Mockito for a minimal test...


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42250317
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java ---
    @@ -157,17 +179,35 @@ public void restoreState(SerializedCheckpointData[] state) throws Exception {
     
     	@Override
     	public void notifyCheckpointComplete(long checkpointId) throws Exception {
    -		for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
    -			Tuple2<Long, List<Id>> checkpoint = iter.next();
    -			long id = checkpoint.f0;
    -			
    -			if (id <= checkpointId) {
    -				acknowledgeIDs(checkpoint.f1);
    -				iter.remove();
    -			}
    -			else {
    -				break;
    +		if (!running) {
    +			LOG.debug("notifyCheckpointComplete() called on closed source");
    +			return;
    +		}
    +		
    +		// only one commit operation must be in progress
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
    +		}
    +		
    +		List<Id> idsOfCheckpointedMessages = new LinkedList<Id>();
    --- End diff --
    
    The extra copying into a list only to generate a log statement seems like too much overhead.
    Also, writing potentially millions of IDs to the log (especially at INFO level) is probably overkill.
    
    I would log the IDs at TRACE level only. Also, rather than copying the individual lists into one list, I would save the overhead and log one line per list.
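The suggested logging can be sketched as follows. The thread's code uses SLF4J; to stay self-contained, this sketch swaps in java.util.logging, with Level.FINEST playing the role of TRACE. CheckpointCommitter and Pending are hypothetical names. The ID lists are logged one line per completed checkpoint, only when that level is enabled, and are never copied into a combined list.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

class CheckpointCommitter {
    private static final Logger LOG = Logger.getLogger(CheckpointCommitter.class.getName());

    // IDs recorded for one checkpoint that has been snapshotted but not yet confirmed.
    static final class Pending {
        final long checkpointId;
        final List<Long> ids;

        Pending(long checkpointId, List<Long> ids) {
            this.checkpointId = checkpointId;
            this.ids = ids;
        }
    }

    // Oldest checkpoint first, mirroring the pendingCheckpoints list in the diff.
    private final Deque<Pending> pendingCheckpoints = new ArrayDeque<>();
    // Hypothetical hook standing in for acknowledgeIDs(...).
    private final Consumer<List<Long>> acknowledger;

    CheckpointCommitter(Consumer<List<Long>> acknowledger) {
        this.acknowledger = acknowledger;
    }

    void addPending(long checkpointId, List<Long> ids) {
        pendingCheckpoints.addLast(new Pending(checkpointId, ids));
    }

    void notifyCheckpointComplete(long checkpointId) {
        // The supplier defers building the summary string until FINE is enabled.
        LOG.fine(() -> "Committing messages externally for checkpoint " + checkpointId);
        while (!pendingCheckpoints.isEmpty()
                && pendingCheckpoints.peekFirst().checkpointId <= checkpointId) {
            Pending p = pendingCheckpoints.pollFirst();
            // One line per pending list, built only when FINEST (TRACE-like) is on;
            // the lists are never merged into a single copy.
            if (LOG.isLoggable(Level.FINEST)) {
                LOG.finest("Acknowledging " + p.ids.size() + " IDs of checkpoint "
                        + p.checkpointId + ": " + p.ids);
            }
            acknowledger.accept(p.ids);
        }
    }
}
```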


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by HilmiYildirim <gi...@git.apache.org>.
Github user HilmiYildirim commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-160650903
  
    Ok great


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42250017
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java ---
    @@ -157,17 +179,35 @@ public void restoreState(SerializedCheckpointData[] state) throws Exception {
     
     	@Override
     	public void notifyCheckpointComplete(long checkpointId) throws Exception {
    -		for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
    -			Tuple2<Long, List<Id>> checkpoint = iter.next();
    -			long id = checkpoint.f0;
    -			
    -			if (id <= checkpointId) {
    -				acknowledgeIDs(checkpoint.f1);
    -				iter.remove();
    -			}
    -			else {
    -				break;
    +		if (!running) {
    +			LOG.debug("notifyCheckpointComplete() called on closed source");
    +			return;
    +		}
    +		
    +		// only one commit operation must be in progress
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
    +		}
    +		
    +		List<Id> idsOfCheckpointedMessages = new LinkedList<Id>();
    +		synchronized (pendingCheckpoints) {
    +			for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
    +				Tuple2<Long, List<Id>> checkpoint = iter.next();
    +				long id = checkpoint.f0;
    +				
    +				if (id <= checkpointId) {
    +					idsForCurrentCheckpoint.addAll(checkpoint.f1);
    +					acknowledgeIDs(checkpoint.f1);
    +					iter.remove();
    +				}
    +				else {
    +					break;
    +				}
     			}
     		}
    +		
    +		if (LOG.isInfoEnabled()) {
    +			LOG.info("Committing Messages with following IDs {}", idsOfCheckpointedMessages);
    --- End diff --
    
    This will in most cases generate a gigantic log line that may bring the whole system to a stall.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by HilmiYildirim <gi...@git.apache.org>.
Github user HilmiYildirim commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-155820098
  
    Unfortunately, I do not know a simple way to run an embedded RabbitMQ broker.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by HilmiYildirim <gi...@git.apache.org>.
Github user HilmiYildirim commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-147963553
  
    There are test failures, but they seem unrelated to my changes.
    Furthermore, I have not synchronized on the checkpoint lock when calling addId. I think it is better to do the synchronization inside the addId method. Then, anyone who extends MessageAcknowledingSourceBase does not need to care about synchronization.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42249791
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java ---
    @@ -157,17 +179,35 @@ public void restoreState(SerializedCheckpointData[] state) throws Exception {
     
     	@Override
     	public void notifyCheckpointComplete(long checkpointId) throws Exception {
    -		for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
    -			Tuple2<Long, List<Id>> checkpoint = iter.next();
    -			long id = checkpoint.f0;
    -			
    -			if (id <= checkpointId) {
    -				acknowledgeIDs(checkpoint.f1);
    -				iter.remove();
    -			}
    -			else {
    -				break;
    +		if (!running) {
    +			LOG.debug("notifyCheckpointComplete() called on closed source");
    +			return;
    +		}
    +		
    +		// only one commit operation must be in progress
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
    +		}
    +		
    +		List<Id> idsOfCheckpointedMessages = new LinkedList<Id>();
    +		synchronized (pendingCheckpoints) {
    --- End diff --
    
    You can skip this synchronization if the source's run method properly acquires the checkpoint lock.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1243


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-148737070
  
    I disagree with the locking in the base class. The documentation for sources explains how to make sure you hold the checkpoint lock when emitting elements and updating the state (here, the IDs to be remembered). That is the encouraged mechanism, as it gives the exactly-once guarantee.
    
    If you lock properly there, there is no need to hold any additional lock.
    
    In some sense, all modifications to the base class are a bit tricky:
      - The locks are not required if you properly hold the checkpoint lock.
      - The open/close checks are not necessary either; the runtime should already guarantee this.
      - The logging will probably bring down the system if used in a real setting with decent throughput.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by HilmiYildirim <gi...@git.apache.org>.
Github user HilmiYildirim commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-149195168
  
    I adapted the code and pushed it to my repo.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-160146335
  
    I'm creating a unit test for the source which mocks the RabbitMQ classes. I would like to merge your pull request with a few changes for both the fault-tolerant and the non-fault-tolerant use case (see comments).


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-160141917
  
    This currently only works when checkpointing is turned on. If checkpointing is turned off, not only will the number of queued messages grow without bound, but the messages will also never be acknowledged. In RabbitMQ terms, this means the messages are kept in the broker forever and will be redelivered once another consumer connects to the queue.
    
    As for exactly-once, I'm not 100% sure how we can support it. I think we have to default to per-message acknowledgments (instead of acknowledging all IDs <= the last processed ID). Only that way can we make sure that, upon failure, we acknowledge the right messages. The reason is that the distribution of messages may not be the same after a failed job is redeployed, i.e., consumers may receive messages they have not seen before, so a message may be processed more than once.
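Per-message acknowledgement, as opposed to the cumulative basicAck(lastTag, true) in the diff, could look like the sketch below. PerMessageChannel and PerMessageAcker are hypothetical names. Only the tags recorded for the completed checkpoint are acknowledged; any other delivery on the channel stays unacknowledged. This addresses which messages get acknowledged, not deduplication after a redeployment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical stand-in for com.rabbitmq.client.Channel, reduced to basicAck.
interface PerMessageChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
}

class PerMessageAcker {
    private final PerMessageChannel channel;

    PerMessageAcker(PerMessageChannel channel) {
        this.channel = channel;
    }

    // Acknowledges exactly the delivery tags recorded for a completed checkpoint,
    // one by one (multiple = false), rather than everything up to the last tag.
    // An empty list simply acknowledges nothing.
    void acknowledgeIDs(List<Long> ids) {
        for (long tag : ids) {
            try {
                channel.basicAck(tag, false);
            } catch (IOException e) {
                throw new UncheckedIOException("Could not acknowledge delivery tag " + tag, e);
            }
        }
    }
}
```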


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by HilmiYildirim <gi...@git.apache.org>.
Github user HilmiYildirim commented on the pull request:

    https://github.com/apache/flink/pull/1243#issuecomment-148726397
  
    Done. I hope this is ok.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r46054711
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
    @@ -102,4 +116,18 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     	public void cancel() {
     		running = false;
     	}
    +
    +	@Override
    +	protected void acknowledgeIDs(List<Long> ids) {
    +		try {
    +			channel.basicAck(ids.get(ids.size() - 1), true);
    --- End diff --
    
    This also fails if no messages have been processed between two checkpoints (index -1).
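A minimal guard for that case, keeping the cumulative acknowledgement from the diff. CumulativeAckChannel and GuardedCumulativeAcker are hypothetical names; the channel interface mimics the signature of RabbitMQ's Channel.basicAck.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical stand-in for com.rabbitmq.client.Channel, reduced to basicAck.
interface CumulativeAckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
}

class GuardedCumulativeAcker {
    private final CumulativeAckChannel channel;

    GuardedCumulativeAcker(CumulativeAckChannel channel) {
        this.channel = channel;
    }

    void acknowledgeIDs(List<Long> ids) {
        // A checkpoint taken when no message arrived since the previous one yields an
        // empty list; ids.get(ids.size() - 1) would then throw IndexOutOfBoundsException.
        if (ids.isEmpty()) {
            return;
        }
        try {
            channel.basicAck(ids.get(ids.size() - 1), true);
        } catch (IOException e) {
            throw new UncheckedIOException("Messages could not be acknowledged", e);
        }
    }
}
```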


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42248976
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
    @@ -102,4 +112,18 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     	public void cancel() {
     		running = false;
     	}
    +
    +	@Override
    +	protected void acknowledgeIDs(List<Long> ids) {
    +		try {
    +			channel.basicAck(ids.get(ids.size() - 1), true);
    +		} catch (IOException e) {
    +			e.printStackTrace();
    --- End diff --
    
    Exceptions should never be swallowed.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42247633
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java ---
    @@ -144,6 +152,20 @@ protected void addId(Id id) {
     	
     	@Override
     	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    +		if (idsForCurrentCheckpoint == null) {
    --- End diff --
    
    I think these checks are redundant. The tasks should guarantee that no calls are made on functions that are not yet opened or already closed.


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42249568
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java ---
    @@ -144,6 +152,20 @@ protected void addId(Id id) {
     	
     	@Override
     	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    +		if (idsForCurrentCheckpoint == null) {
    +			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
    +			return null;
    +		}
    +		if (!running) {
    +			LOG.debug("snapshotState() called on closed source");
    +			return null;
    +		}
    +
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
    --- End diff --
    
    The list of IDs can be very long. I'm not sure it is a good idea to put it into a single log statement (one line, one string).


[GitHub] flink pull request: [FLINK-2624]: RabbitMQSource now extends Messa...

Posted by HilmiYildirim <gi...@git.apache.org>.
Github user HilmiYildirim commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1243#discussion_r42360149
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---
    @@ -102,4 +112,18 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     	public void cancel() {
     		running = false;
     	}
    +
    +	@Override
    +	protected void acknowledgeIDs(List<Long> ids) {
    +		try {
    +			channel.basicAck(ids.get(ids.size() - 1), true);
    +		} catch (IOException e) {
    +			e.printStackTrace();
    --- End diff --
    
    Is it enough to log the exception? It cannot be handled anyway.

