You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/03 15:00:00 UTC

[1/3] camel git commit: CAMEL-8092 Fixed the cxf:producer Matrix Params missing issue with thanks to kumarann

Repository: camel
Updated Branches:
  refs/heads/master aa6e20a77 -> ced84063a


CAMEL-8092 Fixed the cxf:producer Matrix Params missing issue with thanks to kumarann


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3bf8472
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3bf8472
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3bf8472

Branch: refs/heads/master
Commit: a3bf8472a527b96d3b5a974aaf47a355c207ba9f
Parents: ff59fae
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Dec 3 21:45:15 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Dec 3 21:57:52 2014 +0800

----------------------------------------------------------------------
 .../component/cxf/jaxrs/CxfRsProducer.java      | 42 ++++++++++++++++++++
 1 file changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a3bf8472/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 6a092ce..734d972 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -44,6 +44,7 @@ import org.apache.cxf.jaxrs.JAXRSServiceFactoryBean;
 import org.apache.cxf.jaxrs.client.Client;
 import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
 import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.message.MessageImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,6 +120,30 @@ public class CxfRsProducer extends DefaultProducer {
         
     }
     
+    protected void setupClientMatrix(WebClient client, Exchange exchange) throws Exception {
+        
+        org.apache.cxf.message.Message cxfMessage = (org.apache.cxf.message.Message) exchange.getIn().getHeader("CamelCxfMessage");
+        if (cxfMessage != null) {
+            String requestURL = (String)cxfMessage.get("org.apache.cxf.request.uri"); 
+            String matrixParam = null;
+            int matrixStart = requestURL.indexOf(";");
+            int matrixEnd = requestURL.indexOf("?") > -1 ? requestURL.indexOf("?") : requestURL.length();
+            Map<String, String> maps = null;
+            if (requestURL != null && matrixStart > 0) {
+                matrixParam = requestURL.substring(matrixStart + 1, matrixEnd);
+                if (matrixParam != null) {
+                    maps = getMatrixParametersFromMatrixString(matrixParam, IOHelper.getCharsetName(exchange));
+                }
+            }
+            if (maps != null) {
+                for (Map.Entry<String, String> entry : maps.entrySet()) {
+                    client.matrix(entry.getKey(), entry.getValue());
+                    LOG.debug("Matrix param " + entry.getKey() + " :: " + entry.getValue());
+                }
+            }
+        }
+    }
+    
     protected void setupClientHeaders(Client client, Exchange exchange) throws Exception {
         Message inMessage = exchange.getIn();
         CxfRsEndpoint cxfRsEndpoint = (CxfRsEndpoint) getEndpoint();
@@ -175,6 +200,8 @@ public class CxfRsProducer extends DefaultProducer {
                 }
             }
         }
+        
+        setupClientMatrix(client, exchange); 
 
         setupClientQueryAndHeaders(client, exchange);
         
@@ -345,6 +372,21 @@ public class CxfRsProducer extends DefaultProducer {
         return answer;
     }
 
+    private Map<String, String> getMatrixParametersFromMatrixString(String matrixString, String charset) throws UnsupportedEncodingException {
+        Map<String, String> answer  = new LinkedHashMap<String, String>();
+        for (String param : matrixString.split(";")) {
+            String[] pair = param.split("=", 2);
+            if (pair.length == 2) {
+                String name = URLDecoder.decode(pair[0], charset);
+                String value = URLDecoder.decode(pair[1], charset);
+                answer.put(name, value);
+            } else {
+                throw new IllegalArgumentException("Invalid parameter, expected to be a pair but was " + param);
+            }
+        }
+        return answer;
+    }
+    
     private String arrayToString(Object[] array) {
         StringBuilder buffer = new StringBuilder("[");
         for (Object obj : array) {


[3/3] camel git commit: CAMEL-8105 support for redrivePolicy inside SQSEndpoint with thanks to Rufus

Posted by ni...@apache.org.
CAMEL-8105 support for redrivePolicy inside SQSEndpoint with thanks to Rufus


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ced84063
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ced84063
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ced84063

Branch: refs/heads/master
Commit: ced84063a2f56ce555cc2a6a4df53afc4d35c5be
Parents: a3bf847
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Dec 3 21:51:18 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Dec 3 21:59:37 2014 +0800

----------------------------------------------------------------------
 .../camel/component/aws/sqs/SqsConfiguration.java       | 12 ++++++++++++
 .../org/apache/camel/component/aws/sqs/SqsEndpoint.java |  6 ++++++
 .../aws/sqs/SqsComponentConfigurationTest.java          |  6 +++++-
 3 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index de24cc5..718aee0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -52,6 +52,9 @@ public class SqsConfiguration {
     private Integer messageRetentionPeriod;
     private Integer receiveMessageWaitTimeSeconds;
     private String policy;
+    
+    // dead letter queue properties
+    private String redrivePolicy;
 
     public void setAmazonSQSEndpoint(String amazonSQSEndpoint) {
         this.amazonSQSEndpoint = amazonSQSEndpoint;
@@ -165,6 +168,14 @@ public class SqsConfiguration {
         this.policy = policy;
     }
 
+    public String getRedrivePolicy() {
+        return redrivePolicy;
+    }
+
+    public void setRedrivePolicy(String redrivePolicy) {
+        this.redrivePolicy = redrivePolicy;
+    }
+
     public boolean isExtendMessageVisibility() {
         return this.extendMessageVisibility;
     }
@@ -231,6 +242,7 @@ public class SqsConfiguration {
             + ", receiveMessageWaitTimeSeconds=" + receiveMessageWaitTimeSeconds
             + ", delaySeconds=" + delaySeconds
             + ", policy=" + policy
+            + ", redrivePolicy=" + redrivePolicy
             + ", extendMessageVisibility=" + extendMessageVisibility
             + ", queueOwnerAWSAccountId=" + queueOwnerAWSAccountId
             + ", region=" + region

http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index 8ed85c5..29ae3db 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -153,6 +153,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
         if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) {
             request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds()));
         }
+        if (getConfiguration().getRedrivePolicy() != null) {
+            request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy());
+        }
         LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
         
         CreateQueueResult queueResult = client.createQueue(request);
@@ -179,6 +182,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
         if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) {
             request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds()));
         }
+        if (getConfiguration().getRedrivePolicy() != null) {
+            request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy());
+        }
         if (!request.getAttributes().isEmpty()) {
             LOG.trace("Updating queue '{}' with the provided queue attributes...", configuration.getQueueName());
             client.setQueueAttributes(request);

http://git-wip-us.apache.org/repos/asf/camel/blob/ced84063/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
index e7fb9dc..e7eac1f 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java
@@ -43,6 +43,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport {
         assertNull(endpoint.getConfiguration().getMaximumMessageSize());
         assertNull(endpoint.getConfiguration().getMessageRetentionPeriod());
         assertNull(endpoint.getConfiguration().getPolicy());
+        assertNull(endpoint.getConfiguration().getRedrivePolicy());
         assertNull(endpoint.getConfiguration().getRegion());
     }
     
@@ -67,6 +68,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport {
         assertNull(endpoint.getConfiguration().getMaximumMessageSize());
         assertNull(endpoint.getConfiguration().getMessageRetentionPeriod());
         assertNull(endpoint.getConfiguration().getPolicy());
+        assertNull(endpoint.getConfiguration().getRedrivePolicy());
         assertNull(endpoint.getConfiguration().getRegion());
     }
     
@@ -88,7 +90,8 @@ public class SqsComponentConfigurationTest extends CamelTestSupport {
                 + "%7B%22Version%22%3A%222008-10-17%22%2C%22Id%22%3A%22%2F195004372649%2FMyQueue%2FSQSDefaultPolicy%22%2C%22Statement%22%3A%5B%7B%22Sid%22%3A%22Queue1ReceiveMessage%22%2C%22"
                 + "Effect%22%3A%22Allow%22%2C%22Principal%22%3A%7B%22AWS%22%3A%22*%22%7D%2C%22Action%22%3A%22SQS%3AReceiveMessage%22%2C%22Resource%22%3A%22%2F195004372649%2FMyQueue%22%7D%5D%7D"
                 + "&delaySeconds=123&receiveMessageWaitTimeSeconds=10&waitTimeSeconds=20"
-                + "&queueOwnerAWSAccountId=111222333&region=us-east-1");
+                + "&queueOwnerAWSAccountId=111222333&region=us-east-1"
+                + "&redrivePolicy={\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:195004372649:MyDeadLetterQueue\"}");
         
         assertEquals("MyQueue", endpoint.getConfiguration().getQueueName());
         assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
@@ -104,6 +107,7 @@ public class SqsComponentConfigurationTest extends CamelTestSupport {
         assertEquals("{\"Version\":\"2008-10-17\",\"Id\":\"/195004372649/MyQueue/SQSDefaultPolicy\",\"Statement\":[{\"Sid\":\"Queue1ReceiveMessage\",\"Effect\":\"Allow\",\"Principal\":"
                 + "{\"AWS\":\"*\"},\"Action\":\"SQS:ReceiveMessage\",\"Resource\":\"/195004372649/MyQueue\"}]}",
                 endpoint.getConfiguration().getPolicy());
+        assertEquals("{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:195004372649:MyDeadLetterQueue\"}", endpoint.getConfiguration().getRedrivePolicy());
         assertEquals(new Integer(123), endpoint.getConfiguration().getDelaySeconds());
         assertEquals(Integer.valueOf(10), endpoint.getConfiguration().getReceiveMessageWaitTimeSeconds());
         assertEquals(Integer.valueOf(20), endpoint.getConfiguration().getWaitTimeSeconds());


[2/3] camel git commit: CAMEL-8076 Fixed the issue that CamelJob cannot find right QuartzEndpoint when recovering the job

Posted by ni...@apache.org.
CAMEL-8076 Fixed the issue that CamelJob cannot find right QuartzEndpoint when recovering the job


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff59faef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff59faef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff59faef

Branch: refs/heads/master
Commit: ff59faefa559d6ffe22020b51b668b647ef01041
Parents: aa6e20a
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Dec 3 21:43:21 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Dec 3 21:57:52 2014 +0800

----------------------------------------------------------------------
 .../camel/component/quartz2/CamelJob.java       |  9 +++--
 ...rtzConsumerTwoAppsClusteredRecoveryTest.java | 35 ++++++++++++++++----
 ...ingQuartzConsumerRecoveryClusteredAppOne.xml |  6 ++--
 3 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
index 407b93b..3e74abc 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
@@ -23,8 +23,10 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
 import org.quartz.Job;
+import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
 import org.quartz.SchedulerContext;
 import org.quartz.SchedulerException;
 import org.quartz.TriggerKey;
@@ -91,10 +93,12 @@ public class CamelJob implements Job {
 
     protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
         TriggerKey triggerKey = quartzContext.getTrigger().getKey();
+        JobDetail jobDetail = quartzContext.getJobDetail(); 
+        JobKey jobKey =  jobDetail.getKey();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey);
         }
-
+        
         // check all active routes for the quartz endpoint this task matches
         // as we prefer to use the existing endpoint from the routes
         for (Route route : camelContext.getRoutes()) {
@@ -108,7 +112,8 @@ public class CamelJob implements Job {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Checking route endpoint={} with checkTriggerKey={}", quartzEndpoint, checkTriggerKey);
                 }
-                if (triggerKey.equals(checkTriggerKey)) {
+                if (triggerKey.equals(checkTriggerKey)
+                    || (jobDetail.requestsRecovery() && jobKey.getGroup().equals(checkTriggerKey.getGroup()) && jobKey.getName().equals(checkTriggerKey.getName()))) {
                     return quartzEndpoint;
                 }
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
index 4fe1ab9..9758097 100644
--- a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
+++ b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
@@ -19,10 +19,14 @@ package org.apache.camel.component.quartz2;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.TestSupport;
 import org.apache.camel.util.IOHelper;
 import org.junit.Test;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.support.AbstractXmlApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
@@ -42,19 +46,20 @@ public class SpringQuartzConsumerTwoAppsClusteredRecoveryTest extends TestSuppor
         // now launch the first clustered app which will acquire the quartz database lock and become the master
         AbstractXmlApplicationContext app = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml");
         app.start();
-
-        // as well as the second one which will run in slave mode as it will not be able to acquire the same lock
-        AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppTwo.xml");
-        app2.start();
-
+        
         // now let's simulate a crash of the first app (the quartz instance 'app-one')
         log.warn("The first app is going to crash NOW!");
-        IOHelper.close(app);
 
         log.warn("Crashed...");
         log.warn("Crashed...");
         log.warn("Crashed...");
-
+        
+        Thread.sleep(2000);
+        
+        // as well as the second one which will run in slave mode as it will not be able to acquire the same lock
+        AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppTwo.xml");
+        app2.start();
+        
         // wait long enough until the second app takes it over...
         Thread.sleep(20000);
         // inside the logs one can then clearly see how the route of the second app ('app-two') starts consuming:
@@ -91,5 +96,21 @@ public class SpringQuartzConsumerTwoAppsClusteredRecoveryTest extends TestSuppor
         }
 
     }
+    
+    public static class MyProcessor implements Processor, ApplicationContextAware {
+        ApplicationContext applicationContext;
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // shutdown the application context;
+            ((AbstractXmlApplicationContext)applicationContext).close();
+        }
+
+        @Override
+        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+            this.applicationContext = applicationContext;
+        }
+        
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml b/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
index a01b1c0..91c07b1 100644
--- a/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
+++ b/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
@@ -58,6 +58,8 @@
       </props>
     </property>
   </bean>
+  
+  <bean id="myProcessor" class="org.apache.camel.component.quartz2.SpringQuartzConsumerTwoAppsClusteredRecoveryTest$MyProcessor" />
 
   <camelContext id="camelContext" shutdownEager="false" xmlns="http://camel.apache.org/schema/spring">
     <template id="template" />
@@ -67,9 +69,7 @@
         <simple>clustering PINGS!</simple>
       </transform>
       <to uri="log:triggered" />
-      <!--delay>
-         <constant>10000</constant>
-       </delay-->
+      <process ref="myProcessor"/>
       <to uri="mock:result" />
     </route>
   </camelContext>