You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/20 14:47:27 UTC

camel git commit: CAMEL-9603 Documentation for Kinesis producer.

Repository: camel
Updated Branches:
  refs/heads/master bbf15fdd8 -> ef669f95a


CAMEL-9603 Documentation for Kinesis producer.

Fixed one of the headers, even though it means something similar its a discreet value from Kinesis.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ef669f95
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ef669f95
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ef669f95

Branch: refs/heads/master
Commit: ef669f95a7cc95ad069db6e43cab39e294a26a19
Parents: bbf15fd
Author: John D. Ament <jo...@apache.org>
Authored: Sat Feb 20 07:39:53 2016 -0500
Committer: John D. Ament <jo...@apache.org>
Committed: Sat Feb 20 07:43:48 2016 -0500

----------------------------------------------------------------------
 .../camel-aws/src/main/docs/aws-kinesis.adoc    | 29 ++++++++++++++++++++
 .../component/aws/kinesis/KinesisConstants.java |  5 ++--
 .../component/aws/kinesis/KinesisProducer.java  |  4 +--
 .../aws/kinesis/KinesisConsumerTest.java        | 11 ++++----
 .../aws/kinesis/KinesisProducerTest.java        |  6 ++--
 5 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/docs/aws-kinesis.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-kinesis.adoc b/components/camel-aws/src/main/docs/aws-kinesis.adoc
index 5f0c28b..6889008 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis.adoc
@@ -114,6 +114,35 @@ however, a
 different http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html[AWSCredentialsProvider]
 can be specified when calling createClient(...).
 
+[[AWS-Kinesis-MessageheaderssetbytheKinesisproducer]]
+Message headers used by the Kinesis producer to write to Kinesis.  The producer expects that the message body is a `ByteBuffer`.
++++++++++++++++++++++++++++++++++++++++
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsKinesisPartitionKey` |`String` |The PartitionKey to pass to Kinesis to store this record.
+
+|`CamelAwsKinesisSequenceNumber` |`String` |Optional paramter to indicate the sequence number of this record.
+
+|=======================================================================
+
+Message headers set by the Kinesis producer on successful storage of a Record
++++++++++++++++++++++++++++++++++++++++
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsKinesisSequenceNumber` |`String` |The sequence number of the record, as defined in
+http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_ResponseSyntax[Response Syntax]
+
+|`CamelAwsKinesisShardId` |`String` |The shard ID of where the Record was stored
+
+
+|=======================================================================
+
 [[AWS-KINESIS-Dependencies]]
 Dependencies
 ^^^^^^^^^^^^

http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
index 22da493..23891c6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
@@ -23,8 +23,7 @@ public interface KinesisConstants {
     String PARTITION_KEY = "CamelAwsKinesisPartitionKey";
 
     /**
-     * in a Kinesis Record object, the shard ID is obtained from the getPartitionKey method.
+     * in a Kinesis Record object, the shard ID is used on writes to indicate where the data was stored
      */
-    String SHARD_ID = "CamelAwsKinesisPartitionKey";
-
+    String SHARD_ID = "CamelAwsKinesisShardId";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
index 3c48239..8e0c0a3 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
@@ -54,12 +54,10 @@ public class KinesisProducer extends DefaultProducer {
         PutRecordRequest putRecordRequest = new PutRecordRequest();
         putRecordRequest.setData(body);
         putRecordRequest.setStreamName(getEndpoint().getStreamName());
+        putRecordRequest.setPartitionKey(partitionKey.toString());
         if (sequenceNumber != null) {
             putRecordRequest.setSequenceNumberForOrdering(sequenceNumber.toString());
         }
-        if (partitionKey != null) {
-            putRecordRequest.setPartitionKey(partitionKey.toString());
-        }
         return putRecordRequest;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
index 62f5b14..8478f26 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
@@ -144,13 +144,15 @@ public class KinesisConsumerTest {
 
     @Test
     public void exchangePropertiesAreSet() throws Exception {
+        String partitionKey = "partitionKey";
+        String sequenceNumber = "1";
         when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
             .thenReturn(new GetRecordsResult()
                 .withNextShardIterator("nextShardIterator")
                 .withRecords(new Record()
-                    .withSequenceNumber("1")
+                    .withSequenceNumber(sequenceNumber)
                     .withApproximateArrivalTimestamp(new Date(42))
-                    .withPartitionKey("shardId")
+                    .withPartitionKey(partitionKey)
                 )
             );
 
@@ -160,9 +162,8 @@ public class KinesisConsumerTest {
 
         verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class));
         assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L));
-        assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is("shardId"));
-        assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is("1"));
-        assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SHARD_ID, String.class), is("shardId"));
+        assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey));
+        assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber));
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
index 3db0023..d0e3250 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
@@ -43,6 +43,7 @@ public class KinesisProducerTest {
     private static final String SEQUENCE_NUMBER = "SEQ123";
     private static final String STREAM_NAME = "streams";
     private static final String SAMPLE_RECORD_BODY = "SAMPLE";
+    private static final String PARTITION_KEY = "partition";
     private static final ByteBuffer SAMPLE_BUFFER = ByteBuffer.wrap(SAMPLE_RECORD_BODY.getBytes());
 
     @Mock
@@ -70,6 +71,7 @@ public class KinesisProducerTest {
         when(exchange.getPattern()).thenReturn(ExchangePattern.InOut);
 
         when(inMessage.getBody(ByteBuffer.class)).thenReturn(SAMPLE_BUFFER);
+        when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(PARTITION_KEY);
 
         when(putRecordResult.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER);
         when(putRecordResult.getShardId()).thenReturn(SHARD_ID);
@@ -95,10 +97,8 @@ public class KinesisProducerTest {
 
     @Test
     public void shouldHaveProperHeadersWhenSending() throws Exception {
-        String partitionKey = "partition";
         String seqNoForOrdering = "1851";
         when(inMessage.getHeader(KinesisConstants.SEQUENCE_NUMBER)).thenReturn(seqNoForOrdering);
-        when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(partitionKey);
 
         kinesisProducer.process(exchange);
 
@@ -106,7 +106,7 @@ public class KinesisProducerTest {
         verify(kinesisClient).putRecord(capture.capture());
         PutRecordRequest request = capture.getValue();
 
-        assertEquals(partitionKey, request.getPartitionKey());
+        assertEquals(PARTITION_KEY, request.getPartitionKey());
         assertEquals(seqNoForOrdering, request.getSequenceNumberForOrdering());
         verify(outMessage).setHeader(KinesisConstants.SEQUENCE_NUMBER, SEQUENCE_NUMBER);
         verify(outMessage).setHeader(KinesisConstants.SHARD_ID, SHARD_ID);