You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/18 18:13:45 UTC
[1/6] camel git commit: Revert "CAMEL-9055: camel-aws - SQS should
not allow handover the delete task" CAMEL-9405: Amazon SQS message deletion
behaviour change on exception
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x ec2d2394c -> ee9071a6b
refs/heads/camel-2.16.x 3f0af0a22 -> d84c66f5a
refs/heads/master f3c4fc2a2 -> dcb048854
Revert "CAMEL-9055: camel-aws - SQS should not allow handover the delete task"
CAMEL-9405: Amazon SQS message deletion behaviour change on exception
This reverts commit 47c64ec9c6b9609a71113ad82b15d2c66463c4cd.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1a4760e1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1a4760e1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1a4760e1
Branch: refs/heads/master
Commit: 1a4760e1ae260ec610ebcdbeee6389d442abd0e4
Parents: f3c4fc2
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 18 18:07:50 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 18 18:07:50 2015 +0100
----------------------------------------------------------------------
.../camel/component/aws/sqs/SqsConsumer.java | 40 ++++++++------------
.../camel/component/aws/sqs/SqsProducer.java | 2 +-
2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1a4760e1/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index d3e0a25..0a8d024 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -34,13 +34,13 @@ import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.spi.Synchronization;
-import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
@@ -141,20 +141,17 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
// schedule task to extend visibility if enabled
Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
- if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout / 2) > 0) {
- int delay = visibilityTimeout / 2;
- int period = visibilityTimeout;
+ if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) {
+ int delay = visibilityTimeout.intValue() / 2;
+ int period = visibilityTimeout.intValue();
int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()});
}
-
final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS);
-
- // as the AWS client is not thread-safe we cannot handover the task
- exchange.addOnCompletion(new SynchronizationAdapter() {
+ exchange.addOnCompletion(new Synchronization() {
@Override
public void onComplete(Exchange exchange) {
cancelExtender(exchange);
@@ -165,11 +162,6 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
cancelExtender(exchange);
}
- @Override
- public boolean allowHandover() {
- return false;
- }
-
private void cancelExtender(Exchange exchange) {
// cancel task as we are done
LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}", exchange.getExchangeId());
@@ -178,30 +170,24 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
});
}
+
// add on completion to handle after work when the exchange is done
- // as the AWS client is not thread-safe we cannot handover the task
- exchange.addOnCompletion(new SynchronizationAdapter() {
- @Override
+ exchange.addOnCompletion(new Synchronization() {
public void onComplete(Exchange exchange) {
processCommit(exchange);
}
- @Override
public void onFailure(Exchange exchange) {
processRollback(exchange);
}
@Override
- public boolean allowHandover() {
- return false;
- }
-
- @Override
public String toString() {
return "SqsConsumerOnCompletion";
}
});
+
LOG.trace("Processing exchange [{}]...", exchange);
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
@@ -221,6 +207,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
*/
protected void processCommit(Exchange exchange) {
try {
+
if (shouldDelete(exchange)) {
String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle);
@@ -237,7 +224,10 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
}
private boolean shouldDelete(Exchange exchange) {
- return getConfiguration().isDeleteAfterRead() && (getConfiguration().isDeleteIfFiltered() || passedThroughFilter(exchange));
+ return getConfiguration().isDeleteAfterRead()
+ && (getConfiguration().isDeleteIfFiltered()
+ || (!getConfiguration().isDeleteIfFiltered()
+ && passedThroughFilter(exchange)));
}
private boolean passedThroughFilter(Exchange exchange) {
@@ -252,7 +242,9 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
protected void processRollback(Exchange exchange) {
Exception cause = exchange.getException();
if (cause != null) {
- getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
+ LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
+ } else {
+ LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1a4760e1/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index cd16707..69b1eb3 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer {
private void addDelay(SendMessageRequest request, Exchange exchange) {
Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
- Integer delayValue;
+ Integer delayValue = Integer.valueOf(0);
if (headerValue == null) {
LOG.trace("Using the config delay");
delayValue = getEndpoint().getConfiguration().getDelaySeconds();
[2/6] camel git commit: CAMEL-9405: Polsihed
Posted by da...@apache.org.
CAMEL-9405: Polsihed
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dcb04885
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dcb04885
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dcb04885
Branch: refs/heads/master
Commit: dcb048854ba74c8f409afa0bdadd90fa0e23f0fc
Parents: 1a4760e
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 18 18:10:59 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 18 18:10:59 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/component/aws/sqs/SqsConsumer.java | 4 +---
.../java/org/apache/camel/component/aws/sqs/SqsProducer.java | 2 +-
2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dcb04885/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 0a8d024..51aa261 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -242,9 +242,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
protected void processRollback(Exchange exchange) {
Exception cause = exchange.getException();
if (cause != null) {
- LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
- } else {
- LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
+ getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dcb04885/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 69b1eb3..cd16707 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer {
private void addDelay(SendMessageRequest request, Exchange exchange) {
Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
- Integer delayValue = Integer.valueOf(0);
+ Integer delayValue;
if (headerValue == null) {
LOG.trace("Using the config delay");
delayValue = getEndpoint().getConfiguration().getDelaySeconds();
[5/6] camel git commit: Revert "CAMEL-9055: camel-aws - SQS should
not allow handover the delete task" CAMEL-9405: Amazon SQS message deletion
behaviour change on exception
Posted by da...@apache.org.
Revert "CAMEL-9055: camel-aws - SQS should not allow handover the delete task"
CAMEL-9405: Amazon SQS message deletion behaviour change on exception
This reverts commit 47c64ec9c6b9609a71113ad82b15d2c66463c4cd.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/86e464df
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/86e464df
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/86e464df
Branch: refs/heads/camel-2.15.x
Commit: 86e464df2821b1dc2376dee599b0514777e385fd
Parents: ec2d239
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 18 18:07:50 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 18 18:13:30 2015 +0100
----------------------------------------------------------------------
.../camel/component/aws/sqs/SqsConsumer.java | 40 ++++++++------------
.../camel/component/aws/sqs/SqsProducer.java | 2 +-
2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/86e464df/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index d3e0a25..0a8d024 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -34,13 +34,13 @@ import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.spi.Synchronization;
-import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
@@ -141,20 +141,17 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
// schedule task to extend visibility if enabled
Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
- if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout / 2) > 0) {
- int delay = visibilityTimeout / 2;
- int period = visibilityTimeout;
+ if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) {
+ int delay = visibilityTimeout.intValue() / 2;
+ int period = visibilityTimeout.intValue();
int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()});
}
-
final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS);
-
- // as the AWS client is not thread-safe we cannot handover the task
- exchange.addOnCompletion(new SynchronizationAdapter() {
+ exchange.addOnCompletion(new Synchronization() {
@Override
public void onComplete(Exchange exchange) {
cancelExtender(exchange);
@@ -165,11 +162,6 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
cancelExtender(exchange);
}
- @Override
- public boolean allowHandover() {
- return false;
- }
-
private void cancelExtender(Exchange exchange) {
// cancel task as we are done
LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}", exchange.getExchangeId());
@@ -178,30 +170,24 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
});
}
+
// add on completion to handle after work when the exchange is done
- // as the AWS client is not thread-safe we cannot handover the task
- exchange.addOnCompletion(new SynchronizationAdapter() {
- @Override
+ exchange.addOnCompletion(new Synchronization() {
public void onComplete(Exchange exchange) {
processCommit(exchange);
}
- @Override
public void onFailure(Exchange exchange) {
processRollback(exchange);
}
@Override
- public boolean allowHandover() {
- return false;
- }
-
- @Override
public String toString() {
return "SqsConsumerOnCompletion";
}
});
+
LOG.trace("Processing exchange [{}]...", exchange);
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
@@ -221,6 +207,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
*/
protected void processCommit(Exchange exchange) {
try {
+
if (shouldDelete(exchange)) {
String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle);
@@ -237,7 +224,10 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
}
private boolean shouldDelete(Exchange exchange) {
- return getConfiguration().isDeleteAfterRead() && (getConfiguration().isDeleteIfFiltered() || passedThroughFilter(exchange));
+ return getConfiguration().isDeleteAfterRead()
+ && (getConfiguration().isDeleteIfFiltered()
+ || (!getConfiguration().isDeleteIfFiltered()
+ && passedThroughFilter(exchange)));
}
private boolean passedThroughFilter(Exchange exchange) {
@@ -252,7 +242,9 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
protected void processRollback(Exchange exchange) {
Exception cause = exchange.getException();
if (cause != null) {
- getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
+ LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
+ } else {
+ LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/86e464df/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index cd16707..69b1eb3 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer {
private void addDelay(SendMessageRequest request, Exchange exchange) {
Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
- Integer delayValue;
+ Integer delayValue = Integer.valueOf(0);
if (headerValue == null) {
LOG.trace("Using the config delay");
delayValue = getEndpoint().getConfiguration().getDelaySeconds();
[3/6] camel git commit: Revert "CAMEL-9055: camel-aws - SQS should
not allow handover the delete task" CAMEL-9405: Amazon SQS message deletion
behaviour change on exception
Posted by da...@apache.org.
Revert "CAMEL-9055: camel-aws - SQS should not allow handover the delete task"
CAMEL-9405: Amazon SQS message deletion behaviour change on exception
This reverts commit 47c64ec9c6b9609a71113ad82b15d2c66463c4cd.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2382fd5e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2382fd5e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2382fd5e
Branch: refs/heads/camel-2.16.x
Commit: 2382fd5edd2147a8627ca9cf3ca0f7e4f39b49e3
Parents: 3f0af0a
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 18 18:07:50 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 18 18:13:08 2015 +0100
----------------------------------------------------------------------
.../camel/component/aws/sqs/SqsConsumer.java | 40 ++++++++------------
.../camel/component/aws/sqs/SqsProducer.java | 2 +-
2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2382fd5e/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index d3e0a25..0a8d024 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -34,13 +34,13 @@ import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.spi.Synchronization;
-import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
@@ -141,20 +141,17 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
// schedule task to extend visibility if enabled
Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
- if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout / 2) > 0) {
- int delay = visibilityTimeout / 2;
- int period = visibilityTimeout;
+ if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) {
+ int delay = visibilityTimeout.intValue() / 2;
+ int period = visibilityTimeout.intValue();
int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()});
}
-
final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS);
-
- // as the AWS client is not thread-safe we cannot handover the task
- exchange.addOnCompletion(new SynchronizationAdapter() {
+ exchange.addOnCompletion(new Synchronization() {
@Override
public void onComplete(Exchange exchange) {
cancelExtender(exchange);
@@ -165,11 +162,6 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
cancelExtender(exchange);
}
- @Override
- public boolean allowHandover() {
- return false;
- }
-
private void cancelExtender(Exchange exchange) {
// cancel task as we are done
LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}", exchange.getExchangeId());
@@ -178,30 +170,24 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
});
}
+
// add on completion to handle after work when the exchange is done
- // as the AWS client is not thread-safe we cannot handover the task
- exchange.addOnCompletion(new SynchronizationAdapter() {
- @Override
+ exchange.addOnCompletion(new Synchronization() {
public void onComplete(Exchange exchange) {
processCommit(exchange);
}
- @Override
public void onFailure(Exchange exchange) {
processRollback(exchange);
}
@Override
- public boolean allowHandover() {
- return false;
- }
-
- @Override
public String toString() {
return "SqsConsumerOnCompletion";
}
});
+
LOG.trace("Processing exchange [{}]...", exchange);
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
@@ -221,6 +207,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
*/
protected void processCommit(Exchange exchange) {
try {
+
if (shouldDelete(exchange)) {
String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle);
@@ -237,7 +224,10 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
}
private boolean shouldDelete(Exchange exchange) {
- return getConfiguration().isDeleteAfterRead() && (getConfiguration().isDeleteIfFiltered() || passedThroughFilter(exchange));
+ return getConfiguration().isDeleteAfterRead()
+ && (getConfiguration().isDeleteIfFiltered()
+ || (!getConfiguration().isDeleteIfFiltered()
+ && passedThroughFilter(exchange)));
}
private boolean passedThroughFilter(Exchange exchange) {
@@ -252,7 +242,9 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
protected void processRollback(Exchange exchange) {
Exception cause = exchange.getException();
if (cause != null) {
- getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
+ LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
+ } else {
+ LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2382fd5e/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index cd16707..69b1eb3 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer {
private void addDelay(SendMessageRequest request, Exchange exchange) {
Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
- Integer delayValue;
+ Integer delayValue = Integer.valueOf(0);
if (headerValue == null) {
LOG.trace("Using the config delay");
delayValue = getEndpoint().getConfiguration().getDelaySeconds();
[4/6] camel git commit: CAMEL-9405: Polsihed
Posted by da...@apache.org.
CAMEL-9405: Polsihed
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d84c66f5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d84c66f5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d84c66f5
Branch: refs/heads/camel-2.16.x
Commit: d84c66f5a4ab6edcb11abbde44f7d3fc4d0dc529
Parents: 2382fd5
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 18 18:10:59 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 18 18:13:14 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/component/aws/sqs/SqsConsumer.java | 4 +---
.../java/org/apache/camel/component/aws/sqs/SqsProducer.java | 2 +-
2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d84c66f5/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 0a8d024..51aa261 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -242,9 +242,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
protected void processRollback(Exchange exchange) {
Exception cause = exchange.getException();
if (cause != null) {
- LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
- } else {
- LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
+ getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d84c66f5/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 69b1eb3..cd16707 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer {
private void addDelay(SendMessageRequest request, Exchange exchange) {
Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
- Integer delayValue = Integer.valueOf(0);
+ Integer delayValue;
if (headerValue == null) {
LOG.trace("Using the config delay");
delayValue = getEndpoint().getConfiguration().getDelaySeconds();
[6/6] camel git commit: CAMEL-9405: Polsihed
Posted by da...@apache.org.
CAMEL-9405: Polsihed
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee9071a6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee9071a6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee9071a6
Branch: refs/heads/camel-2.15.x
Commit: ee9071a6b4635953b36c663b45c079e63b611434
Parents: 86e464d
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 18 18:10:59 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 18 18:13:36 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/component/aws/sqs/SqsConsumer.java | 4 +---
.../java/org/apache/camel/component/aws/sqs/SqsProducer.java | 2 +-
2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ee9071a6/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 0a8d024..51aa261 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -242,9 +242,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
protected void processRollback(Exchange exchange) {
Exception cause = exchange.getException();
if (cause != null) {
- LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
- } else {
- LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
+ getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ee9071a6/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 69b1eb3..cd16707 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer {
private void addDelay(SendMessageRequest request, Exchange exchange) {
Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
- Integer delayValue = Integer.valueOf(0);
+ Integer delayValue;
if (headerValue == null) {
LOG.trace("Using the config delay");
delayValue = getEndpoint().getConfiguration().getDelaySeconds();