You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/03/21 22:52:03 UTC
svn commit: r1083979 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/test/java/org/apache/camel/impl/
components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/
components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/
Author: cmueller
Date: Mon Mar 21 21:52:03 2011
New Revision: 1083979
URL: http://svn.apache.org/viewvc?rev=1083979&view=rev
Log:
CAMEL-3794: Polish the aws component
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Mon Mar 21 21:52:03 2011
@@ -46,7 +46,7 @@ public abstract class DefaultEndpoint ex
//Match any key-value pair in the URI query string whose key contains "passphrase" or "password" (case-insensitive).
//First capture group is the key, second is the value.
- private static final Pattern SECRETS = Pattern.compile("([?&][^=]*(?:passphrase|password)[^=]*)=([^&]*)", Pattern.CASE_INSENSITIVE);
+ private static final Pattern SECRETS = Pattern.compile("([?&][^=]*(?:passphrase|password|secretKey)[^=]*)=([^&]*)", Pattern.CASE_INSENSITIVE);
private String endpointUri;
private CamelContext camelContext;
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultEndpointTest.java Mon Mar 21 21:52:03 2011
@@ -34,6 +34,8 @@ public class DefaultEndpointTest extends
DefaultEndpoint.sanitizeUri("ftp://host.mysite.com/records?passiveMode=true&user=someuser&password=superSecret"));
assertEquals("sftp://host.mysite.com/records?user=someuser&privateKeyFile=key.file&privateKeyFilePassphrase=******&knownHostsFile=hosts.list",
DefaultEndpoint.sanitizeUri("sftp://host.mysite.com/records?user=someuser&privateKeyFile=key.file&privateKeyFilePassphrase=superSecret&knownHostsFile=hosts.list"));
+ assertEquals("aws-sqs://MyQueue?accessKey=1672t4rflhnhli3&secretKey=******",
+ DefaultEndpoint.sanitizeUri("aws-sqs://MyQueue?accessKey=1672t4rflhnhli3&secretKey=qi472qfberu33dqjncq"));
}
public void assertSanitizedUriUnchanged(String uri) {
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java Mon Mar 21 21:52:03 2011
@@ -25,7 +25,6 @@ import org.apache.camel.impl.DefaultComp
/**
* Defines the <a href="http://camel.apache.org/aws.html">AWS Component</a>
*
- * @version
*/
public class SqsComponent extends DefaultComponent {
@@ -41,13 +40,13 @@ public class SqsComponent extends Defaul
SqsConfiguration configuration = new SqsConfiguration();
setProperties(configuration, parameters);
- if (remaining == null) {
- throw new IllegalArgumentException("Queue name not specified.");
+ if (remaining == null || remaining.trim().length() == 0) {
+ throw new IllegalArgumentException("Queue name must be specified.");
}
configuration.setQueueName(remaining);
if (configuration.getAmazonSQSClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) {
- throw new IllegalArgumentException("AmazonSQSClient or accessKey and secretKey must be set");
+ throw new IllegalArgumentException("AmazonSQSClient or accessKey and secretKey must be specified.");
}
SqsEndpoint sqsEndpoint = new SqsEndpoint(uri, this, configuration);
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java Mon Mar 21 21:52:03 2011
@@ -23,7 +23,6 @@ import com.amazonaws.services.sqs.Amazon
/**
* The AWS SQS component configuration properties
*
- * @version
*/
public class SqsConfiguration {
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java Mon Mar 21 21:52:03 2011
@@ -19,7 +19,6 @@ package org.apache.camel.component.aws.s
/**
* Constants used in Camel AWS SQS module
*
- * @version
*/
public interface SqsConstants {
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Mon Mar 21 21:52:03 2011
@@ -32,6 +32,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.Synchronization;
@@ -43,9 +44,8 @@ import org.slf4j.LoggerFactory;
/**
* A Consumer of messages from the Amazon Web Service Simple Queue Service
- * <a href="http://aws.amazon.com/aws-sqs/">AWS SQS</a>
+ * <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
*
- * @version
*/
public class SqsConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
@@ -68,16 +68,19 @@ public class SqsConsumer extends Schedul
request.setMaxNumberOfMessages(getMaxMessagesPerPoll() > 0 ? getMaxMessagesPerPoll() : null);
request.setVisibilityTimeout(getConfiguration().getVisibilityTimeout() != null ? getConfiguration().getVisibilityTimeout() : null);
request.setAttributeNames(getConfiguration().getAttributeNames() != null ? getConfiguration().getAttributeNames() : null);
+
+ LOG.trace("Receiving messages with request [{}]...", request);
+
ReceiveMessageResult messageResult = getClient().receiveMessage(request);
+ LOG.trace("Received {} messages", messageResult.getMessages().size());
+
Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
return processBatch(CastUtils.cast(exchanges));
}
protected Queue<Exchange> createExchanges(List<Message> messages) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Received " + messages.size() + " messages in this poll");
- }
+ LOG.trace("Received {} messages in this poll", messages.size());
Queue<Exchange> answer = new LinkedList<Exchange>();
for (Message message : messages) {
@@ -118,9 +121,7 @@ public class SqsConsumer extends Schedul
}
});
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing exchange [" + exchange + "]...");
- }
+ LOG.trace("Processing exchange [{}]...", exchange);
getProcessor().process(exchange);
}
@@ -139,11 +140,11 @@ public class SqsConsumer extends Schedul
String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Deleting message with receipt handle " + receiptHandle + "...");
- }
+ LOG.trace("Deleting message with receipt handle {}...", receiptHandle);
getClient().deleteMessage(deleteRequest);
+
+ LOG.trace("Message deleted");
}
} catch (AmazonClientException e) {
LOG.warn("Error occurred during deleting message", e);
@@ -161,7 +162,7 @@ public class SqsConsumer extends Schedul
if (cause != null) {
LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
} else {
- LOG.warn("Exchange failed, so rolling back message status: " + exchange);
+ LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
}
}
@@ -225,4 +226,9 @@ public class SqsConsumer extends Schedul
public int getMaxMessagesPerPoll() {
return getEndpoint().getMaxMessagesPerPoll();
}
+
+ @Override
+ public String toString() {
+ return "SqsConsumer[" + DefaultEndpoint.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
}
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Mon Mar 21 21:52:03 2011
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
/**
* Defines the <a href="http://camel.apache.org/aws.html">AWS SQS Endpoint</a>.
*
- * @version
*/
public class SqsEndpoint extends ScheduledPollEndpoint {
@@ -77,16 +76,12 @@ public class SqsEndpoint extends Schedul
CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
request.setDefaultVisibilityTimeout(getConfiguration().getDefaultVisibilityTimeout() != null ? getConfiguration().getDefaultVisibilityTimeout() : null);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Creating queue [" + configuration.getQueueName() + "] with request [" + request + "]...");
- }
+ LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
CreateQueueResult queueResult = client.createQueue(request);
queueUrl = queueResult.getQueueUrl();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Queue created and available at: " + queueUrl);
- }
+ LOG.trace("Queue created and available at: {}", queueUrl);
}
@Override
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java Mon Mar 21 21:52:03 2011
@@ -23,6 +23,7 @@ import com.amazonaws.services.sqs.model.
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoFactoryAvailableException;
+import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
* A Producer which sends messages to the Amazon Web Service Simple Queue Service
* <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
*
- * @version
*/
public class SqsProducer extends DefaultProducer {
@@ -45,15 +45,11 @@ public class SqsProducer extends Default
String body = exchange.getIn().getBody(String.class);
SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Sending request [" + request + "] from exchange [" + exchange + "]...");
- }
+ LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
SendMessageResult result = getClient().sendMessage(request);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Received result [" + result + "]");
- }
+ LOG.trace("Received result [{}]", result);
Message message = getMessageForResponse(exchange);
message.setHeader(SqsConstants.MESSAGE_ID, result.getMessageId());
@@ -82,4 +78,9 @@ public class SqsProducer extends Default
public SqsEndpoint getEndpoint() {
return (SqsEndpoint) super.getEndpoint();
}
+
+ @Override
+ public String toString() {
+ return "SqsProducer[" + DefaultEndpoint.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
}
Modified: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java?rev=1083979&r1=1083978&r2=1083979&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java (original)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/integration/SqsComponentIntegrationTest.java Mon Mar 21 21:52:03 2011
@@ -57,7 +57,7 @@ public class SqsComponentIntegrationTest
assertNotNull(resultExchange.getIn().getHeader(SqsConstants.ATTRIBUTES));
assertNotNull(exchange.getIn().getHeader(SqsConstants.MESSAGE_ID));
- assertEquals("6a1559560f67c5e7a7d5d838bf0272ee", resultExchange.getIn().getHeader(SqsConstants.MD5_OF_BODY));
+ assertEquals("6a1559560f67c5e7a7d5d838bf0272ee", exchange.getIn().getHeader(SqsConstants.MD5_OF_BODY));
}
@Test