You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/03 15:00:00 UTC
[1/3] camel git commit: CAMEL-8092 Fixed the cxf:producer Matrix
Params missing issue with thanks to kumarann
Repository: camel
Updated Branches:
refs/heads/master aa6e20a77 -> ced84063a
CAMEL-8092 Fixed the cxf:producer Matrix Params missing issue with thanks to kumarann
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3bf8472
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3bf8472
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3bf8472
Branch: refs/heads/master
Commit: a3bf8472a527b96d3b5a974aaf47a355c207ba9f
Parents: ff59fae
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Dec 3 21:45:15 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Dec 3 21:57:52 2014 +0800
----------------------------------------------------------------------
.../component/cxf/jaxrs/CxfRsProducer.java | 42 ++++++++++++++++++++
1 file changed, 42 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a3bf8472/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 6a092ce..734d972 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -44,6 +44,7 @@ import org.apache.cxf.jaxrs.JAXRSServiceFactoryBean;
import org.apache.cxf.jaxrs.client.Client;
import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.message.MessageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +120,30 @@ public class CxfRsProducer extends DefaultProducer {
}
+ protected void setupClientMatrix(WebClient client, Exchange exchange) throws Exception {
+
+ org.apache.cxf.message.Message cxfMessage = (org.apache.cxf.message.Message) exchange.getIn().getHeader("CamelCxfMessage");
+ if (cxfMessage != null) {
+ String requestURL = (String)cxfMessage.get("org.apache.cxf.request.uri");
+ String matrixParam = null;
+ int matrixStart = requestURL.indexOf(";");
+ int matrixEnd = requestURL.indexOf("?") > -1 ? requestURL.indexOf("?") : requestURL.length();
+ Map<String, String> maps = null;
+ if (requestURL != null && matrixStart > 0) {
+ matrixParam = requestURL.substring(matrixStart + 1, matrixEnd);
+ if (matrixParam != null) {
+ maps = getMatrixParametersFromMatrixString(matrixParam, IOHelper.getCharsetName(exchange));
+ }
+ }
+ if (maps != null) {
+ for (Map.Entry<String, String> entry : maps.entrySet()) {
+ client.matrix(entry.getKey(), entry.getValue());
+ LOG.debug("Matrix param " + entry.getKey() + " :: " + entry.getValue());
+ }
+ }
+ }
+ }
+
protected void setupClientHeaders(Client client, Exchange exchange) throws Exception {
Message inMessage = exchange.getIn();
CxfRsEndpoint cxfRsEndpoint = (CxfRsEndpoint) getEndpoint();
@@ -175,6 +200,8 @@ public class CxfRsProducer extends DefaultProducer {
}
}
}
+
+ setupClientMatrix(client, exchange);
setupClientQueryAndHeaders(client, exchange);
@@ -345,6 +372,21 @@ public class CxfRsProducer extends DefaultProducer {
return answer;
}
+ private Map<String, String> getMatrixParametersFromMatrixString(String matrixString, String charset) throws UnsupportedEncodingException {
+ Map<String, String> answer = new LinkedHashMap<String, String>();
+ for (String param : matrixString.split(";")) {
+ String[] pair = param.split("=", 2);
+ if (pair.length == 2) {
+ String name = URLDecoder.decode(pair[0], charset);
+ String value = URLDecoder.decode(pair[1], charset);
+ answer.put(name, value);
+ } else {
+ throw new IllegalArgumentException("Invalid parameter, expected to be a pair but was " + param);
+ }
+ }
+ return answer;
+ }
+
private String arrayToString(Object[] array) {
StringBuilder buffer = new StringBuilder("[");
for (Object obj : array) {
[3/3] camel git commit: CAMEL-8105 support for redrivePolicy inside
SQSEndpoint with thanks to Rufus
Posted by ni...@apache.org.
CAMEL-8105 support for redrivePolicy inside SQSEndpoint with thanks to Rufus
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ced84063
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ced84063
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ced84063
Branch: refs/heads/master
Commit: ced84063a2f56ce555cc2a6a4df53afc4d35c5be
Parents: a3bf847
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Dec 3 21:51:18 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Dec 3 21:59:37 2014 +0800
----------------------------------------------------------------------
.../camel/component/aws/sqs/SqsConfiguration.java | 12 ++++++++++++
.../org/apache/camel/component/aws/sqs/SqsEndpoint.java | 6 ++++++
.../aws/sqs/SqsComponentConfigurationTest.java | 6 +++++-
3 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index de24cc5..718aee0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -52,6 +52,9 @@ public class SqsConfiguration {
private Integer messageRetentionPeriod;
private Integer receiveMessageWaitTimeSeconds;
private String policy;
+
+ // dead letter queue properties
+ private String redrivePolicy;
public void setAmazonSQSEndpoint(String amazonSQSEndpoint) {
this.amazonSQSEndpoint = amazonSQSEndpoint;
@@ -165,6 +168,14 @@ public class SqsConfiguration {
this.policy = policy;
}
+ public String getRedrivePolicy() {
+ return redrivePolicy;
+ }
+
+ public void setRedrivePolicy(String redrivePolicy) {
+ this.redrivePolicy = redrivePolicy;
+ }
+
public boolean isExtendMessageVisibility() {
return this.extendMessageVisibility;
}
@@ -231,6 +242,7 @@ public class SqsConfiguration {
+ ", receiveMessageWaitTimeSeconds=" + receiveMessageWaitTimeSeconds
+ ", delaySeconds=" + delaySeconds
+ ", policy=" + policy
+ + ", redrivePolicy=" + redrivePolicy
+ ", extendMessageVisibility=" + extendMessageVisibility
+ ", queueOwnerAWSAccountId=" + queueOwnerAWSAccountId
+ ", region=" + region
http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index 8ed85c5..29ae3db 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -153,6 +153,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) {
request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds()));
}
+ if (getConfiguration().getRedrivePolicy() != null) {
+ request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy());
+ }
LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
CreateQueueResult queueResult = client.createQueue(request);
@@ -179,6 +182,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) {
request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds()));
}
+ if (getConfiguration().getRedrivePolicy() != null) {
+ request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy());
+ }
if (!request.getAttributes().isEmpty()) {
LOG.trace("Updating queue '{}' with the provided queue attributes...", configuration.getQueueName());
client.setQueueAttributes(request);
http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
index e7fb9dc..e7eac1f 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
@@ -43,6 +43,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport {
assertNull(endpoint.getConfiguration().getMaximumMessageSize());
assertNull(endpoint.getConfiguration().getMessageRetentionPeriod());
assertNull(endpoint.getConfiguration().getPolicy());
+ assertNull(endpoint.getConfiguration().getRedrivePolicy());
assertNull(endpoint.getConfiguration().getRegion());
}
@@ -67,6 +68,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport {
assertNull(endpoint.getConfiguration().getMaximumMessageSize());
assertNull(endpoint.getConfiguration().getMessageRetentionPeriod());
assertNull(endpoint.getConfiguration().getPolicy());
+ assertNull(endpoint.getConfiguration().getRedrivePolicy());
assertNull(endpoint.getConfiguration().getRegion());
}
@@ -88,7 +90,8 @@ public class SqsComponentConfigurationTest extends CamelTestSupport {
+ "%7B%22Version%22%3A%222008-10-17%22%2C%22Id%22%3A%22%2F195004372649%2FMyQueue%2FSQSDefaultPolicy%22%2C%22Statement%22%3A%5B%7B%22Sid%22%3A%22Queue1ReceiveMessage%22%2C%22"
+ "Effect%22%3A%22Allow%22%2C%22Principal%22%3A%7B%22AWS%22%3A%22*%22%7D%2C%22Action%22%3A%22SQS%3AReceiveMessage%22%2C%22Resource%22%3A%22%2F195004372649%2FMyQueue%22%7D%5D%7D"
+ "&delaySeconds=123&receiveMessageWaitTimeSeconds=10&waitTimeSeconds=20"
- + "&queueOwnerAWSAccountId=111222333&region=us-east-1");
+ + "&queueOwnerAWSAccountId=111222333&region=us-east-1"
+ + "&redrivePolicy={\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:195004372649:MyDeadLetterQueue\"}");
assertEquals("MyQueue", endpoint.getConfiguration().getQueueName());
assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
@@ -104,6 +107,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport {
assertEquals("{\"Version\":\"2008-10-17\",\"Id\":\"/195004372649/MyQueue/SQSDefaultPolicy\",\"Statement\":[{\"Sid\":\"Queue1ReceiveMessage\",\"Effect\":\"Allow\",\"Principal\":"
+ "{\"AWS\":\"*\"},\"Action\":\"SQS:ReceiveMessage\",\"Resource\":\"/195004372649/MyQueue\"}]}",
endpoint.getConfiguration().getPolicy());
+ assertEquals("{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:195004372649:MyDeadLetterQueue\"}", endpoint.getConfiguration().getRedrivePolicy());
assertEquals(new Integer(123), endpoint.getConfiguration().getDelaySeconds());
assertEquals(Integer.valueOf(10), endpoint.getConfiguration().getReceiveMessageWaitTimeSeconds());
assertEquals(Integer.valueOf(20), endpoint.getConfiguration().getWaitTimeSeconds());
[2/3] camel git commit: CAMEL-8076 Fixed the issue that CamelJob
cannot find right QuartzEndpoint when recovering the job
Posted by ni...@apache.org.
CAMEL-8076 Fixed the issue that CamelJob cannot find right QuartzEndpoint when recovering the job
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff59faef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff59faef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff59faef
Branch: refs/heads/master
Commit: ff59faefa559d6ffe22020b51b668b647ef01041
Parents: aa6e20a
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Dec 3 21:43:21 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Dec 3 21:57:52 2014 +0800
----------------------------------------------------------------------
.../camel/component/quartz2/CamelJob.java | 9 +++--
...rtzConsumerTwoAppsClusteredRecoveryTest.java | 35 ++++++++++++++++----
...ingQuartzConsumerRecoveryClusteredAppOne.xml | 6 ++--
3 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
index 407b93b..3e74abc 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
@@ -23,8 +23,10 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.quartz.Job;
+import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
import org.quartz.SchedulerContext;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
@@ -91,10 +93,12 @@ public class CamelJob implements Job {
protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
TriggerKey triggerKey = quartzContext.getTrigger().getKey();
+ JobDetail jobDetail = quartzContext.getJobDetail();
+ JobKey jobKey = jobDetail.getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey);
}
-
+
// check all active routes for the quartz endpoint this task matches
// as we prefer to use the existing endpoint from the routes
for (Route route : camelContext.getRoutes()) {
@@ -108,7 +112,8 @@ public class CamelJob implements Job {
if (LOG.isTraceEnabled()) {
LOG.trace("Checking route endpoint={} with checkTriggerKey={}", quartzEndpoint, checkTriggerKey);
}
- if (triggerKey.equals(checkTriggerKey)) {
+ if (triggerKey.equals(checkTriggerKey)
+ || (jobDetail.requestsRecovery() && jobKey.getGroup().equals(checkTriggerKey.getGroup()) && jobKey.getName().equals(checkTriggerKey.getName()))) {
return quartzEndpoint;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
index 4fe1ab9..9758097 100644
--- a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
+++ b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
@@ -19,10 +19,14 @@ package org.apache.camel.component.quartz2;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.TestSupport;
import org.apache.camel.util.IOHelper;
import org.junit.Test;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -42,19 +46,20 @@ public class SpringQuartzConsumerTwoAppsClusteredRecoveryTest extends TestSuppor
// now launch the first clustered app which will acquire the quartz database lock and become the master
AbstractXmlApplicationContext app = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml");
app.start();
-
- // as well as the second one which will run in slave mode as it will not be able to acquire the same lock
- AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppTwo.xml");
- app2.start();
-
+
// now let's simulate a crash of the first app (the quartz instance 'app-one')
log.warn("The first app is going to crash NOW!");
- IOHelper.close(app);
log.warn("Crashed...");
log.warn("Crashed...");
log.warn("Crashed...");
-
+
+ Thread.sleep(2000);
+
+ // as well as the second one which will run in slave mode as it will not be able to acquire the same lock
+ AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppTwo.xml");
+ app2.start();
+
// wait long enough until the second app takes it over...
Thread.sleep(20000);
// inside the logs one can then clearly see how the route of the second app ('app-two') starts consuming:
@@ -91,5 +96,21 @@ public class SpringQuartzConsumerTwoAppsClusteredRecoveryTest extends TestSuppor
}
}
+
+ public static class MyProcessor implements Processor, ApplicationContextAware {
+ ApplicationContext applicationContext;
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // shutdown the application context;
+ ((AbstractXmlApplicationContext)applicationContext).close();
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml b/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
index a01b1c0..91c07b1 100644
--- a/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
+++ b/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
@@ -58,6 +58,8 @@
</props>
</property>
</bean>
+
+ <bean id="myProcessor" class="org.apache.camel.component.quartz2.SpringQuartzConsumerTwoAppsClusteredRecoveryTest$MyProcessor" />
<camelContext id="camelContext" shutdownEager="false" xmlns="http://camel.apache.org/schema/spring">
<template id="template" />
@@ -67,9 +69,7 @@
<simple>clustering PINGS!</simple>
</transform>
<to uri="log:triggered" />
- <!--delay>
- <constant>10000</constant>
- </delay-->
+ <process ref="myProcessor"/>
<to uri="mock:result" />
</route>
</camelContext>