You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/20 14:47:27 UTC
camel git commit: CAMEL-9603 Documentation for Kinesis producer.
Repository: camel
Updated Branches:
refs/heads/master bbf15fdd8 -> ef669f95a
CAMEL-9603 Documentation for Kinesis producer.
Fixed one of the headers; even though it means something similar, it's a discrete value from Kinesis.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ef669f95
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ef669f95
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ef669f95
Branch: refs/heads/master
Commit: ef669f95a7cc95ad069db6e43cab39e294a26a19
Parents: bbf15fd
Author: John D. Ament <jo...@apache.org>
Authored: Sat Feb 20 07:39:53 2016 -0500
Committer: John D. Ament <jo...@apache.org>
Committed: Sat Feb 20 07:43:48 2016 -0500
----------------------------------------------------------------------
.../camel-aws/src/main/docs/aws-kinesis.adoc | 29 ++++++++++++++++++++
.../component/aws/kinesis/KinesisConstants.java | 5 ++--
.../component/aws/kinesis/KinesisProducer.java | 4 +--
.../aws/kinesis/KinesisConsumerTest.java | 11 ++++----
.../aws/kinesis/KinesisProducerTest.java | 6 ++--
5 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/docs/aws-kinesis.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-kinesis.adoc b/components/camel-aws/src/main/docs/aws-kinesis.adoc
index 5f0c28b..6889008 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis.adoc
@@ -114,6 +114,35 @@ however, a
different http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html[AWSCredentialsProvider]
can be specified when calling createClient(...).
+[[AWS-Kinesis-MessageheaderssetbytheKinesisproducer]]
+Message headers used by the Kinesis producer to write to Kinesis. The producer expects that the message body is a `ByteBuffer`.
++++++++++++++++++++++++++++++++++++++++
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsKinesisPartitionKey` |`String` |The PartitionKey to pass to Kinesis to store this record.
+
+|`CamelAwsKinesisSequenceNumber` |`String` |Optional parameter to indicate the sequence number of this record.
+
+|=======================================================================
+
+Message headers set by the Kinesis producer on successful storage of a Record
++++++++++++++++++++++++++++++++++++++++
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsKinesisSequenceNumber` |`String` |The sequence number of the record, as defined in
+http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_ResponseSyntax[Response Syntax]
+
+|`CamelAwsKinesisShardId` |`String` |The shard ID of where the Record was stored
+
+
+|=======================================================================
+
[[AWS-KINESIS-Dependencies]]
Dependencies
^^^^^^^^^^^^
http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
index 22da493..23891c6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
@@ -23,8 +23,7 @@ public interface KinesisConstants {
String PARTITION_KEY = "CamelAwsKinesisPartitionKey";
/**
- * in a Kinesis Record object, the shard ID is obtained from the getPartitionKey method.
+ * in a Kinesis Record object, the shard ID is used on writes to indicate where the data was stored
*/
- String SHARD_ID = "CamelAwsKinesisPartitionKey";
-
+ String SHARD_ID = "CamelAwsKinesisShardId";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
index 3c48239..8e0c0a3 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
@@ -54,12 +54,10 @@ public class KinesisProducer extends DefaultProducer {
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setData(body);
putRecordRequest.setStreamName(getEndpoint().getStreamName());
+ putRecordRequest.setPartitionKey(partitionKey.toString());
if (sequenceNumber != null) {
putRecordRequest.setSequenceNumberForOrdering(sequenceNumber.toString());
}
- if (partitionKey != null) {
- putRecordRequest.setPartitionKey(partitionKey.toString());
- }
return putRecordRequest;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
index 62f5b14..8478f26 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
@@ -144,13 +144,15 @@ public class KinesisConsumerTest {
@Test
public void exchangePropertiesAreSet() throws Exception {
+ String partitionKey = "partitionKey";
+ String sequenceNumber = "1";
when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
.thenReturn(new GetRecordsResult()
.withNextShardIterator("nextShardIterator")
.withRecords(new Record()
- .withSequenceNumber("1")
+ .withSequenceNumber(sequenceNumber)
.withApproximateArrivalTimestamp(new Date(42))
- .withPartitionKey("shardId")
+ .withPartitionKey(partitionKey)
)
);
@@ -160,9 +162,8 @@ public class KinesisConsumerTest {
verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class));
assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L));
- assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is("shardId"));
- assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is("1"));
- assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SHARD_ID, String.class), is("shardId"));
+ assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey));
+ assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
index 3db0023..d0e3250 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
@@ -43,6 +43,7 @@ public class KinesisProducerTest {
private static final String SEQUENCE_NUMBER = "SEQ123";
private static final String STREAM_NAME = "streams";
private static final String SAMPLE_RECORD_BODY = "SAMPLE";
+ private static final String PARTITION_KEY = "partition";
private static final ByteBuffer SAMPLE_BUFFER = ByteBuffer.wrap(SAMPLE_RECORD_BODY.getBytes());
@Mock
@@ -70,6 +71,7 @@ public class KinesisProducerTest {
when(exchange.getPattern()).thenReturn(ExchangePattern.InOut);
when(inMessage.getBody(ByteBuffer.class)).thenReturn(SAMPLE_BUFFER);
+ when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(PARTITION_KEY);
when(putRecordResult.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER);
when(putRecordResult.getShardId()).thenReturn(SHARD_ID);
@@ -95,10 +97,8 @@ public class KinesisProducerTest {
@Test
public void shouldHaveProperHeadersWhenSending() throws Exception {
- String partitionKey = "partition";
String seqNoForOrdering = "1851";
when(inMessage.getHeader(KinesisConstants.SEQUENCE_NUMBER)).thenReturn(seqNoForOrdering);
- when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(partitionKey);
kinesisProducer.process(exchange);
@@ -106,7 +106,7 @@ public class KinesisProducerTest {
verify(kinesisClient).putRecord(capture.capture());
PutRecordRequest request = capture.getValue();
- assertEquals(partitionKey, request.getPartitionKey());
+ assertEquals(PARTITION_KEY, request.getPartitionKey());
assertEquals(seqNoForOrdering, request.getSequenceNumberForOrdering());
verify(outMessage).setHeader(KinesisConstants.SEQUENCE_NUMBER, SEQUENCE_NUMBER);
verify(outMessage).setHeader(KinesisConstants.SHARD_ID, SHARD_ID);