You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/05 10:41:24 UTC
[2/3] camel git commit: Fixed CS. Fixes #708. Fixes #706. Fixes #705.
Fixed CS. Fixes #708. Fixes #706. Fixes #705.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92f6c9b9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92f6c9b9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92f6c9b9
Branch: refs/heads/master
Commit: 92f6c9b93dfde4ae3ffe3182c082db29b4c03e71
Parents: 72370df
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Dec 5 10:35:53 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 5 10:35:53 2015 +0100
----------------------------------------------------------------------
.../component/aws/kinesis/KinesisComponent.java | 1 +
.../component/aws/kinesis/KinesisConstants.java | 9 +--
.../component/aws/kinesis/KinesisConsumer.java | 25 +++----
.../component/aws/kinesis/KinesisEndpoint.java | 11 ++-
.../aws/kinesis/RecordStringConverter.java | 29 ++++----
.../services/org/apache/camel/TypeConverter | 2 +-
.../aws/kinesis/KinesisConsumerTest.java | 70 +++++++++++---------
.../aws/kinesis/KinesisEndpointTest.java | 16 +++--
.../aws/kinesis/RecordStringConverterTest.java | 11 +--
9 files changed, 91 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
index d3f34ab..9740600 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.aws.kinesis;
import java.util.Map;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
index b028123..22da493 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
@@ -18,12 +18,13 @@ package org.apache.camel.component.aws.kinesis;
public interface KinesisConstants {
- public static final String SEQUENCE_NUMBER = "KinesisSequenceNumber";
- public static final String APPROX_ARRIVAL_TIME = "KinesisApproximateArrivalTimestamp";
- public static final String PARTITION_KEY = "KinesisPartitionKey";
+ String SEQUENCE_NUMBER = "CamelAwsKinesisSequenceNumber";
+ String APPROX_ARRIVAL_TIME = "CamelAwsKinesisApproximateArrivalTimestamp";
+ String PARTITION_KEY = "CamelAwsKinesisPartitionKey";
+
/**
* in a Kinesis Record object, the shard ID is obtained from the getPartitionKey method.
*/
- public static final String SHARD_ID = "KinesisPartitionKey";
+ String SHARD_ID = "CamelAwsKinesisPartitionKey";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
index b301f38..def7e63 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.aws.kinesis;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
@@ -24,9 +28,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.Record;
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -39,21 +40,17 @@ import org.slf4j.LoggerFactory;
public class KinesisConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
- private String currentShardIterator = null;
+ private String currentShardIterator;
public KinesisConsumer(KinesisEndpoint endpoint, Processor processor) {
super(endpoint, processor);
}
- /*
- * Returns the number of messages polled.
- */
@Override
protected int poll() throws Exception {
GetRecordsRequest req = new GetRecordsRequest()
.withShardIterator(getShardItertor())
- .withLimit(getEndpoint().getMaxResultsPerRequest())
- ;
+ .withLimit(getEndpoint().getMaxResultsPerRequest());
GetRecordsResult result = getClient().getRecords(req);
Queue<Exchange> exchanges = createExchanges(result.getRecords());
@@ -61,8 +58,8 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
// May cache the last successful sequence number, and pass it to the
// getRecords request. That way, on the next poll, we start from where
- // we left off, however, I don't know what happens to subsiquent
- // exchanges when an earlier echange fails.
+ // we left off, however, I don't know what happens to subsequent
+ // exchanges when an earlier exchange fails.
currentShardIterator = result.getNextShardIterator();
@@ -100,15 +97,13 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
// either return a cached one or get a new one via a GetShardIterator request.
if (currentShardIterator == null) {
DescribeStreamRequest req1 = new DescribeStreamRequest()
- .withStreamName(getEndpoint().getStreamName())
- ;
+ .withStreamName(getEndpoint().getStreamName());
DescribeStreamResult res1 = getClient().describeStream(req1);
GetShardIteratorRequest req = new GetShardIteratorRequest()
.withStreamName(getEndpoint().getStreamName())
.withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard
- .withShardIteratorType(getEndpoint().getIteratorType())
- ;
+ .withShardIteratorType(getEndpoint().getIteratorType());
GetShardIteratorResult result = getClient().getShardIterator(req);
currentShardIterator = result.getShardIterator();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
index b4c7597..e34da43 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -29,10 +29,10 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
-@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerClass = KinesisConsumer.class, label = "cloud,messaging")
+@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerOnly = true, consumerClass = KinesisConsumer.class, label = "cloud,messaging")
public class KinesisEndpoint extends ScheduledPollEndpoint {
- @UriPath(label = "consumer,producer", description = "Name of the stream")
+ @UriPath(label = "consumer", description = "Name of the stream")
@Metadata(required = "true")
private String streamName;
@@ -41,7 +41,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
@Metadata(required = "true")
private AmazonKinesis amazonKinesisClient;
- @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll")
+ @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll", defaultValue = "1")
private int maxResultsPerRequest = 1;
@UriParam(label = "consumer", description = "Defines where in the Kinesis stream to start getting records")
@@ -74,7 +74,6 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
@Override
public boolean isSingleton() {
- // probably right.
return true;
}
@@ -117,7 +116,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
@Override
public String toString() {
- return "KinesisEndpoint{" + "amazonKinesisClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + iteratorType + ", streamName=" + streamName + '}';
+ return "KinesisEndpoint{amazonKinesisClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + iteratorType + ", streamName=" + streamName + '}';
}
-
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java
index bda8983..19b8590 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java
@@ -16,28 +16,31 @@
*/
package org.apache.camel.component.aws.kinesis;
-import com.amazonaws.services.kinesis.model.Record;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
+
+import com.amazonaws.services.kinesis.model.Record;
import org.apache.camel.Converter;
@Converter
-public class RecordStringConverter {
+public final class RecordStringConverter {
+
+ private RecordStringConverter() {
+ }
@Converter
public static String toString(Record record) {
- List<Byte> bytes = new ArrayList<>();
- ByteBuffer buf = record.getData().asReadOnlyBuffer();
- while (buf.hasRemaining()) {
- bytes.add(buf.get());
- }
- byte[] a = new byte[bytes.size()];
- for (int i = 0; i < bytes.size(); ++i) {
- a[i] = bytes.get(i);
+ Charset charset = Charset.forName("UTF-8");
+
+ ByteBuffer buffer = record.getData();
+ if (buffer.hasArray()) {
+ byte[] bytes = record.getData().array();
+ return new String(bytes, charset);
+ } else {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return new String(bytes, charset);
}
- return new String(a, Charset.forName("UTF-8"));
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
index 28707d6..4873a46 100644
--- a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -15,4 +15,4 @@
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.camel.component.aws.kinesis
+org.apache.camel.component.aws.kinesis.RecordStringConverter
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
index db0df68..3f31986 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.aws.kinesis;
+import java.util.Date;
+
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
@@ -27,27 +29,32 @@ import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
-import java.util.Date;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
-import static org.hamcrest.CoreMatchers.is;
-import org.junit.Test;
-import static org.junit.Assert.*;
import org.junit.Before;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
-import static org.mockito.Mockito.*;
import org.mockito.runners.MockitoJUnitRunner;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
@RunWith(MockitoJUnitRunner.class)
public class KinesisConsumerTest {
- @Mock private AmazonKinesis kinesisClient;
- @Mock private AsyncProcessor processor;
+ @Mock
+ private AmazonKinesis kinesisClient;
+ @Mock
+ private AsyncProcessor processor;
private final CamelContext context = new DefaultCamelContext();
private final KinesisComponent component = new KinesisComponent(context);
@@ -62,19 +69,19 @@ public class KinesisConsumerTest {
undertest = new KinesisConsumer(endpoint, processor);
when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
- .thenReturn(new GetRecordsResult()
- .withNextShardIterator("nextShardIterator")
- );
+ .thenReturn(new GetRecordsResult()
+ .withNextShardIterator("nextShardIterator")
+ );
when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
- .thenReturn(new DescribeStreamResult()
- .withStreamDescription(new StreamDescription()
- .withShards(new Shard().withShardId("shardId"))
- )
- );
+ .thenReturn(new DescribeStreamResult()
+ .withStreamDescription(new StreamDescription()
+ .withShards(new Shard().withShardId("shardId"))
+ )
+ );
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
- .thenReturn(new GetShardIteratorResult()
- .withShardIterator("shardIterator")
- );
+ .thenReturn(new GetShardIteratorResult()
+ .withShardIterator("shardIterator")
+ );
}
@Test
@@ -103,7 +110,6 @@ public class KinesisConsumerTest {
assertThat(getRecordsReqCap.getValue().getShardIterator(), is("shardIterator"));
}
-
@Test
public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
undertest.poll();
@@ -121,10 +127,10 @@ public class KinesisConsumerTest {
@Test
public void recordsAreSentToTheProcessor() throws Exception {
when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
- .thenReturn(new GetRecordsResult()
- .withNextShardIterator("nextShardIterator")
- .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2"))
- );
+ .thenReturn(new GetRecordsResult()
+ .withNextShardIterator("nextShardIterator")
+ .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2"))
+ );
int messageCount = undertest.poll();
@@ -138,16 +144,15 @@ public class KinesisConsumerTest {
@Test
public void exchangePropertiesAreSet() throws Exception {
-
when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
- .thenReturn(new GetRecordsResult()
- .withNextShardIterator("nextShardIterator")
- .withRecords(new Record()
- .withSequenceNumber("1")
- .withApproximateArrivalTimestamp(new Date(42))
- .withPartitionKey("shardId")
- )
- );
+ .thenReturn(new GetRecordsResult()
+ .withNextShardIterator("nextShardIterator")
+ .withRecords(new Record()
+ .withSequenceNumber("1")
+ .withApproximateArrivalTimestamp(new Date(42))
+ .withPartitionKey("shardId")
+ )
+ );
undertest.poll();
@@ -159,4 +164,5 @@ public class KinesisConsumerTest {
assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SEQUENCE_NUMBER, String.class), is("1"));
assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SHARD_ID, String.class), is("shardId"));
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
index 50653e3..a8f87c2 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
@@ -21,18 +21,20 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
@RunWith(MockitoJUnitRunner.class)
public class KinesisEndpointTest {
- @Mock private AmazonKinesis amazonKinesisClient;
+ @Mock
+ private AmazonKinesis amazonKinesisClient;
private CamelContext camelContext;
@@ -45,11 +47,11 @@ public class KinesisEndpointTest {
@Test
public void allTheEndpointParams() throws Exception {
- KinesisEndpoint endpoint = (KinesisEndpoint)camelContext.getEndpoint("aws-kinesis://some_stream_name"
+ KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name"
+ "?amazonKinesisClient=#kinesisClient"
+ "&maxResultsPerRequest=101"
+ "&iteratorType=latest"
- );
+ );
assertThat(endpoint.getClient(), is(amazonKinesisClient));
assertThat(endpoint.getStreamName(), is("some_stream_name"));
@@ -59,9 +61,9 @@ public class KinesisEndpointTest {
@Test
public void onlyRequiredEndpointParams() throws Exception {
- KinesisEndpoint endpoint = (KinesisEndpoint)camelContext.getEndpoint("aws-kinesis://some_stream_name"
+ KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name"
+ "?amazonKinesisClient=#kinesisClient"
- );
+ );
assertThat(endpoint.getClient(), is(amazonKinesisClient));
assertThat(endpoint.getStreamName(), is("some_stream_name"));
http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java
index 48f8edb..15556d9 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java
@@ -16,12 +16,14 @@
*/
package org.apache.camel.component.aws.kinesis;
-import com.amazonaws.services.kinesis.model.Record;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
-import static org.hamcrest.CoreMatchers.is;
+
+import com.amazonaws.services.kinesis.model.Record;
import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
public class RecordStringConverterTest {
@@ -29,8 +31,7 @@ public class RecordStringConverterTest {
public void convertRecordToString() throws Exception {
Record record = new Record()
.withSequenceNumber("1")
- .withData(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8"))))
- ;
+ .withData(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8"))));
String result = RecordStringConverter.toString(record);
assertThat(result, is("this is a String"));