You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/11 13:48:05 UTC
[1/4] beam git commit: Reformatting Kinesis IO to comply with official code style
Repository: beam
Updated Branches:
refs/heads/master af08f5352 -> 138641f14
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 49e806d..4b2190f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -25,8 +25,10 @@ import static org.mockito.Matchers.anyListOf;
import static org.mockito.Mockito.when;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+
import java.io.IOException;
import java.util.Collections;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -40,112 +42,114 @@ import org.mockito.stubbing.Answer;
*/
@RunWith(MockitoJUnitRunner.class)
public class ShardRecordsIteratorTest {
- private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
- private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
- private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
- private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
- private static final String STREAM_NAME = "STREAM_NAME";
- private static final String SHARD_ID = "SHARD_ID";
-
- @Mock
- private SimplifiedKinesisClient kinesisClient;
- @Mock
- private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
- @Mock
- private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
- @Mock
- private KinesisRecord a, b, c, d;
- @Mock
- private RecordFilter recordFilter;
-
- private ShardRecordsIterator iterator;
-
- @Before
- public void setUp() throws IOException, TransientKinesisException {
- when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
- when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
- when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
- when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
- when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
- when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
- when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
- when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
- when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
- when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
- when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenReturn(firstResult);
- when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenReturn(secondResult);
- when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenReturn(thirdResult);
-
- when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
- when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
- when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-
- when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
- when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
- when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-
- when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
- .class))).thenAnswer(new IdentityAnswer());
-
- iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
- }
-
- @Test
- public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- }
-
- @Test
- public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
- when(firstResult.getRecords()).thenReturn(asList(a, b, c));
- when(secondResult.getRecords()).thenReturn(singletonList(d));
-
- assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
- assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
- assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
- assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
- assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
- }
-
- @Test
- public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
- when(firstResult.getRecords()).thenReturn(singletonList(a));
- when(secondResult.getRecords()).thenReturn(singletonList(b));
-
- when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenThrow(ExpiredIteratorException.class);
- when(aCheckpoint.getShardIterator(kinesisClient))
- .thenReturn(SECOND_REFRESHED_ITERATOR);
- when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenReturn(secondResult);
-
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- }
- private static class IdentityAnswer implements Answer<Object> {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- return invocation.getArguments()[0];
- }
+ private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+ private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+ private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+ private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+ private static final String STREAM_NAME = "STREAM_NAME";
+ private static final String SHARD_ID = "SHARD_ID";
+
+ @Mock
+ private SimplifiedKinesisClient kinesisClient;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+ @Mock
+ private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+ @Mock
+ private KinesisRecord a, b, c, d;
+ @Mock
+ private RecordFilter recordFilter;
+
+ private ShardRecordsIterator iterator;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+ when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+ when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+ when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+ when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+ when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+ when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+ when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(firstResult);
+ when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(secondResult);
+ when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(thirdResult);
+
+ when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+ when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+ when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+ when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+ when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+ when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+ when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+ .class))).thenAnswer(new IdentityAnswer());
+
+ iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+ }
+
+ @Test
+ public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ }
+
+ @Test
+ public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+ when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+ assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+ assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+ assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+ }
+
+ @Test
+ public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(singletonList(a));
+ when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+ when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenThrow(ExpiredIteratorException.class);
+ when(aCheckpoint.getShardIterator(kinesisClient))
+ .thenReturn(SECOND_REFRESHED_ITERATOR);
+ when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(secondResult);
+
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ }
+
+ private static class IdentityAnswer implements Answer<Object> {
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ return invocation.getArguments()[0];
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 96434fd..2f8757c 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -34,7 +34,9 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
+
import java.util.List;
+
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -46,179 +48,180 @@ import org.mockito.runners.MockitoJUnitRunner;
*/
@RunWith(MockitoJUnitRunner.class)
public class SimplifiedKinesisClientTest {
- private static final String STREAM = "stream";
- private static final String SHARD_1 = "shard-01";
- private static final String SHARD_2 = "shard-02";
- private static final String SHARD_3 = "shard-03";
- private static final String SHARD_ITERATOR = "iterator";
- private static final String SEQUENCE_NUMBER = "abc123";
-
- @Mock
- private AmazonKinesis kinesis;
- @InjectMocks
- private SimplifiedKinesisClient underTest;
-
- @Test
- public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
- given(kinesis.getShardIterator(new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
- .withStartingSequenceNumber(SEQUENCE_NUMBER)
- )).willReturn(new GetShardIteratorResult()
- .withShardIterator(SHARD_ITERATOR));
-
- String stream = underTest.getShardIterator(STREAM, SHARD_1,
- ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
-
- assertThat(stream).isEqualTo(SHARD_ITERATOR);
- }
-
- @Test
- public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
- Instant timestamp = Instant.now();
- given(kinesis.getShardIterator(new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
- .withTimestamp(timestamp.toDate())
- )).willReturn(new GetShardIteratorResult()
- .withShardIterator(SHARD_ITERATOR));
-
- String stream = underTest.getShardIterator(STREAM, SHARD_1,
- ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
-
- assertThat(stream).isEqualTo(SHARD_ITERATOR);
- }
-
- @Test
- public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
- shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
- ExpiredIteratorException.class);
- }
-
- @Test
- public void shouldHandleLimitExceededExceptionForGetShardIterator() {
- shouldHandleGetShardIteratorError(new LimitExceededException(""),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
- shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleServiceErrorForGetShardIterator() {
- shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleClientErrorForGetShardIterator() {
- shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
- RuntimeException.class);
- }
-
- @Test
- public void shouldHandleUnexpectedExceptionForGetShardIterator() {
- shouldHandleGetShardIteratorError(new NullPointerException(),
- RuntimeException.class);
- }
-
- private void shouldHandleGetShardIteratorError(
- Exception thrownException,
- Class<? extends Exception> expectedExceptionClass) {
- GetShardIteratorRequest request = new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.LATEST);
-
- given(kinesis.getShardIterator(request)).willThrow(thrownException);
-
- try {
- underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
- failBecauseExceptionWasNotThrown(expectedExceptionClass);
- } catch (Exception e) {
- assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
- } finally {
- reset(kinesis);
- }
- }
-
- @Test
- public void shouldListAllShards() throws Exception {
- Shard shard1 = new Shard().withShardId(SHARD_1);
- Shard shard2 = new Shard().withShardId(SHARD_2);
- Shard shard3 = new Shard().withShardId(SHARD_3);
- given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
- .withStreamDescription(new StreamDescription()
- .withShards(shard1, shard2)
- .withHasMoreShards(true)));
- given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
- .withStreamDescription(new StreamDescription()
- .withShards(shard3)
- .withHasMoreShards(false)));
-
- List<Shard> shards = underTest.listShards(STREAM);
-
- assertThat(shards).containsOnly(shard1, shard2, shard3);
- }
-
- @Test
- public void shouldHandleExpiredIterationExceptionForShardListing() {
- shouldHandleShardListingError(new ExpiredIteratorException(""),
- ExpiredIteratorException.class);
- }
-
- @Test
- public void shouldHandleLimitExceededExceptionForShardListing() {
- shouldHandleShardListingError(new LimitExceededException(""),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
- shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
- TransientKinesisException.class);
- }
- @Test
- public void shouldHandleServiceErrorForShardListing() {
- shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleClientErrorForShardListing() {
- shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
- RuntimeException.class);
- }
-
- @Test
- public void shouldHandleUnexpectedExceptionForShardListing() {
- shouldHandleShardListingError(new NullPointerException(),
- RuntimeException.class);
- }
-
- private void shouldHandleShardListingError(
- Exception thrownException,
- Class<? extends Exception> expectedExceptionClass) {
- given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
- try {
- underTest.listShards(STREAM);
- failBecauseExceptionWasNotThrown(expectedExceptionClass);
- } catch (Exception e) {
- assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
- } finally {
- reset(kinesis);
- }
- }
-
- private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
- AmazonServiceException exception = new AmazonServiceException("");
- exception.setErrorType(errorType);
- return exception;
- }
+ private static final String STREAM = "stream";
+ private static final String SHARD_1 = "shard-01";
+ private static final String SHARD_2 = "shard-02";
+ private static final String SHARD_3 = "shard-03";
+ private static final String SHARD_ITERATOR = "iterator";
+ private static final String SEQUENCE_NUMBER = "abc123";
+
+ @Mock
+ private AmazonKinesis kinesis;
+ @InjectMocks
+ private SimplifiedKinesisClient underTest;
+
+ @Test
+ public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+ given(kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ .withStartingSequenceNumber(SEQUENCE_NUMBER)
+ )).willReturn(new GetShardIteratorResult()
+ .withShardIterator(SHARD_ITERATOR));
+
+ String stream = underTest.getShardIterator(STREAM, SHARD_1,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+ assertThat(stream).isEqualTo(SHARD_ITERATOR);
+ }
+
+ @Test
+ public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+ Instant timestamp = Instant.now();
+ given(kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ .withTimestamp(timestamp.toDate())
+ )).willReturn(new GetShardIteratorResult()
+ .withShardIterator(SHARD_ITERATOR));
+
+ String stream = underTest.getShardIterator(STREAM, SHARD_1,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
+
+ assertThat(stream).isEqualTo(SHARD_ITERATOR);
+ }
+
+ @Test
+ public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+ ExpiredIteratorException.class);
+ }
+
+ @Test
+ public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new LimitExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleServiceErrorForGetShardIterator() {
+ shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleClientErrorForGetShardIterator() {
+ shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+ RuntimeException.class);
+ }
+
+ @Test
+ public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new NullPointerException(),
+ RuntimeException.class);
+ }
+
+ private void shouldHandleGetShardIteratorError(
+ Exception thrownException,
+ Class<? extends Exception> expectedExceptionClass) {
+ GetShardIteratorRequest request = new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.LATEST);
+
+ given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+ try {
+ underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+ failBecauseExceptionWasNotThrown(expectedExceptionClass);
+ } catch (Exception e) {
+ assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+ } finally {
+ reset(kinesis);
+ }
+ }
+
+ @Test
+ public void shouldListAllShards() throws Exception {
+ Shard shard1 = new Shard().withShardId(SHARD_1);
+ Shard shard2 = new Shard().withShardId(SHARD_2);
+ Shard shard3 = new Shard().withShardId(SHARD_3);
+ given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+ .withStreamDescription(new StreamDescription()
+ .withShards(shard1, shard2)
+ .withHasMoreShards(true)));
+ given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+ .withStreamDescription(new StreamDescription()
+ .withShards(shard3)
+ .withHasMoreShards(false)));
+
+ List<Shard> shards = underTest.listShards(STREAM);
+
+ assertThat(shards).containsOnly(shard1, shard2, shard3);
+ }
+
+ @Test
+ public void shouldHandleExpiredIterationExceptionForShardListing() {
+ shouldHandleShardListingError(new ExpiredIteratorException(""),
+ ExpiredIteratorException.class);
+ }
+
+ @Test
+ public void shouldHandleLimitExceededExceptionForShardListing() {
+ shouldHandleShardListingError(new LimitExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+ shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleServiceErrorForShardListing() {
+ shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleClientErrorForShardListing() {
+ shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+ RuntimeException.class);
+ }
+
+ @Test
+ public void shouldHandleUnexpectedExceptionForShardListing() {
+ shouldHandleShardListingError(new NullPointerException(),
+ RuntimeException.class);
+ }
+
+ private void shouldHandleShardListingError(
+ Exception thrownException,
+ Class<? extends Exception> expectedExceptionClass) {
+ given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+ try {
+ underTest.listShards(STREAM);
+ failBecauseExceptionWasNotThrown(expectedExceptionClass);
+ } catch (Exception e) {
+ assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+ } finally {
+ reset(kinesis);
+ }
+ }
+
+ private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+ AmazonServiceException exception = new AmazonServiceException("");
+ exception.setErrorType(errorType);
+ return exception;
+ }
}
[3/4] beam git commit: Reformatting Kinesis IO to comply with official code style
Posted by jb...@apache.org.
Reformatting Kinesis IO to comply with official code style
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21fd3028
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21fd3028
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21fd3028
Branch: refs/heads/master
Commit: 21fd30283eb4f6b829b06830a3ef04df0a377b06
Parents: af08f53
Author: Pawel Kaczmarczyk <p....@ocado.com>
Authored: Mon Jun 19 11:10:25 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jul 11 15:24:19 2017 +0200
----------------------------------------------------------------------
.../sdk/io/kinesis/CheckpointGenerator.java | 6 +-
.../beam/sdk/io/kinesis/CustomOptional.java | 111 ++--
.../io/kinesis/DynamicCheckpointGenerator.java | 52 +-
.../sdk/io/kinesis/GetKinesisRecordsResult.java | 49 +-
.../sdk/io/kinesis/KinesisClientProvider.java | 4 +-
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 279 +++++-----
.../beam/sdk/io/kinesis/KinesisReader.java | 206 +++----
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 97 ++--
.../beam/sdk/io/kinesis/KinesisRecord.java | 177 +++---
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 68 +--
.../beam/sdk/io/kinesis/KinesisSource.java | 147 ++---
.../beam/sdk/io/kinesis/RecordFilter.java | 18 +-
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 37 +-
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 241 ++++-----
.../sdk/io/kinesis/ShardRecordsIterator.java | 106 ++--
.../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 ++++----
.../beam/sdk/io/kinesis/StartingPoint.java | 84 +--
.../io/kinesis/StaticCheckpointGenerator.java | 27 +-
.../io/kinesis/TransientKinesisException.java | 7 +-
.../beam/sdk/io/kinesis/AmazonKinesisMock.java | 539 ++++++++++---------
.../beam/sdk/io/kinesis/CustomOptionalTest.java | 27 +-
.../kinesis/DynamicCheckpointGeneratorTest.java | 33 +-
.../sdk/io/kinesis/KinesisMockReadTest.java | 97 ++--
.../io/kinesis/KinesisReaderCheckpointTest.java | 52 +-
.../beam/sdk/io/kinesis/KinesisReaderIT.java | 127 ++---
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 166 +++---
.../sdk/io/kinesis/KinesisRecordCoderTest.java | 34 +-
.../beam/sdk/io/kinesis/KinesisTestOptions.java | 43 +-
.../beam/sdk/io/kinesis/KinesisUploader.java | 70 +--
.../beam/sdk/io/kinesis/RecordFilterTest.java | 52 +-
.../beam/sdk/io/kinesis/RoundRobinTest.java | 42 +-
.../sdk/io/kinesis/ShardCheckpointTest.java | 203 +++----
.../io/kinesis/ShardRecordsIteratorTest.java | 216 ++++----
.../io/kinesis/SimplifiedKinesisClientTest.java | 351 ++++++------
34 files changed, 2031 insertions(+), 1952 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
index 919d85a..2629c57 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import java.io.Serializable;
/**
@@ -25,6 +24,7 @@ import java.io.Serializable;
* How exactly the checkpoint is generated is up to implementing class.
*/
interface CheckpointGenerator extends Serializable {
- KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
- throws TransientKinesisException;
+
+ KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
+ throws TransientKinesisException;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
index 4bed0e3..5a28214 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -24,76 +24,79 @@ import java.util.Objects;
* Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
*/
abstract class CustomOptional<T> {
- @SuppressWarnings("unchecked")
- public static <T> CustomOptional<T> absent() {
- return (Absent<T>) Absent.INSTANCE;
- }
- public static <T> CustomOptional<T> of(T v) {
- return new Present<>(v);
- }
+ @SuppressWarnings("unchecked")
+ public static <T> CustomOptional<T> absent() {
+ return (Absent<T>) Absent.INSTANCE;
+ }
- public abstract boolean isPresent();
+ public static <T> CustomOptional<T> of(T v) {
+ return new Present<>(v);
+ }
- public abstract T get();
+ public abstract boolean isPresent();
- private static class Present<T> extends CustomOptional<T> {
- private final T value;
+ public abstract T get();
- private Present(T value) {
- this.value = value;
- }
+ private static class Present<T> extends CustomOptional<T> {
- @Override
- public boolean isPresent() {
- return true;
- }
+ private final T value;
- @Override
- public T get() {
- return value;
- }
+ private Present(T value) {
+ this.value = value;
+ }
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Present)) {
- return false;
- }
+ @Override
+ public boolean isPresent() {
+ return true;
+ }
- Present<?> present = (Present<?>) o;
- return Objects.equals(value, present.value);
- }
+ @Override
+ public T get() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Present)) {
+ return false;
+ }
- @Override
- public int hashCode() {
- return Objects.hash(value);
- }
+ Present<?> present = (Present<?>) o;
+ return Objects.equals(value, present.value);
}
- private static class Absent<T> extends CustomOptional<T> {
- private static final Absent<Object> INSTANCE = new Absent<>();
+ @Override
+ public int hashCode() {
+ return Objects.hash(value);
+ }
+ }
- private Absent() {
- }
+ private static class Absent<T> extends CustomOptional<T> {
- @Override
- public boolean isPresent() {
- return false;
- }
+ private static final Absent<Object> INSTANCE = new Absent<>();
- @Override
- public T get() {
- throw new NoSuchElementException();
- }
+ private Absent() {
+ }
+
+ @Override
+ public boolean isPresent() {
+ return false;
+ }
- @Override
- public boolean equals(Object o) {
- return o instanceof Absent;
- }
+ @Override
+ public T get() {
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof Absent;
+ }
- @Override
- public int hashCode() {
- return 0;
- }
+ @Override
+ public int hashCode() {
+ return 0;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index 2ec293c..9933019 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -28,29 +28,31 @@ import com.google.common.base.Function;
* List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
*/
class DynamicCheckpointGenerator implements CheckpointGenerator {
- private final String streamName;
- private final StartingPoint startingPoint;
-
- public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
- this.streamName = checkNotNull(streamName, "streamName");
- this.startingPoint = checkNotNull(startingPoint, "startingPoint");
- }
-
- @Override
- public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
- throws TransientKinesisException {
- return new KinesisReaderCheckpoint(
- transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
- @Override
- public ShardCheckpoint apply(Shard shard) {
- return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
- }
- })
- );
- }
-
- @Override
- public String toString() {
- return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
- }
+
+ private final String streamName;
+ private final StartingPoint startingPoint;
+
+ public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
+ this.streamName = checkNotNull(streamName, "streamName");
+ this.startingPoint = checkNotNull(startingPoint, "startingPoint");
+ }
+
+ @Override
+ public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
+ throws TransientKinesisException {
+ return new KinesisReaderCheckpoint(
+ transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
+
+ @Override
+ public ShardCheckpoint apply(Shard shard) {
+ return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
+ }
+ })
+ );
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index 5a34d7d..f605f55 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -21,6 +21,7 @@ import static com.google.common.collect.Lists.transform;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.google.common.base.Function;
+
import java.util.List;
import javax.annotation.Nullable;
@@ -28,27 +29,29 @@ import javax.annotation.Nullable;
* Represents the output of 'get' operation on Kinesis stream.
*/
class GetKinesisRecordsResult {
- private final List<KinesisRecord> records;
- private final String nextShardIterator;
-
- public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
- final String streamName, final String shardId) {
- this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
- @Nullable
- @Override
- public KinesisRecord apply(@Nullable UserRecord input) {
- assert input != null; // to make FindBugs happy
- return new KinesisRecord(input, streamName, shardId);
- }
- });
- this.nextShardIterator = nextShardIterator;
- }
-
- public List<KinesisRecord> getRecords() {
- return records;
- }
-
- public String getNextShardIterator() {
- return nextShardIterator;
- }
+
+ private final List<KinesisRecord> records;
+ private final String nextShardIterator;
+
+ public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
+ final String streamName, final String shardId) {
+ this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
+
+ @Nullable
+ @Override
+ public KinesisRecord apply(@Nullable UserRecord input) {
+ assert input != null; // to make FindBugs happy
+ return new KinesisRecord(input, streamName, shardId);
+ }
+ });
+ this.nextShardIterator = nextShardIterator;
+ }
+
+ public List<KinesisRecord> getRecords() {
+ return records;
+ }
+
+ public String getNextShardIterator() {
+ return nextShardIterator;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
index c7fd7f6..b5b721e 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.kinesis;
import com.amazonaws.services.kinesis.AmazonKinesis;
+
import java.io.Serializable;
/**
@@ -27,5 +28,6 @@ import java.io.Serializable;
* {@link Serializable} to ensure it can be sent to worker machines.
*/
interface KinesisClientProvider extends Serializable {
- AmazonKinesis get();
+
+ AmazonKinesis get();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index b85eb63..bc8ada1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -29,7 +28,9 @@ import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.auto.value.AutoValue;
+
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.transforms.PTransform;
@@ -102,142 +103,148 @@ import org.joda.time.Instant;
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public final class KinesisIO {
- /** Returns a new {@link Read} transform for reading from Kinesis. */
- public static Read read() {
- return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+
+ /** Returns a new {@link Read} transform for reading from Kinesis. */
+ public static Read read() {
+ return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+ }
+
+ /** Implementation of {@link #read}. */
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
+
+ @Nullable
+ abstract String getStreamName();
+
+ @Nullable
+ abstract StartingPoint getInitialPosition();
+
+ @Nullable
+ abstract KinesisClientProvider getClientProvider();
+
+ abstract int getMaxNumRecords();
+
+ @Nullable
+ abstract Duration getMaxReadTime();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ abstract Builder setStreamName(String streamName);
+
+ abstract Builder setInitialPosition(StartingPoint startingPoint);
+
+ abstract Builder setClientProvider(KinesisClientProvider clientProvider);
+
+ abstract Builder setMaxNumRecords(int maxNumRecords);
+
+ abstract Builder setMaxReadTime(Duration maxReadTime);
+
+ abstract Read build();
}
- /** Implementation of {@link #read}. */
- @AutoValue
- public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
- @Nullable
- abstract String getStreamName();
-
- @Nullable
- abstract StartingPoint getInitialPosition();
-
- @Nullable
- abstract KinesisClientProvider getClientProvider();
-
- abstract int getMaxNumRecords();
-
- @Nullable
- abstract Duration getMaxReadTime();
-
- abstract Builder toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder {
- abstract Builder setStreamName(String streamName);
- abstract Builder setInitialPosition(StartingPoint startingPoint);
- abstract Builder setClientProvider(KinesisClientProvider clientProvider);
- abstract Builder setMaxNumRecords(int maxNumRecords);
- abstract Builder setMaxReadTime(Duration maxReadTime);
-
- abstract Read build();
- }
-
- /**
- * Specify reading from streamName at some initial position.
- */
- public Read from(String streamName, InitialPositionInStream initialPosition) {
- return toBuilder()
- .setStreamName(streamName)
- .setInitialPosition(
- new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
- .build();
- }
-
- /**
- * Specify reading from streamName beginning at given {@link Instant}.
- * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
- */
- public Read from(String streamName, Instant initialTimestamp) {
- return toBuilder()
- .setStreamName(streamName)
- .setInitialPosition(
- new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
- .build();
- }
-
- /**
- * Allows to specify custom {@link KinesisClientProvider}.
- * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
- * used for communication with Kinesis.
- * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
- * does not suit your needs.
- */
- public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
- return toBuilder().setClientProvider(kinesisClientProvider).build();
- }
-
- /**
- * Specify credential details and region to be used to read from Kinesis.
- * If you need more sophisticated credential protocol, then you should look at
- * {@link Read#withClientProvider(KinesisClientProvider)}.
- */
- public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
- return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
- }
-
- /** Specifies to read at most a given number of records. */
- public Read withMaxNumRecords(int maxNumRecords) {
- checkArgument(
- maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
- return toBuilder().setMaxNumRecords(maxNumRecords).build();
- }
-
- /** Specifies to read at most a given number of records. */
- public Read withMaxReadTime(Duration maxReadTime) {
- checkNotNull(maxReadTime, "maxReadTime");
- return toBuilder().setMaxReadTime(maxReadTime).build();
- }
-
- @Override
- public PCollection<KinesisRecord> expand(PBegin input) {
- org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
- org.apache.beam.sdk.io.Read.from(
- new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
- if (getMaxNumRecords() > 0) {
- BoundedReadFromUnboundedSource<KinesisRecord> bounded =
- read.withMaxNumRecords(getMaxNumRecords());
- return getMaxReadTime() == null
- ? input.apply(bounded)
- : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
- } else {
- return getMaxReadTime() == null
- ? input.apply(read)
- : input.apply(read.withMaxReadTime(getMaxReadTime()));
- }
- }
-
- private static final class BasicKinesisProvider implements KinesisClientProvider {
-
- private final String accessKey;
- private final String secretKey;
- private final Regions region;
-
- private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
- this.accessKey = checkNotNull(accessKey, "accessKey");
- this.secretKey = checkNotNull(secretKey, "secretKey");
- this.region = checkNotNull(region, "region");
- }
-
-
- private AWSCredentialsProvider getCredentialsProvider() {
- return new StaticCredentialsProvider(new BasicAWSCredentials(
- accessKey,
- secretKey
- ));
-
- }
-
- @Override
- public AmazonKinesis get() {
- AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
- client.withRegion(region);
- return client;
- }
- }
+ /**
+ * Specify reading from streamName at some initial position.
+ */
+ public Read from(String streamName, InitialPositionInStream initialPosition) {
+ return toBuilder()
+ .setStreamName(streamName)
+ .setInitialPosition(
+ new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
+ .build();
+ }
+
+ /**
+ * Specify reading from streamName beginning at given {@link Instant}.
+ * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
+ */
+ public Read from(String streamName, Instant initialTimestamp) {
+ return toBuilder()
+ .setStreamName(streamName)
+ .setInitialPosition(
+ new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
+ .build();
+ }
+
+ /**
+ * Allows specifying a custom {@link KinesisClientProvider}.
+ * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
+ * used for communication with Kinesis.
+ * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
+ * does not suit your needs.
+ */
+ public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
+ return toBuilder().setClientProvider(kinesisClientProvider).build();
+ }
+
+ /**
+ * Specify credential details and region to be used to read from Kinesis.
+ * If you need a more sophisticated credential protocol, then you should look at
+ * {@link Read#withClientProvider(KinesisClientProvider)}.
+ */
+ public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
+ return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
+ }
+
+ /** Specifies to read at most a given number of records. */
+ public Read withMaxNumRecords(int maxNumRecords) {
+ checkArgument(
+ maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
+ return toBuilder().setMaxNumRecords(maxNumRecords).build();
+ }
+
+ /** Specifies to read records for at most the given amount of time. */
+ public Read withMaxReadTime(Duration maxReadTime) {
+ checkNotNull(maxReadTime, "maxReadTime");
+ return toBuilder().setMaxReadTime(maxReadTime).build();
+ }
+
+ @Override
+ public PCollection<KinesisRecord> expand(PBegin input) {
+ org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
+ org.apache.beam.sdk.io.Read.from(
+ new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
+ if (getMaxNumRecords() > 0) {
+ BoundedReadFromUnboundedSource<KinesisRecord> bounded =
+ read.withMaxNumRecords(getMaxNumRecords());
+ return getMaxReadTime() == null
+ ? input.apply(bounded)
+ : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
+ } else {
+ return getMaxReadTime() == null
+ ? input.apply(read)
+ : input.apply(read.withMaxReadTime(getMaxReadTime()));
+ }
+ }
+
+ private static final class BasicKinesisProvider implements KinesisClientProvider {
+
+ private final String accessKey;
+ private final String secretKey;
+ private final Regions region;
+
+ private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
+ this.accessKey = checkNotNull(accessKey, "accessKey");
+ this.secretKey = checkNotNull(secretKey, "secretKey");
+ this.region = checkNotNull(region, "region");
+ }
+
+ private AWSCredentialsProvider getCredentialsProvider() {
+ return new StaticCredentialsProvider(new BasicAWSCredentials(
+ accessKey,
+ secretKey
+ ));
+
+ }
+
+ @Override
+ public AmazonKinesis get() {
+ AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
+ client.withRegion(region);
+ return client;
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 2138094..e5c32d2 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -17,129 +17,129 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
+
import org.apache.beam.sdk.io.UnboundedSource;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Reads data from multiple kinesis shards in a single thread.
* It uses simple round robin algorithm when fetching data from shards.
*/
class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
- private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
-
- private final SimplifiedKinesisClient kinesis;
- private final UnboundedSource<KinesisRecord, ?> source;
- private final CheckpointGenerator initialCheckpointGenerator;
- private RoundRobin<ShardRecordsIterator> shardIterators;
- private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
-
- public KinesisReader(SimplifiedKinesisClient kinesis,
- CheckpointGenerator initialCheckpointGenerator,
- UnboundedSource<KinesisRecord, ?> source) {
- this.kinesis = checkNotNull(kinesis, "kinesis");
- this.initialCheckpointGenerator =
- checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
- this.source = source;
- }
-
- /**
- * Generates initial checkpoint and instantiates iterators for shards.
- */
- @Override
- public boolean start() throws IOException {
- LOG.info("Starting reader using {}", initialCheckpointGenerator);
-
- try {
- KinesisReaderCheckpoint initialCheckpoint =
- initialCheckpointGenerator.generate(kinesis);
- List<ShardRecordsIterator> iterators = newArrayList();
- for (ShardCheckpoint checkpoint : initialCheckpoint) {
- iterators.add(checkpoint.getShardRecordsIterator(kinesis));
- }
- shardIterators = new RoundRobin<>(iterators);
- } catch (TransientKinesisException e) {
- throw new IOException(e);
- }
- return advance();
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
+
+ private final SimplifiedKinesisClient kinesis;
+ private final UnboundedSource<KinesisRecord, ?> source;
+ private final CheckpointGenerator initialCheckpointGenerator;
+ private RoundRobin<ShardRecordsIterator> shardIterators;
+ private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
+
+ public KinesisReader(SimplifiedKinesisClient kinesis,
+ CheckpointGenerator initialCheckpointGenerator,
+ UnboundedSource<KinesisRecord, ?> source) {
+ this.kinesis = checkNotNull(kinesis, "kinesis");
+ this.initialCheckpointGenerator =
+ checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+ this.source = source;
+ }
+
+ /**
+ * Generates initial checkpoint and instantiates iterators for shards.
+ */
+ @Override
+ public boolean start() throws IOException {
+ LOG.info("Starting reader using {}", initialCheckpointGenerator);
+
+ try {
+ KinesisReaderCheckpoint initialCheckpoint =
+ initialCheckpointGenerator.generate(kinesis);
+ List<ShardRecordsIterator> iterators = newArrayList();
+ for (ShardCheckpoint checkpoint : initialCheckpoint) {
+ iterators.add(checkpoint.getShardRecordsIterator(kinesis));
+ }
+ shardIterators = new RoundRobin<>(iterators);
+ } catch (TransientKinesisException e) {
+ throw new IOException(e);
}
- /**
- * Moves to the next record in one of the shards.
- * If current shard iterator can be move forward (i.e. there's a record present) then we do it.
- * If not, we iterate over shards in a round-robin manner.
- */
- @Override
- public boolean advance() throws IOException {
- try {
- for (int i = 0; i < shardIterators.size(); ++i) {
- currentRecord = shardIterators.getCurrent().next();
- if (currentRecord.isPresent()) {
- return true;
- } else {
- shardIterators.moveForward();
- }
- }
- } catch (TransientKinesisException e) {
- LOG.warn("Transient exception occurred", e);
+ return advance();
+ }
+
+ /**
+ * Moves to the next record in one of the shards.
+ * If current shard iterator can be moved forward (i.e. there's a record present) then we do it.
+ * If not, we iterate over shards in a round-robin manner.
+ */
+ @Override
+ public boolean advance() throws IOException {
+ try {
+ for (int i = 0; i < shardIterators.size(); ++i) {
+ currentRecord = shardIterators.getCurrent().next();
+ if (currentRecord.isPresent()) {
+ return true;
+ } else {
+ shardIterators.moveForward();
}
- return false;
- }
-
- @Override
- public byte[] getCurrentRecordId() throws NoSuchElementException {
- return currentRecord.get().getUniqueId();
- }
-
- @Override
- public KinesisRecord getCurrent() throws NoSuchElementException {
- return currentRecord.get();
- }
-
- /**
- * When {@link KinesisReader} was advanced to the current record.
- * We cannot use approximate arrival timestamp given for each record by Kinesis as it
- * is not guaranteed to be accurate - this could lead to mark some records as "late"
- * even if they were not.
- */
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- return currentRecord.get().getReadTime();
- }
-
- @Override
- public void close() throws IOException {
- }
-
- /**
- * Current time.
- * We cannot give better approximation of the watermark with current semantics of
- * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
- * {@link KinesisReader#advance()} will be called.
- */
- @Override
- public Instant getWatermark() {
- return Instant.now();
- }
-
- @Override
- public UnboundedSource.CheckpointMark getCheckpointMark() {
- return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
- }
-
- @Override
- public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
- return source;
+ }
+ } catch (TransientKinesisException e) {
+ LOG.warn("Transient exception occurred", e);
}
+ return false;
+ }
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ return currentRecord.get().getUniqueId();
+ }
+
+ @Override
+ public KinesisRecord getCurrent() throws NoSuchElementException {
+ return currentRecord.get();
+ }
+
+ /**
+ * Returns the time when {@link KinesisReader} was advanced to the current record.
+ * We cannot use approximate arrival timestamp given for each record by Kinesis as it
+ * is not guaranteed to be accurate - this could lead to marking some records as "late"
+ * even if they were not.
+ */
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return currentRecord.get().getReadTime();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Current time.
+ * We cannot give better approximation of the watermark with current semantics of
+ * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
+ * {@link KinesisReader#advance()} will be called.
+ */
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public UnboundedSource.CheckpointMark getCheckpointMark() {
+ return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+ }
+
+ @Override
+ public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
+ return source;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index f0fa45d..d995e75 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -23,11 +23,13 @@ import static com.google.common.collect.Lists.partition;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.io.UnboundedSource;
/**
@@ -37,60 +39,61 @@ import org.apache.beam.sdk.io.UnboundedSource;
* This class is immutable.
*/
class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource
- .CheckpointMark, Serializable {
- private final List<ShardCheckpoint> shardCheckpoints;
+ .CheckpointMark, Serializable {
- public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
- this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
- }
+ private final List<ShardCheckpoint> shardCheckpoints;
- public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
- iterators) {
- return new KinesisReaderCheckpoint(transform(iterators,
- new Function<ShardRecordsIterator, ShardCheckpoint>() {
-
- @Nullable
- @Override
- public ShardCheckpoint apply(@Nullable
- ShardRecordsIterator shardRecordsIterator) {
- assert shardRecordsIterator != null;
- return shardRecordsIterator.getCheckpoint();
- }
- }));
- }
+ public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
+ this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
+ }
- /**
- * Splits given multi-shard checkpoint into partitions of approximately equal size.
- *
- * @param desiredNumSplits - upper limit for number of partitions to generate.
- * @return list of checkpoints covering consecutive partitions of current checkpoint.
- */
- public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
- int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
-
- List<KinesisReaderCheckpoint> checkpoints = newArrayList();
- for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
- checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
- }
- return checkpoints;
- }
+ public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
+ iterators) {
+ return new KinesisReaderCheckpoint(transform(iterators,
+ new Function<ShardRecordsIterator, ShardCheckpoint>() {
- private int divideAndRoundUp(int nominator, int denominator) {
- return (nominator + denominator - 1) / denominator;
- }
+ @Nullable
+ @Override
+ public ShardCheckpoint apply(@Nullable
+ ShardRecordsIterator shardRecordsIterator) {
+ assert shardRecordsIterator != null;
+ return shardRecordsIterator.getCheckpoint();
+ }
+ }));
+ }
- @Override
- public void finalizeCheckpoint() throws IOException {
+ /**
+ * Splits given multi-shard checkpoint into partitions of approximately equal size.
+ *
+ * @param desiredNumSplits - upper limit for number of partitions to generate.
+ * @return list of checkpoints covering consecutive partitions of current checkpoint.
+ */
+ public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
+ int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
+ List<KinesisReaderCheckpoint> checkpoints = newArrayList();
+ for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
+ checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
}
+ return checkpoints;
+ }
- @Override
- public String toString() {
- return shardCheckpoints.toString();
- }
+ private int divideAndRoundUp(int nominator, int denominator) {
+ return (nominator + denominator - 1) / denominator;
+ }
- @Override
- public Iterator<ShardCheckpoint> iterator() {
- return shardCheckpoints.iterator();
- }
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+
+ }
+
+ @Override
+ public String toString() {
+ return shardCheckpoints.toString();
+ }
+
+ @Override
+ public Iterator<ShardCheckpoint> iterator() {
+ return shardCheckpoints.iterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
index 02b5370..057b7bb 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -22,7 +22,9 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.google.common.base.Charsets;
+
import java.nio.ByteBuffer;
+
import org.apache.commons.lang.builder.EqualsBuilder;
import org.joda.time.Instant;
@@ -30,91 +32,92 @@ import org.joda.time.Instant;
* {@link UserRecord} enhanced with utility methods.
*/
public class KinesisRecord {
- private Instant readTime;
- private String streamName;
- private String shardId;
- private long subSequenceNumber;
- private String sequenceNumber;
- private Instant approximateArrivalTimestamp;
- private ByteBuffer data;
- private String partitionKey;
-
- public KinesisRecord(UserRecord record, String streamName, String shardId) {
- this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
- record.getPartitionKey(),
- new Instant(record.getApproximateArrivalTimestamp()),
- Instant.now(),
- streamName, shardId);
- }
-
- public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
- String partitionKey, Instant approximateArrivalTimestamp,
- Instant readTime,
- String streamName, String shardId) {
- this.data = data;
- this.sequenceNumber = sequenceNumber;
- this.subSequenceNumber = subSequenceNumber;
- this.partitionKey = partitionKey;
- this.approximateArrivalTimestamp = approximateArrivalTimestamp;
- this.readTime = readTime;
- this.streamName = streamName;
- this.shardId = shardId;
- }
-
- public ExtendedSequenceNumber getExtendedSequenceNumber() {
- return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
- }
-
- /***
- * @return unique id of the record based on its position in the stream
- */
- public byte[] getUniqueId() {
- return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
- }
-
- public Instant getReadTime() {
- return readTime;
- }
-
- public String getStreamName() {
- return streamName;
- }
-
- public String getShardId() {
- return shardId;
- }
-
- public byte[] getDataAsBytes() {
- return getData().array();
- }
-
- @Override
- public boolean equals(Object obj) {
- return EqualsBuilder.reflectionEquals(this, obj);
- }
-
- @Override
- public int hashCode() {
- return reflectionHashCode(this);
- }
-
- public long getSubSequenceNumber() {
- return subSequenceNumber;
- }
-
- public String getSequenceNumber() {
- return sequenceNumber;
- }
-
- public Instant getApproximateArrivalTimestamp() {
- return approximateArrivalTimestamp;
- }
-
- public ByteBuffer getData() {
- return data;
- }
-
- public String getPartitionKey() {
- return partitionKey;
- }
+
+ private Instant readTime;
+ private String streamName;
+ private String shardId;
+ private long subSequenceNumber;
+ private String sequenceNumber;
+ private Instant approximateArrivalTimestamp;
+ private ByteBuffer data;
+ private String partitionKey;
+
+ public KinesisRecord(UserRecord record, String streamName, String shardId) {
+ this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
+ record.getPartitionKey(),
+ new Instant(record.getApproximateArrivalTimestamp()),
+ Instant.now(),
+ streamName, shardId);
+ }
+
+ public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
+ String partitionKey, Instant approximateArrivalTimestamp,
+ Instant readTime,
+ String streamName, String shardId) {
+ this.data = data;
+ this.sequenceNumber = sequenceNumber;
+ this.subSequenceNumber = subSequenceNumber;
+ this.partitionKey = partitionKey;
+ this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+ this.readTime = readTime;
+ this.streamName = streamName;
+ this.shardId = shardId;
+ }
+
+ public ExtendedSequenceNumber getExtendedSequenceNumber() {
+ return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
+ }
+
+ /**
+ * @return unique id of the record based on its position in the stream
+ */
+ public byte[] getUniqueId() {
+ return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
+ }
+
+ public Instant getReadTime() {
+ return readTime;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String getShardId() {
+ return shardId;
+ }
+
+ public byte[] getDataAsBytes() {
+ return getData().array();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return reflectionHashCode(this);
+ }
+
+ public long getSubSequenceNumber() {
+ return subSequenceNumber;
+ }
+
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public Instant getApproximateArrivalTimestamp() {
+ return approximateArrivalTimestamp;
+ }
+
+ public ByteBuffer getData() {
+ return data;
+ }
+
+ public String getPartitionKey() {
+ return partitionKey;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index f233e27..dcf564d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -33,40 +34,41 @@ import org.joda.time.Instant;
* A {@link Coder} for {@link KinesisRecord}.
*/
class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
- private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
- private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
- private static final InstantCoder INSTANT_CODER = InstantCoder.of();
- private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
- public static KinesisRecordCoder of() {
- return new KinesisRecordCoder();
- }
+ private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+ private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+ private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+ private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
+
+ public static KinesisRecordCoder of() {
+ return new KinesisRecordCoder();
+ }
- @Override
- public void encode(KinesisRecord value, OutputStream outStream) throws
- IOException {
- BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
- STRING_CODER.encode(value.getSequenceNumber(), outStream);
- STRING_CODER.encode(value.getPartitionKey(), outStream);
- INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
- VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
- INSTANT_CODER.encode(value.getReadTime(), outStream);
- STRING_CODER.encode(value.getStreamName(), outStream);
- STRING_CODER.encode(value.getShardId(), outStream);
- }
+ @Override
+ public void encode(KinesisRecord value, OutputStream outStream) throws
+ IOException {
+ BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+ STRING_CODER.encode(value.getSequenceNumber(), outStream);
+ STRING_CODER.encode(value.getPartitionKey(), outStream);
+ INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+ VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+ INSTANT_CODER.encode(value.getReadTime(), outStream);
+ STRING_CODER.encode(value.getStreamName(), outStream);
+ STRING_CODER.encode(value.getShardId(), outStream);
+ }
- @Override
- public KinesisRecord decode(InputStream inStream) throws IOException {
- ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
- String sequenceNumber = STRING_CODER.decode(inStream);
- String partitionKey = STRING_CODER.decode(inStream);
- Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
- long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
- Instant readTimestamp = INSTANT_CODER.decode(inStream);
- String streamName = STRING_CODER.decode(inStream);
- String shardId = STRING_CODER.decode(inStream);
- return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
- approximateArrivalTimestamp, readTimestamp, streamName, shardId
- );
- }
+ @Override
+ public KinesisRecord decode(InputStream inStream) throws IOException {
+ ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+ String sequenceNumber = STRING_CODER.decode(inStream);
+ String partitionKey = STRING_CODER.decode(inStream);
+ Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+ long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+ Instant readTimestamp = INSTANT_CODER.decode(inStream);
+ String streamName = STRING_CODER.decode(inStream);
+ String shardId = STRING_CODER.decode(inStream);
+ return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
+ approximateArrivalTimestamp, readTimestamp, streamName, shardId
+ );
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 7e67d07..362792b 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import java.util.List;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -28,85 +29,85 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Represents source for single stream in Kinesis.
*/
class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
- private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
-
- private final KinesisClientProvider kinesis;
- private CheckpointGenerator initialCheckpointGenerator;
- public KinesisSource(KinesisClientProvider kinesis, String streamName,
- StartingPoint startingPoint) {
- this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
+
+ private final KinesisClientProvider kinesis;
+ private CheckpointGenerator initialCheckpointGenerator;
+
+ public KinesisSource(KinesisClientProvider kinesis, String streamName,
+ StartingPoint startingPoint) {
+ this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+ }
+
+ private KinesisSource(KinesisClientProvider kinesisClientProvider,
+ CheckpointGenerator initialCheckpoint) {
+ this.kinesis = kinesisClientProvider;
+ this.initialCheckpointGenerator = initialCheckpoint;
+ validate();
+ }
+
+ /**
+ * Generate splits for reading from the stream.
+ * Basically, it'll try to evenly split set of shards in the stream into
+ * {@code desiredNumSplits} partitions. Each partition is then a split.
+ */
+ @Override
+ public List<KinesisSource> split(int desiredNumSplits,
+ PipelineOptions options) throws Exception {
+ KinesisReaderCheckpoint checkpoint =
+ initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
+
+ List<KinesisSource> sources = newArrayList();
+
+ for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
+ sources.add(new KinesisSource(
+ kinesis,
+ new StaticCheckpointGenerator(partition)));
}
-
- private KinesisSource(KinesisClientProvider kinesisClientProvider,
- CheckpointGenerator initialCheckpoint) {
- this.kinesis = kinesisClientProvider;
- this.initialCheckpointGenerator = initialCheckpoint;
- validate();
+ return sources;
+ }
+
+ /**
+ * Creates reader based on given {@link KinesisReaderCheckpoint}.
+ * If {@link KinesisReaderCheckpoint} is not given, then we use
+ * {@code initialCheckpointGenerator} to generate new checkpoint.
+ */
+ @Override
+ public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
+ KinesisReaderCheckpoint checkpointMark) {
+
+ CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
+
+ if (checkpointMark != null) {
+ checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
}
- /**
- * Generate splits for reading from the stream.
- * Basically, it'll try to evenly split set of shards in the stream into
- * {@code desiredNumSplits} partitions. Each partition is then a split.
- */
- @Override
- public List<KinesisSource> split(int desiredNumSplits,
- PipelineOptions options) throws Exception {
- KinesisReaderCheckpoint checkpoint =
- initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
-
- List<KinesisSource> sources = newArrayList();
-
- for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
- sources.add(new KinesisSource(
- kinesis,
- new StaticCheckpointGenerator(partition)));
- }
- return sources;
- }
-
- /**
- * Creates reader based on given {@link KinesisReaderCheckpoint}.
- * If {@link KinesisReaderCheckpoint} is not given, then we use
- * {@code initialCheckpointGenerator} to generate new checkpoint.
- */
- @Override
- public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
- KinesisReaderCheckpoint checkpointMark) {
-
- CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
-
- if (checkpointMark != null) {
- checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
- }
-
- LOG.info("Creating new reader using {}", checkpointGenerator);
-
- return new KinesisReader(
- SimplifiedKinesisClient.from(kinesis),
- checkpointGenerator,
- this);
- }
-
- @Override
- public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
- return SerializableCoder.of(KinesisReaderCheckpoint.class);
- }
-
- @Override
- public void validate() {
- checkNotNull(kinesis);
- checkNotNull(initialCheckpointGenerator);
- }
-
- @Override
- public Coder<KinesisRecord> getDefaultOutputCoder() {
- return KinesisRecordCoder.of();
- }
+ LOG.info("Creating new reader using {}", checkpointGenerator);
+
+ return new KinesisReader(
+ SimplifiedKinesisClient.from(kinesis),
+ checkpointGenerator,
+ this);
+ }
+
+ @Override
+ public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
+ return SerializableCoder.of(KinesisReaderCheckpoint.class);
+ }
+
+ @Override
+ public void validate() {
+ checkNotNull(kinesis);
+ checkNotNull(initialCheckpointGenerator);
+ }
+
+ @Override
+ public Coder<KinesisRecord> getDefaultOutputCoder() {
+ return KinesisRecordCoder.of();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
index 40e65fc..eca725c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
@@ -21,7 +21,6 @@ import static com.google.common.collect.Lists.newArrayList;
import java.util.List;
-
/**
* Filters out records, which were already processed and checkpointed.
*
@@ -29,13 +28,14 @@ import java.util.List;
* accuracy, not with "subSequenceNumber" accuracy.
*/
class RecordFilter {
- public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
- List<KinesisRecord> filteredRecords = newArrayList();
- for (KinesisRecord record : records) {
- if (checkpoint.isBeforeOrAt(record)) {
- filteredRecords.add(record);
- }
- }
- return filteredRecords;
+
+ public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
+ List<KinesisRecord> filteredRecords = newArrayList();
+ for (KinesisRecord record : records) {
+ if (checkpoint.isBeforeOrAt(record)) {
+ filteredRecords.add(record);
+ }
}
+ return filteredRecords;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
index e4ff541..806d982 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -27,27 +27,28 @@ import java.util.Iterator;
* Very simple implementation of round robin algorithm.
*/
class RoundRobin<T> implements Iterable<T> {
- private final Deque<T> deque;
- public RoundRobin(Iterable<T> collection) {
- this.deque = newArrayDeque(collection);
- checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
- }
+ private final Deque<T> deque;
- public T getCurrent() {
- return deque.getFirst();
- }
+ public RoundRobin(Iterable<T> collection) {
+ this.deque = newArrayDeque(collection);
+ checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
+ }
- public void moveForward() {
- deque.addLast(deque.removeFirst());
- }
+ public T getCurrent() {
+ return deque.getFirst();
+ }
- public int size() {
- return deque.size();
- }
+ public void moveForward() {
+ deque.addLast(deque.removeFirst());
+ }
- @Override
- public Iterator<T> iterator() {
- return deque.iterator();
- }
+ public int size() {
+ return deque.size();
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return deque.iterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 6aa3504..95f97b8 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
@@ -27,9 +26,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
import java.io.Serializable;
-import org.joda.time.Instant;
+import org.joda.time.Instant;
/**
* Checkpoint mark for single shard in the stream.
@@ -45,131 +45,132 @@ import org.joda.time.Instant;
* This class is immutable.
*/
class ShardCheckpoint implements Serializable {
- private final String streamName;
- private final String shardId;
- private final String sequenceNumber;
- private final ShardIteratorType shardIteratorType;
- private final Long subSequenceNumber;
- private final Instant timestamp;
-
- public ShardCheckpoint(String streamName, String shardId, StartingPoint
- startingPoint) {
- this(streamName, shardId,
- ShardIteratorType.fromValue(startingPoint.getPositionName()),
- startingPoint.getTimestamp());
- }
-
- public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
- shardIteratorType, Instant timestamp) {
- this(streamName, shardId, shardIteratorType, null, null, timestamp);
- }
-
- public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
- shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
- this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
- }
-
- private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
- String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
- this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
- this.streamName = checkNotNull(streamName, "streamName");
- this.shardId = checkNotNull(shardId, "shardId");
- if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
- checkNotNull(sequenceNumber,
- "You must provide sequence number for AT_SEQUENCE_NUMBER"
- + " or AFTER_SEQUENCE_NUMBER");
- } else {
- checkArgument(sequenceNumber == null,
- "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
- }
- if (shardIteratorType == AT_TIMESTAMP) {
- checkNotNull(timestamp,
- "You must provide timestamp for AT_SEQUENCE_NUMBER"
- + " or AFTER_SEQUENCE_NUMBER");
- } else {
- checkArgument(timestamp == null,
- "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
- }
-
- this.subSequenceNumber = subSequenceNumber;
- this.sequenceNumber = sequenceNumber;
- this.timestamp = timestamp;
- }
-
- /**
- * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
- * on the the underlying shardIteratorType, it will either compare the timestamp or the
- * {@link ExtendedSequenceNumber}.
- *
- * @param other
- * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
- */
- public boolean isBeforeOrAt(KinesisRecord other) {
- if (shardIteratorType == AT_TIMESTAMP) {
- return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
- }
- int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
- if (result == 0) {
- return shardIteratorType == AT_SEQUENCE_NUMBER;
- }
- return result < 0;
- }
-
- private ExtendedSequenceNumber extendedSequenceNumber() {
- String fullSequenceNumber = sequenceNumber;
- if (fullSequenceNumber == null) {
- fullSequenceNumber = shardIteratorType.toString();
- }
- return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
- }
- @Override
- public String toString() {
- return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
- streamName, shardId,
- sequenceNumber);
+ private final String streamName;
+ private final String shardId;
+ private final String sequenceNumber;
+ private final ShardIteratorType shardIteratorType;
+ private final Long subSequenceNumber;
+ private final Instant timestamp;
+
+ public ShardCheckpoint(String streamName, String shardId, StartingPoint
+ startingPoint) {
+ this(streamName, shardId,
+ ShardIteratorType.fromValue(startingPoint.getPositionName()),
+ startingPoint.getTimestamp());
+ }
+
+ public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+ shardIteratorType, Instant timestamp) {
+ this(streamName, shardId, shardIteratorType, null, null, timestamp);
+ }
+
+ public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+ shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
+ this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
+ }
+
+ private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
+ String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
+ this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
+ this.streamName = checkNotNull(streamName, "streamName");
+ this.shardId = checkNotNull(shardId, "shardId");
+ if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
+ checkNotNull(sequenceNumber,
+ "You must provide sequence number for AT_SEQUENCE_NUMBER"
+ + " or AFTER_SEQUENCE_NUMBER");
+ } else {
+ checkArgument(sequenceNumber == null,
+ "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
}
-
- public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
- throws TransientKinesisException {
- return new ShardRecordsIterator(this, kinesis);
+ // A timestamp is mandatory for AT_TIMESTAMP and must be absent for every other iterator type.
+ if (shardIteratorType == AT_TIMESTAMP) {
+ checkNotNull(timestamp,
+ "You must provide timestamp for AT_TIMESTAMP");
+ } else {
+ checkArgument(timestamp == null,
+ "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
+ }
- public String getShardIterator(SimplifiedKinesisClient kinesisClient)
- throws TransientKinesisException {
- if (checkpointIsInTheMiddleOfAUserRecord()) {
- return kinesisClient.getShardIterator(streamName,
- shardId, AT_SEQUENCE_NUMBER,
- sequenceNumber, null);
- }
- return kinesisClient.getShardIterator(streamName,
- shardId, shardIteratorType,
- sequenceNumber, timestamp);
+ this.subSequenceNumber = subSequenceNumber;
+ this.sequenceNumber = sequenceNumber;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
+ * on the underlying shardIteratorType, it will either compare the timestamp or the
+ * {@link ExtendedSequenceNumber}.
+ *
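+ * @param other the record to compare this checkpoint against
+ * @return {@code true} if this checkpoint points before the given record, or exactly at it
+ * for an AT_SEQUENCE_NUMBER or AT_TIMESTAMP checkpoint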
+ */
+ public boolean isBeforeOrAt(KinesisRecord other) {
+ if (shardIteratorType == AT_TIMESTAMP) {
+ return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
}
-
- private boolean checkpointIsInTheMiddleOfAUserRecord() {
- return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+ int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
+ if (result == 0) {
+ return shardIteratorType == AT_SEQUENCE_NUMBER;
}
+ return result < 0;
+ }
- /**
- * Used to advance checkpoint mark to position after given {@link Record}.
- *
- * @param record
- * @return new checkpoint object pointing directly after given {@link Record}
- */
- public ShardCheckpoint moveAfter(KinesisRecord record) {
- return new ShardCheckpoint(
- streamName, shardId,
- AFTER_SEQUENCE_NUMBER,
- record.getSequenceNumber(),
- record.getSubSequenceNumber());
+ private ExtendedSequenceNumber extendedSequenceNumber() {
+ String fullSequenceNumber = sequenceNumber;
+ if (fullSequenceNumber == null) {
+ fullSequenceNumber = shardIteratorType.toString();
}
-
- public String getStreamName() {
- return streamName;
- }
-
- public String getShardId() {
- return shardId;
+ return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
+ streamName, shardId,
+ sequenceNumber);
+ }
+
+ public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
+ throws TransientKinesisException {
+ return new ShardRecordsIterator(this, kinesis);
+ }
+
+ public String getShardIterator(SimplifiedKinesisClient kinesisClient)
+ throws TransientKinesisException {
+ if (checkpointIsInTheMiddleOfAUserRecord()) {
+ return kinesisClient.getShardIterator(streamName,
+ shardId, AT_SEQUENCE_NUMBER,
+ sequenceNumber, null);
}
+ return kinesisClient.getShardIterator(streamName,
+ shardId, shardIteratorType,
+ sequenceNumber, timestamp);
+ }
+
+ private boolean checkpointIsInTheMiddleOfAUserRecord() {
+ return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+ }
+
+ /**
+ * Used to advance checkpoint mark to position after given {@link Record}.
+ *
+ * @param record the record after which the new checkpoint should point
+ * @return new checkpoint object pointing directly after given {@link Record}
+ */
+ public ShardCheckpoint moveAfter(KinesisRecord record) {
+ return new ShardCheckpoint(
+ streamName, shardId,
+ AFTER_SEQUENCE_NUMBER,
+ record.getSequenceNumber(),
+ record.getSubSequenceNumber());
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String getShardId() {
+ return shardId;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index 872f604..a69c6c1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -21,7 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Queues.newArrayDeque;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+
import java.util.Deque;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,68 +33,68 @@ import org.slf4j.LoggerFactory;
* Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
*/
class ShardRecordsIterator {
- private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
- private final SimplifiedKinesisClient kinesis;
- private final RecordFilter filter;
- private ShardCheckpoint checkpoint;
- private String shardIterator;
- private Deque<KinesisRecord> data = newArrayDeque();
+ private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
- public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
- SimplifiedKinesisClient simplifiedKinesisClient) throws
- TransientKinesisException {
- this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
- }
+ private final SimplifiedKinesisClient kinesis;
+ private final RecordFilter filter;
+ private ShardCheckpoint checkpoint;
+ private String shardIterator;
+ private Deque<KinesisRecord> data = newArrayDeque();
- public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
- SimplifiedKinesisClient simplifiedKinesisClient,
- RecordFilter filter) throws
- TransientKinesisException {
+ public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+ SimplifiedKinesisClient simplifiedKinesisClient) throws
+ TransientKinesisException {
+ this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
+ }
- this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
- this.filter = checkNotNull(filter, "filter");
- this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
- shardIterator = checkpoint.getShardIterator(kinesis);
- }
+ public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+ SimplifiedKinesisClient simplifiedKinesisClient,
+ RecordFilter filter) throws
+ TransientKinesisException {
- /**
- * Returns record if there's any present.
- * Returns absent() if there are no new records at this time in the shard.
- */
- public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
- readMoreIfNecessary();
+ this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+ this.filter = checkNotNull(filter, "filter");
+ this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
+ shardIterator = checkpoint.getShardIterator(kinesis);
+ }
- if (data.isEmpty()) {
- return CustomOptional.absent();
- } else {
- KinesisRecord record = data.removeFirst();
- checkpoint = checkpoint.moveAfter(record);
- return CustomOptional.of(record);
- }
- }
+ /**
+ * Returns record if there's any present.
+ * Returns absent() if there are no new records at this time in the shard.
+ */
+ public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
+ readMoreIfNecessary();
- private void readMoreIfNecessary() throws TransientKinesisException {
- if (data.isEmpty()) {
- GetKinesisRecordsResult response;
- try {
- response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
- checkpoint.getShardId());
- } catch (ExpiredIteratorException e) {
- LOG.info("Refreshing expired iterator", e);
- shardIterator = checkpoint.getShardIterator(kinesis);
- response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
- checkpoint.getShardId());
- }
- LOG.debug("Fetched {} new records", response.getRecords().size());
- shardIterator = response.getNextShardIterator();
- data.addAll(filter.apply(response.getRecords(), checkpoint));
- }
+ if (data.isEmpty()) {
+ return CustomOptional.absent();
+ } else {
+ KinesisRecord record = data.removeFirst();
+ checkpoint = checkpoint.moveAfter(record);
+ return CustomOptional.of(record);
}
+ }
- public ShardCheckpoint getCheckpoint() {
- return checkpoint;
+ private void readMoreIfNecessary() throws TransientKinesisException {
+ if (data.isEmpty()) {
+ GetKinesisRecordsResult response;
+ try {
+ response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+ checkpoint.getShardId());
+ } catch (ExpiredIteratorException e) {
+ LOG.info("Refreshing expired iterator", e);
+ shardIterator = checkpoint.getShardIterator(kinesis);
+ response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+ checkpoint.getShardId());
+ }
+ LOG.debug("Fetched {} new records", response.getRecords().size());
+ shardIterator = response.getNextShardIterator();
+ data.addAll(filter.apply(response.getRecords(), checkpoint));
}
+ }
+ public ShardCheckpoint getCheckpoint() {
+ return checkpoint;
+ }
}
[2/4] beam git commit: Reformatting Kinesis IO to comply with
official code style
Posted by jb...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index 3e3984a..80c950f 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@@ -31,9 +30,11 @@ import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.google.common.collect.Lists;
+
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
+
import org.joda.time.Instant;
/**
@@ -41,117 +42,121 @@ import org.joda.time.Instant;
* proper error handling.
*/
class SimplifiedKinesisClient {
- private final AmazonKinesis kinesis;
- public SimplifiedKinesisClient(AmazonKinesis kinesis) {
- this.kinesis = kinesis;
- }
+ private final AmazonKinesis kinesis;
- public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
- return new SimplifiedKinesisClient(provider.get());
- }
+ public SimplifiedKinesisClient(AmazonKinesis kinesis) {
+ this.kinesis = kinesis;
+ }
- public String getShardIterator(final String streamName, final String shardId,
- final ShardIteratorType shardIteratorType,
- final String startingSequenceNumber, final Instant timestamp)
- throws TransientKinesisException {
- final Date date = timestamp != null ? timestamp.toDate() : null;
- return wrapExceptions(new Callable<String>() {
- @Override
- public String call() throws Exception {
- return kinesis.getShardIterator(new GetShardIteratorRequest()
- .withStreamName(streamName)
- .withShardId(shardId)
- .withShardIteratorType(shardIteratorType)
- .withStartingSequenceNumber(startingSequenceNumber)
- .withTimestamp(date)
- ).getShardIterator();
- }
- });
- }
+ public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
+ return new SimplifiedKinesisClient(provider.get());
+ }
- public List<Shard> listShards(final String streamName) throws TransientKinesisException {
- return wrapExceptions(new Callable<List<Shard>>() {
- @Override
- public List<Shard> call() throws Exception {
- List<Shard> shards = Lists.newArrayList();
- String lastShardId = null;
-
- StreamDescription description;
- do {
- description = kinesis.describeStream(streamName, lastShardId)
- .getStreamDescription();
-
- shards.addAll(description.getShards());
- lastShardId = shards.get(shards.size() - 1).getShardId();
- } while (description.getHasMoreShards());
-
- return shards;
- }
- });
- }
+ public String getShardIterator(final String streamName, final String shardId,
+ final ShardIteratorType shardIteratorType,
+ final String startingSequenceNumber, final Instant timestamp)
+ throws TransientKinesisException {
+ final Date date = timestamp != null ? timestamp.toDate() : null;
+ return wrapExceptions(new Callable<String>() {
- /**
- * Gets records from Kinesis and deaggregates them if needed.
- *
- * @return list of deaggregated records
- * @throws TransientKinesisException - in case of recoverable situation
- */
- public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
- String shardId) throws TransientKinesisException {
- return getRecords(shardIterator, streamName, shardId, null);
- }
+ @Override
+ public String call() throws Exception {
+ return kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(streamName)
+ .withShardId(shardId)
+ .withShardIteratorType(shardIteratorType)
+ .withStartingSequenceNumber(startingSequenceNumber)
+ .withTimestamp(date)
+ ).getShardIterator();
+ }
+ });
+ }
- /**
- * Gets records from Kinesis and deaggregates them if needed.
- *
- * @return list of deaggregated records
- * @throws TransientKinesisException - in case of recoverable situation
- */
- public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
- final String shardId, final Integer limit)
- throws
- TransientKinesisException {
- return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
- @Override
- public GetKinesisRecordsResult call() throws Exception {
- GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
- .withShardIterator(shardIterator)
- .withLimit(limit));
- return new GetKinesisRecordsResult(
- UserRecord.deaggregate(response.getRecords()),
- response.getNextShardIterator(),
- streamName, shardId);
- }
- });
- }
+ public List<Shard> listShards(final String streamName) throws TransientKinesisException {
+ return wrapExceptions(new Callable<List<Shard>>() {
+
+ @Override
+ public List<Shard> call() throws Exception {
+ List<Shard> shards = Lists.newArrayList();
+ String lastShardId = null;
+
+ StreamDescription description;
+ do {
+ description = kinesis.describeStream(streamName, lastShardId)
+ .getStreamDescription();
+
+ shards.addAll(description.getShards());
+ lastShardId = shards.get(shards.size() - 1).getShardId();
+ } while (description.getHasMoreShards());
+
+ return shards;
+ }
+ });
+ }
+
+ /**
+ * Gets records from Kinesis and deaggregates them if needed.
+ *
+ * @return list of deaggregated records
+ * @throws TransientKinesisException - in case of recoverable situation
+ */
+ public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
+ String shardId) throws TransientKinesisException {
+ return getRecords(shardIterator, streamName, shardId, null);
+ }
+
+ /**
+ * Gets records from Kinesis and deaggregates them if needed.
+ *
+ * @return list of deaggregated records
+ * @throws TransientKinesisException - in case of recoverable situation
+ */
+ public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
+ final String shardId, final Integer limit)
+ throws
+ TransientKinesisException {
+ return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+
+ @Override
+ public GetKinesisRecordsResult call() throws Exception {
+ GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
+ .withShardIterator(shardIterator)
+ .withLimit(limit));
+ return new GetKinesisRecordsResult(
+ UserRecord.deaggregate(response.getRecords()),
+ response.getNextShardIterator(),
+ streamName, shardId);
+ }
+ });
+ }
- /**
- * Wraps Amazon specific exceptions into more friendly format.
- *
- * @throws TransientKinesisException - in case of recoverable situation, i.e.
- * the request rate is too high, Kinesis remote service
- * failed, network issue, etc.
- * @throws ExpiredIteratorException - if iterator needs to be refreshed
- * @throws RuntimeException - in all other cases
- */
- private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
- try {
- return callable.call();
- } catch (ExpiredIteratorException e) {
- throw e;
- } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
- throw new TransientKinesisException(
- "Too many requests to Kinesis. Wait some time and retry.", e);
- } catch (AmazonServiceException e) {
- if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
- throw new TransientKinesisException(
- "Kinesis backend failed. Wait some time and retry.", e);
- }
- throw new RuntimeException("Kinesis client side failure", e);
- } catch (Exception e) {
- throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
- }
+ /**
+ * Wraps Amazon specific exceptions into more friendly format.
+ *
+ * @throws TransientKinesisException - in case of recoverable situation, i.e.
+ * the request rate is too high, Kinesis remote service
+ * failed, network issue, etc.
+ * @throws ExpiredIteratorException - if iterator needs to be refreshed
+ * @throws RuntimeException - in all other cases
+ */
+ private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
+ try {
+ return callable.call();
+ } catch (ExpiredIteratorException e) {
+ throw e;
+ } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
+ throw new TransientKinesisException(
+ "Too many requests to Kinesis. Wait some time and retry.", e);
+ } catch (AmazonServiceException e) {
+ if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
+ throw new TransientKinesisException(
+ "Kinesis backend failed. Wait some time and retry.", e);
+ }
+ throw new RuntimeException("Kinesis client side failure", e);
+ } catch (Exception e) {
+ throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
index d8842c4..f9298fa 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -17,13 +17,14 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import static com.google.common.base.Preconditions.checkNotNull;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
import java.io.Serializable;
import java.util.Objects;
+
import org.joda.time.Instant;
/**
@@ -32,54 +33,55 @@ import org.joda.time.Instant;
* in which case the reader will start reading at the specified point in time.
*/
class StartingPoint implements Serializable {
- private final InitialPositionInStream position;
- private final Instant timestamp;
- public StartingPoint(InitialPositionInStream position) {
- this.position = checkNotNull(position, "position");
- this.timestamp = null;
- }
+ private final InitialPositionInStream position;
+ private final Instant timestamp;
- public StartingPoint(Instant timestamp) {
- this.timestamp = checkNotNull(timestamp, "timestamp");
- this.position = null;
- }
+ public StartingPoint(InitialPositionInStream position) {
+ this.position = checkNotNull(position, "position");
+ this.timestamp = null;
+ }
- public InitialPositionInStream getPosition() {
- return position;
- }
+ public StartingPoint(Instant timestamp) {
+ this.timestamp = checkNotNull(timestamp, "timestamp");
+ this.position = null;
+ }
- public String getPositionName() {
- return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
- }
+ public InitialPositionInStream getPosition() {
+ return position;
+ }
- public Instant getTimestamp() {
- return timestamp != null ? timestamp : null;
- }
+ public String getPositionName() {
+ return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
+ }
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- StartingPoint that = (StartingPoint) o;
- return position == that.position && Objects.equals(timestamp, that.timestamp);
- }
+ public Instant getTimestamp() {
+ return timestamp != null ? timestamp : null;
+ }
- @Override
- public int hashCode() {
- return Objects.hash(position, timestamp);
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StartingPoint that = (StartingPoint) o;
+ return position == that.position && Objects.equals(timestamp, that.timestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(position, timestamp);
+ }
- @Override
- public String toString() {
- if (timestamp == null) {
- return position.toString();
- } else {
- return "Starting at timestamp " + timestamp;
- }
+ @Override
+ public String toString() {
+ if (timestamp == null) {
+ return position.toString();
+ } else {
+ return "Starting at timestamp " + timestamp;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
index 22dc973..1ec865d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
@@ -23,20 +23,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
* Always returns the same instance of checkpoint.
*/
class StaticCheckpointGenerator implements CheckpointGenerator {
- private final KinesisReaderCheckpoint checkpoint;
- public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
- checkNotNull(checkpoint, "checkpoint");
- this.checkpoint = checkpoint;
- }
+ private final KinesisReaderCheckpoint checkpoint;
- @Override
- public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
- return checkpoint;
- }
+ public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
+ checkNotNull(checkpoint, "checkpoint");
+ this.checkpoint = checkpoint;
+ }
- @Override
- public String toString() {
- return checkpoint.toString();
- }
+ @Override
+ public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
+ return checkpoint;
+ }
+
+ @Override
+ public String toString() {
+ return checkpoint.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
index 57ad8a8..68ca0d7 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -23,7 +23,8 @@ import com.amazonaws.AmazonServiceException;
* A transient exception thrown by Kinesis.
*/
class TransientKinesisException extends Exception {
- public TransientKinesisException(String s, AmazonServiceException e) {
- super(s, e);
- }
+
+ public TransientKinesisException(String s, AmazonServiceException e) {
+ super(s, e);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 046c9d9..994d6e3 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -66,10 +66,12 @@ import com.amazonaws.services.kinesis.model.SplitShardRequest;
import com.amazonaws.services.kinesis.model.SplitShardResult;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.google.common.base.Function;
+
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import javax.annotation.Nullable;
+
import org.apache.commons.lang.builder.EqualsBuilder;
import org.joda.time.Instant;
@@ -78,298 +80,301 @@ import org.joda.time.Instant;
*/
class AmazonKinesisMock implements AmazonKinesis {
- static class TestData implements Serializable {
- private final String data;
- private final Instant arrivalTimestamp;
- private final String sequenceNumber;
-
- public TestData(KinesisRecord record) {
- this(new String(record.getData().array()),
- record.getApproximateArrivalTimestamp(),
- record.getSequenceNumber());
- }
-
- public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
- this.data = data;
- this.arrivalTimestamp = arrivalTimestamp;
- this.sequenceNumber = sequenceNumber;
- }
-
- public Record convertToRecord() {
- return new Record().
- withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
- withData(ByteBuffer.wrap(data.getBytes())).
- withSequenceNumber(sequenceNumber).
- withPartitionKey("");
- }
-
- @Override
- public boolean equals(Object obj) {
- return EqualsBuilder.reflectionEquals(this, obj);
- }
-
- @Override
- public int hashCode() {
- return reflectionHashCode(this);
- }
- }
-
- static class Provider implements KinesisClientProvider {
-
- private final List<List<TestData>> shardedData;
- private final int numberOfRecordsPerGet;
-
- public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
- this.shardedData = shardedData;
- this.numberOfRecordsPerGet = numberOfRecordsPerGet;
- }
-
- @Override
- public AmazonKinesis get() {
- return new AmazonKinesisMock(transform(shardedData,
- new Function<List<TestData>, List<Record>>() {
- @Override
- public List<Record> apply(@Nullable List<TestData> testDatas) {
- return transform(testDatas, new Function<TestData, Record>() {
- @Override
- public Record apply(@Nullable TestData testData) {
- return testData.convertToRecord();
- }
- });
- }
- }), numberOfRecordsPerGet);
- }
- }
-
- private final List<List<Record>> shardedData;
- private final int numberOfRecordsPerGet;
-
- public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
- this.shardedData = shardedData;
- this.numberOfRecordsPerGet = numberOfRecordsPerGet;
- }
-
- @Override
- public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
- String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
- int shardId = parseInt(shardIteratorParts[0]);
- int startingRecord = parseInt(shardIteratorParts[1]);
- List<Record> shardData = shardedData.get(shardId);
-
- int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
- int fromIndex = min(startingRecord, toIndex);
- return new GetRecordsResult().
- withRecords(shardData.subList(fromIndex, toIndex)).
- withNextShardIterator(String.format("%s:%s", shardId, toIndex));
- }
-
- @Override
- public GetShardIteratorResult getShardIterator(
- GetShardIteratorRequest getShardIteratorRequest) {
- ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
- getShardIteratorRequest.getShardIteratorType());
-
- String shardIterator;
- if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
- shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
- } else {
- throw new RuntimeException("Not implemented");
- }
-
- return new GetShardIteratorResult().withShardIterator(shardIterator);
- }
-
- @Override
- public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
- int nextShardId = 0;
- if (exclusiveStartShardId != null) {
- nextShardId = parseInt(exclusiveStartShardId) + 1;
- }
- boolean hasMoreShards = nextShardId + 1 < shardedData.size();
-
- List<Shard> shards = newArrayList();
- if (nextShardId < shardedData.size()) {
- shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
- }
-
- return new DescribeStreamResult().withStreamDescription(
- new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
- );
- }
-
- @Override
- public void setEndpoint(String endpoint) {
-
- }
-
- @Override
- public void setRegion(Region region) {
-
- }
-
- @Override
- public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public CreateStreamResult createStream(String streamName, Integer shardCount) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
- DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DeleteStreamResult deleteStream(String streamName) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DescribeStreamResult describeStream(String streamName) {
-
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DescribeStreamResult describeStream(String streamName,
- Integer limit, String exclusiveStartShardId) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
- DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
- EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public GetShardIteratorResult getShardIterator(String streamName,
- String shardId,
- String shardIteratorType) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public GetShardIteratorResult getShardIterator(String streamName,
- String shardId,
- String shardIteratorType,
- String startingSequenceNumber) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
- IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
- throw new RuntimeException("Not implemented");
- }
+ static class TestData implements Serializable {
- @Override
- public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
- throw new RuntimeException("Not implemented");
- }
+ private final String data;
+ private final Instant arrivalTimestamp;
+ private final String sequenceNumber;
- @Override
- public ListStreamsResult listStreams() {
- throw new RuntimeException("Not implemented");
+ public TestData(KinesisRecord record) {
+ this(new String(record.getData().array()),
+ record.getApproximateArrivalTimestamp(),
+ record.getSequenceNumber());
}
- @Override
- public ListStreamsResult listStreams(String exclusiveStartStreamName) {
- throw new RuntimeException("Not implemented");
+ public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+ this.data = data;
+ this.arrivalTimestamp = arrivalTimestamp;
+ this.sequenceNumber = sequenceNumber;
}
- @Override
- public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
- throw new RuntimeException("Not implemented");
+ public Record convertToRecord() {
+ return new Record().
+ withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
+ withData(ByteBuffer.wrap(data.getBytes())).
+ withSequenceNumber(sequenceNumber).
+ withPartitionKey("");
}
@Override
- public ListTagsForStreamResult listTagsForStream(
- ListTagsForStreamRequest listTagsForStreamRequest) {
- throw new RuntimeException("Not implemented");
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
}
@Override
- public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
- throw new RuntimeException("Not implemented");
+ public int hashCode() {
+ return reflectionHashCode(this);
}
+ }
- @Override
- public MergeShardsResult mergeShards(String streamName,
- String shardToMerge, String adjacentShardToMerge) {
- throw new RuntimeException("Not implemented");
- }
+ static class Provider implements KinesisClientProvider {
- @Override
- public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
- throw new RuntimeException("Not implemented");
- }
+ private final List<List<TestData>> shardedData;
+ private final int numberOfRecordsPerGet;
- @Override
- public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
- throw new RuntimeException("Not implemented");
+ public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+ this.shardedData = shardedData;
+ this.numberOfRecordsPerGet = numberOfRecordsPerGet;
}
@Override
- public PutRecordResult putRecord(String streamName, ByteBuffer data,
- String partitionKey, String sequenceNumberForOrdering) {
- throw new RuntimeException("Not implemented");
- }
+ public AmazonKinesis get() {
+ return new AmazonKinesisMock(transform(shardedData,
+ new Function<List<TestData>, List<Record>>() {
- @Override
- public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
- throw new RuntimeException("Not implemented");
- }
+ @Override
+ public List<Record> apply(@Nullable List<TestData> testDatas) {
+ return transform(testDatas, new Function<TestData, Record>() {
- @Override
- public RemoveTagsFromStreamResult removeTagsFromStream(
- RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
- throw new RuntimeException("Not implemented");
+ @Override
+ public Record apply(@Nullable TestData testData) {
+ return testData.convertToRecord();
+ }
+ });
+ }
+ }), numberOfRecordsPerGet);
}
+ }
- @Override
- public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
- throw new RuntimeException("Not implemented");
- }
+ private final List<List<Record>> shardedData;
+ private final int numberOfRecordsPerGet;
- @Override
- public SplitShardResult splitShard(String streamName,
- String shardToSplit, String newStartingHashKey) {
- throw new RuntimeException("Not implemented");
- }
+ public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+ this.shardedData = shardedData;
+ this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+ }
- @Override
- public void shutdown() {
+ @Override
+ public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+ String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
+ int shardId = parseInt(shardIteratorParts[0]);
+ int startingRecord = parseInt(shardIteratorParts[1]);
+ List<Record> shardData = shardedData.get(shardId);
- }
+ int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
+ int fromIndex = min(startingRecord, toIndex);
+ return new GetRecordsResult().
+ withRecords(shardData.subList(fromIndex, toIndex)).
+ withNextShardIterator(String.format("%s:%s", shardId, toIndex));
+ }
- @Override
- public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
- throw new RuntimeException("Not implemented");
- }
+ @Override
+ public GetShardIteratorResult getShardIterator(
+ GetShardIteratorRequest getShardIteratorRequest) {
+ ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
+ getShardIteratorRequest.getShardIteratorType());
+
+ String shardIterator;
+ if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
+ shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+ } else {
+ throw new RuntimeException("Not implemented");
+ }
+
+ return new GetShardIteratorResult().withShardIterator(shardIterator);
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+ int nextShardId = 0;
+ if (exclusiveStartShardId != null) {
+ nextShardId = parseInt(exclusiveStartShardId) + 1;
+ }
+ boolean hasMoreShards = nextShardId + 1 < shardedData.size();
+
+ List<Shard> shards = newArrayList();
+ if (nextShardId < shardedData.size()) {
+ shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+ }
+
+ return new DescribeStreamResult().withStreamDescription(
+ new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
+ );
+ }
+
+ @Override
+ public void setEndpoint(String endpoint) {
+
+ }
+
+ @Override
+ public void setRegion(Region region) {
+
+ }
+
+ @Override
+ public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public CreateStreamResult createStream(String streamName, Integer shardCount) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+ DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DeleteStreamResult deleteStream(String streamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName) {
+
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName,
+ Integer limit, String exclusiveStartShardId) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+ DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+ EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public GetShardIteratorResult getShardIterator(String streamName,
+ String shardId,
+ String shardIteratorType) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public GetShardIteratorResult getShardIterator(String streamName,
+ String shardId,
+ String shardIteratorType,
+ String startingSequenceNumber) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+ IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListTagsForStreamResult listTagsForStream(
+ ListTagsForStreamRequest listTagsForStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public MergeShardsResult mergeShards(String streamName,
+ String shardToMerge, String adjacentShardToMerge) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordResult putRecord(String streamName, ByteBuffer data,
+ String partitionKey, String sequenceNumberForOrdering) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public RemoveTagsFromStreamResult removeTagsFromStream(
+ RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public SplitShardResult splitShard(String streamName,
+ String shardToSplit, String newStartingHashKey) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+ throw new RuntimeException("Not implemented");
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index 00acffe..0b16bb7 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -18,24 +18,27 @@
package org.apache.beam.sdk.io.kinesis;
import com.google.common.testing.EqualsTester;
+
import java.util.NoSuchElementException;
+
import org.junit.Test;
/**
* Tests {@link CustomOptional}.
*/
public class CustomOptionalTest {
- @Test(expected = NoSuchElementException.class)
- public void absentThrowsNoSuchElementExceptionOnGet() {
- CustomOptional.absent().get();
- }
- @Test
- public void testEqualsAndHashCode() {
- new EqualsTester()
- .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
- .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
- .addEqualityGroup(CustomOptional.of(11))
- .addEqualityGroup(CustomOptional.of("3")).testEquals();
- }
+ @Test(expected = NoSuchElementException.class)
+ public void absentThrowsNoSuchElementExceptionOnGet() {
+ CustomOptional.absent().get();
+ }
+
+ @Test
+ public void testEqualsAndHashCode() {
+ new EqualsTester()
+ .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
+ .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
+ .addEqualityGroup(CustomOptional.of(11))
+ .addEqualityGroup(CustomOptional.of("3")).testEquals();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
index c92ac9a..1bb9717 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -28,30 +28,29 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-
/***
*/
@RunWith(MockitoJUnitRunner.class)
public class DynamicCheckpointGeneratorTest {
- @Mock
- private SimplifiedKinesisClient kinesisClient;
- @Mock
- private Shard shard1, shard2, shard3;
+ @Mock
+ private SimplifiedKinesisClient kinesisClient;
+ @Mock
+ private Shard shard1, shard2, shard3;
- @Test
- public void shouldMapAllShardsToCheckpoints() throws Exception {
- given(shard1.getShardId()).willReturn("shard-01");
- given(shard2.getShardId()).willReturn("shard-02");
- given(shard3.getShardId()).willReturn("shard-03");
- given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+ @Test
+ public void shouldMapAllShardsToCheckpoints() throws Exception {
+ given(shard1.getShardId()).willReturn("shard-01");
+ given(shard2.getShardId()).willReturn("shard-02");
+ given(shard3.getShardId()).willReturn("shard-03");
+ given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
- StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
- DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
- startingPoint);
+ StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
+ DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
+ startingPoint);
- KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
+ KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
- assertThat(checkpoint).hasSize(3);
- }
+ assertThat(checkpoint).hasSize(3);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 567e25f..44ad67d 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -21,7 +21,9 @@ import static com.google.common.collect.Lists.newArrayList;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.common.collect.Iterables;
+
import java.util.List;
+
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
@@ -36,59 +38,60 @@ import org.junit.Test;
*/
public class KinesisMockReadTest {
- @Rule
- public final transient TestPipeline p = TestPipeline.create();
-
- @Test
- public void readsDataFromMockKinesis() {
- int noOfShards = 3;
- int noOfEventsPerShard = 100;
- List<List<AmazonKinesisMock.TestData>> testData =
- provideTestData(noOfShards, noOfEventsPerShard);
-
- PCollection<AmazonKinesisMock.TestData> result = p
- .apply(
- KinesisIO.read()
- .from("stream", InitialPositionInStream.TRIM_HORIZON)
- .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
- .withMaxNumRecords(noOfShards * noOfEventsPerShard))
- .apply(ParDo.of(new KinesisRecordToTestData()));
- PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
- p.run();
- }
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
- private static class KinesisRecordToTestData extends
- DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(new AmazonKinesisMock.TestData(c.element()));
- }
- }
+ @Test
+ public void readsDataFromMockKinesis() {
+ int noOfShards = 3;
+ int noOfEventsPerShard = 100;
+ List<List<AmazonKinesisMock.TestData>> testData =
+ provideTestData(noOfShards, noOfEventsPerShard);
- private List<List<AmazonKinesisMock.TestData>> provideTestData(
- int noOfShards,
- int noOfEventsPerShard) {
+ PCollection<AmazonKinesisMock.TestData> result = p
+ .apply(
+ KinesisIO.read()
+ .from("stream", InitialPositionInStream.TRIM_HORIZON)
+ .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
+ .withMaxNumRecords(noOfShards * noOfEventsPerShard))
+ .apply(ParDo.of(new KinesisRecordToTestData()));
+ PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
+ p.run();
+ }
- int seqNumber = 0;
+ private static class KinesisRecordToTestData extends
+ DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
- List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
- for (int i = 0; i < noOfShards; ++i) {
- List<AmazonKinesisMock.TestData> shardData = newArrayList();
- shardedData.add(shardData);
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(new AmazonKinesisMock.TestData(c.element()));
+ }
+ }
+
+ private List<List<AmazonKinesisMock.TestData>> provideTestData(
+ int noOfShards,
+ int noOfEventsPerShard) {
- DateTime arrival = DateTime.now();
- for (int j = 0; j < noOfEventsPerShard; ++j) {
- arrival = arrival.plusSeconds(1);
+ int seqNumber = 0;
- seqNumber++;
- shardData.add(new AmazonKinesisMock.TestData(
- Integer.toString(seqNumber),
- arrival.toInstant(),
- Integer.toString(seqNumber))
- );
- }
- }
+ List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
+ for (int i = 0; i < noOfShards; ++i) {
+ List<AmazonKinesisMock.TestData> shardData = newArrayList();
+ shardedData.add(shardData);
- return shardedData;
+ DateTime arrival = DateTime.now();
+ for (int j = 0; j < noOfEventsPerShard; ++j) {
+ arrival = arrival.plusSeconds(1);
+
+ seqNumber++;
+ shardData.add(new AmazonKinesisMock.TestData(
+ Integer.toString(seqNumber),
+ arrival.toInstant(),
+ Integer.toString(seqNumber))
+ );
+ }
}
+
+ return shardedData;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
index 8c8da64..1038a47 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -17,13 +17,14 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.collect.Iterables;
+
import java.util.Iterator;
import java.util.List;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,33 +36,34 @@ import org.mockito.runners.MockitoJUnitRunner;
*/
@RunWith(MockitoJUnitRunner.class)
public class KinesisReaderCheckpointTest {
- @Mock
- private ShardCheckpoint a, b, c;
- private KinesisReaderCheckpoint checkpoint;
+ @Mock
+ private ShardCheckpoint a, b, c;
+
+ private KinesisReaderCheckpoint checkpoint;
- @Before
- public void setUp() {
- checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
- }
+ @Before
+ public void setUp() {
+ checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+ }
- @Test
- public void splitsCheckpointAccordingly() {
- verifySplitInto(1);
- verifySplitInto(2);
- verifySplitInto(3);
- verifySplitInto(4);
- }
+ @Test
+ public void splitsCheckpointAccordingly() {
+ verifySplitInto(1);
+ verifySplitInto(2);
+ verifySplitInto(3);
+ verifySplitInto(4);
+ }
- @Test(expected = UnsupportedOperationException.class)
- public void isImmutable() {
- Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
- iterator.remove();
- }
+ @Test(expected = UnsupportedOperationException.class)
+ public void isImmutable() {
+ Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
+ iterator.remove();
+ }
- private void verifySplitInto(int size) {
- List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
- assertThat(Iterables.concat(split)).containsOnly(a, b, c);
- assertThat(split).hasSize(Math.min(size, 3));
- }
+ private void verifySplitInto(int size) {
+ List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
+ assertThat(Iterables.concat(split)).containsOnly(a, b, c);
+ assertThat(split).hasSize(Math.min(size, 3));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 8eb6546..5781033 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import com.amazonaws.regions.Regions;
+
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -50,72 +52,75 @@ import org.junit.Test;
* You need to provide all {@link KinesisTestOptions} in order to run this.
*/
public class KinesisReaderIT {
- private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
- private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
-
- @Rule
- public final transient TestPipeline p = TestPipeline.create();
-
- @Ignore
- @Test
- public void readsDataFromRealKinesisStream()
- throws IOException, InterruptedException, ExecutionException {
- KinesisTestOptions options = readKinesisOptions();
- List<String> testData = prepareTestData(1000);
-
- Future<?> future = startTestPipeline(testData, options);
- KinesisUploader.uploadAll(testData, options);
- future.get();
- }
- private List<String> prepareTestData(int count) {
- List<String> data = newArrayList();
- for (int i = 0; i < count; ++i) {
- data.add(RandomStringUtils.randomAlphabetic(32));
- }
- return data;
- }
+ private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+ private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
- private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
- throws InterruptedException {
-
- PCollection<String> result = p.
- apply(KinesisIO.read()
- .from(options.getAwsKinesisStream(), Instant.now())
- .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
- Regions.fromName(options.getAwsKinesisRegion()))
- .withMaxReadTime(Duration.standardMinutes(3))
- ).
- apply(ParDo.of(new RecordDataToString()));
- PAssert.that(result).containsInAnyOrder(testData);
-
- Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- PipelineResult result = p.run();
- PipelineResult.State state = result.getState();
- while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
- Thread.sleep(1000);
- state = result.getState();
- }
- assertThat(state).isEqualTo(PipelineResult.State.DONE);
- return null;
- }
- });
- Thread.sleep(PIPELINE_STARTUP_TIME);
- return future;
- }
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
+
+ @Ignore
+ @Test
+ public void readsDataFromRealKinesisStream()
+ throws IOException, InterruptedException, ExecutionException {
+ KinesisTestOptions options = readKinesisOptions();
+ List<String> testData = prepareTestData(1000);
- private KinesisTestOptions readKinesisOptions() {
- PipelineOptionsFactory.register(KinesisTestOptions.class);
- return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+ Future<?> future = startTestPipeline(testData, options);
+ KinesisUploader.uploadAll(testData, options);
+ future.get();
+ }
+
+ private List<String> prepareTestData(int count) {
+ List<String> data = newArrayList();
+ for (int i = 0; i < count; ++i) {
+ data.add(RandomStringUtils.randomAlphabetic(32));
}
+ return data;
+ }
+
+ private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+ throws InterruptedException {
+
+ PCollection<String> result = p.
+ apply(KinesisIO.read()
+ .from(options.getAwsKinesisStream(), Instant.now())
+ .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
+ Regions.fromName(options.getAwsKinesisRegion()))
+ .withMaxReadTime(Duration.standardMinutes(3))
+ ).
+ apply(ParDo.of(new RecordDataToString()));
+ PAssert.that(result).containsInAnyOrder(testData);
+
+ Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
- private static class RecordDataToString extends DoFn<KinesisRecord, String> {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- checkNotNull(c.element(), "Null record given");
- c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+ @Override
+ public Void call() throws Exception {
+ PipelineResult result = p.run();
+ PipelineResult.State state = result.getState();
+ while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+ Thread.sleep(1000);
+ state = result.getState();
}
+ assertThat(state).isEqualTo(PipelineResult.State.DONE);
+ return null;
+ }
+ });
+ Thread.sleep(PIPELINE_STARTUP_TIME);
+ return future;
+ }
+
+ private KinesisTestOptions readKinesisOptions() {
+ PipelineOptionsFactory.register(KinesisTestOptions.class);
+ return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+ }
+
+ private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ checkNotNull(c.element(), "Null record given");
+ c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 3111029..a26501a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.NoSuchElementException;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -34,87 +35,88 @@ import org.mockito.runners.MockitoJUnitRunner;
*/
@RunWith(MockitoJUnitRunner.class)
public class KinesisReaderTest {
- @Mock
- private SimplifiedKinesisClient kinesis;
- @Mock
- private CheckpointGenerator generator;
- @Mock
- private ShardCheckpoint firstCheckpoint, secondCheckpoint;
- @Mock
- private ShardRecordsIterator firstIterator, secondIterator;
- @Mock
- private KinesisRecord a, b, c, d;
-
- private KinesisReader reader;
-
- @Before
- public void setUp() throws IOException, TransientKinesisException {
- when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
- asList(firstCheckpoint, secondCheckpoint)
- ));
- when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
- when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
- when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
- when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-
- reader = new KinesisReader(kinesis, generator, null);
- }
-
- @Test
- public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
- assertThat(reader.start()).isFalse();
- }
-
- @Test(expected = NoSuchElementException.class)
- public void throwsNoSuchElementExceptionIfNoData() throws IOException {
- reader.start();
- reader.getCurrent();
- }
-
- @Test
- public void startReturnsTrueIfSomeDataAvailable() throws IOException,
- TransientKinesisException {
- when(firstIterator.next()).
- thenReturn(CustomOptional.of(a)).
- thenReturn(CustomOptional.<KinesisRecord>absent());
-
- assertThat(reader.start()).isTrue();
- }
-
- @Test
- public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
- throws IOException, TransientKinesisException {
- reader.start();
-
- when(firstIterator.next()).thenThrow(TransientKinesisException.class);
-
- assertThat(reader.advance()).isFalse();
- }
-
- @Test
- public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
- when(firstIterator.next()).
- thenReturn(CustomOptional.<KinesisRecord>absent()).
- thenReturn(CustomOptional.of(a)).
- thenReturn(CustomOptional.<KinesisRecord>absent()).
- thenReturn(CustomOptional.of(b)).
- thenReturn(CustomOptional.<KinesisRecord>absent());
-
- when(secondIterator.next()).
- thenReturn(CustomOptional.of(c)).
- thenReturn(CustomOptional.<KinesisRecord>absent()).
- thenReturn(CustomOptional.of(d)).
- thenReturn(CustomOptional.<KinesisRecord>absent());
-
- assertThat(reader.start()).isTrue();
- assertThat(reader.getCurrent()).isEqualTo(c);
- assertThat(reader.advance()).isTrue();
- assertThat(reader.getCurrent()).isEqualTo(a);
- assertThat(reader.advance()).isTrue();
- assertThat(reader.getCurrent()).isEqualTo(d);
- assertThat(reader.advance()).isTrue();
- assertThat(reader.getCurrent()).isEqualTo(b);
- assertThat(reader.advance()).isFalse();
- }
+
+ @Mock
+ private SimplifiedKinesisClient kinesis;
+ @Mock
+ private CheckpointGenerator generator;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+ @Mock
+ private ShardRecordsIterator firstIterator, secondIterator;
+ @Mock
+ private KinesisRecord a, b, c, d;
+
+ private KinesisReader reader;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+ asList(firstCheckpoint, secondCheckpoint)
+ ));
+ when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+ when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+ when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+ when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ reader = new KinesisReader(kinesis, generator, null);
+ }
+
+ @Test
+ public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+ assertThat(reader.start()).isFalse();
+ }
+
+ @Test(expected = NoSuchElementException.class)
+ public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+ reader.start();
+ reader.getCurrent();
+ }
+
+ @Test
+ public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+ TransientKinesisException {
+ when(firstIterator.next()).
+ thenReturn(CustomOptional.of(a)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ assertThat(reader.start()).isTrue();
+ }
+
+ @Test
+ public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+ throws IOException, TransientKinesisException {
+ reader.start();
+
+ when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+ assertThat(reader.advance()).isFalse();
+ }
+
+ @Test
+ public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+ when(firstIterator.next()).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(a)).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(b)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ when(secondIterator.next()).
+ thenReturn(CustomOptional.of(c)).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(d)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ assertThat(reader.start()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(c);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(a);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(d);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(b);
+ assertThat(reader.advance()).isFalse();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index 8771c86..c9f01bb 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.kinesis;
import java.nio.ByteBuffer;
+
import org.apache.beam.sdk.testing.CoderProperties;
import org.joda.time.Instant;
import org.junit.Test;
@@ -26,20 +27,21 @@ import org.junit.Test;
* Tests {@link KinesisRecordCoder}.
*/
public class KinesisRecordCoderTest {
- @Test
- public void encodingAndDecodingWorks() throws Exception {
- KinesisRecord record = new KinesisRecord(
- ByteBuffer.wrap("data".getBytes()),
- "sequence",
- 128L,
- "partition",
- Instant.now(),
- Instant.now(),
- "stream",
- "shard"
- );
- CoderProperties.coderDecodeEncodeEqual(
- new KinesisRecordCoder(), record
- );
- }
+
+ @Test
+ public void encodingAndDecodingWorks() throws Exception {
+ KinesisRecord record = new KinesisRecord(
+ ByteBuffer.wrap("data".getBytes()),
+ "sequence",
+ 128L,
+ "partition",
+ Instant.now(),
+ Instant.now(),
+ "stream",
+ "shard"
+ );
+ CoderProperties.coderDecodeEncodeEqual(
+ new KinesisRecordCoder(), record
+ );
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
index 324de46..76bcb27 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -25,23 +25,28 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
* Options for Kinesis integration tests.
*/
public interface KinesisTestOptions extends TestPipelineOptions {
- @Description("AWS region where Kinesis stream resided")
- @Default.String("aws-kinesis-region")
- String getAwsKinesisRegion();
- void setAwsKinesisRegion(String value);
-
- @Description("Kinesis stream name")
- @Default.String("aws-kinesis-stream")
- String getAwsKinesisStream();
- void setAwsKinesisStream(String value);
-
- @Description("AWS secret key")
- @Default.String("aws-secret-key")
- String getAwsSecretKey();
- void setAwsSecretKey(String value);
-
- @Description("AWS access key")
- @Default.String("aws-access-key")
- String getAwsAccessKey();
- void setAwsAccessKey(String value);
+
+ @Description("AWS region where Kinesis stream resided")
+ @Default.String("aws-kinesis-region")
+ String getAwsKinesisRegion();
+
+ void setAwsKinesisRegion(String value);
+
+ @Description("Kinesis stream name")
+ @Default.String("aws-kinesis-stream")
+ String getAwsKinesisStream();
+
+ void setAwsKinesisStream(String value);
+
+ @Description("AWS secret key")
+ @Default.String("aws-secret-key")
+ String getAwsSecretKey();
+
+ void setAwsSecretKey(String value);
+
+ @Description("AWS access key")
+ @Default.String("aws-access-key")
+ String getAwsAccessKey();
+
+ void setAwsAccessKey(String value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index 7518ff7..7a7cb02 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -29,6 +29,7 @@ import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
+
import java.nio.ByteBuffer;
import java.util.List;
@@ -37,47 +38,46 @@ import java.util.List;
*/
public class KinesisUploader {
- public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
-
- public static void uploadAll(List<String> data, KinesisTestOptions options) {
- AmazonKinesisClient client = new AmazonKinesisClient(
- new StaticCredentialsProvider(
- new BasicAWSCredentials(
- options.getAwsAccessKey(), options.getAwsSecretKey()))
- ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+ public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
- List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+ public static void uploadAll(List<String> data, KinesisTestOptions options) {
+ AmazonKinesisClient client = new AmazonKinesisClient(
+ new StaticCredentialsProvider(
+ new BasicAWSCredentials(
+ options.getAwsAccessKey(), options.getAwsSecretKey()))
+ ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+ List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
- for (List<String> partition : partitions) {
- List<PutRecordsRequestEntry> allRecords = newArrayList();
- for (String row : partition) {
- allRecords.add(new PutRecordsRequestEntry().
- withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
- withPartitionKey(Integer.toString(row.hashCode()))
+ for (List<String> partition : partitions) {
+ List<PutRecordsRequestEntry> allRecords = newArrayList();
+ for (String row : partition) {
+ allRecords.add(new PutRecordsRequestEntry().
+ withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+ withPartitionKey(Integer.toString(row.hashCode()))
- );
- }
+ );
+ }
- PutRecordsResult result;
- do {
- result = client.putRecords(
- new PutRecordsRequest().
- withStreamName(options.getAwsKinesisStream()).
- withRecords(allRecords));
- List<PutRecordsRequestEntry> failedRecords = newArrayList();
- int i = 0;
- for (PutRecordsResultEntry row : result.getRecords()) {
- if (row.getErrorCode() != null) {
- failedRecords.add(allRecords.get(i));
- }
- ++i;
- }
- allRecords = failedRecords;
- }
-
- while (result.getFailedRecordCount() > 0);
+ PutRecordsResult result;
+ do {
+ result = client.putRecords(
+ new PutRecordsRequest().
+ withStreamName(options.getAwsKinesisStream()).
+ withRecords(allRecords));
+ List<PutRecordsRequestEntry> failedRecords = newArrayList();
+ int i = 0;
+ for (PutRecordsResultEntry row : result.getRecords()) {
+ if (row.getErrorCode() != null) {
+ failedRecords.add(allRecords.get(i));
+ }
+ ++i;
}
+ allRecords = failedRecords;
+ }
+
+ while (result.getFailedRecordCount() > 0);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
index f979c01..cb32562 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -20,47 +20,49 @@ package org.apache.beam.sdk.io.kinesis;
import static org.mockito.BDDMockito.given;
import com.google.common.collect.Lists;
+
import java.util.Collections;
import java.util.List;
+
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-
/***
*/
@RunWith(MockitoJUnitRunner.class)
public class RecordFilterTest {
- @Mock
- private ShardCheckpoint checkpoint;
- @Mock
- private KinesisRecord record1, record2, record3, record4, record5;
- @Test
- public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
- given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
- given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
- given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
- given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
- given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
- List<KinesisRecord> records = Lists.newArrayList(record1, record2,
- record3, record4, record5);
- RecordFilter underTest = new RecordFilter();
+ @Mock
+ private ShardCheckpoint checkpoint;
+ @Mock
+ private KinesisRecord record1, record2, record3, record4, record5;
+
+ @Test
+ public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+ given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+ given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+ given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+ given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+ given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+ List<KinesisRecord> records = Lists.newArrayList(record1, record2,
+ record3, record4, record5);
+ RecordFilter underTest = new RecordFilter();
- List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+ List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
- Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
- }
+ Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
+ }
- @Test
- public void shouldNotFailOnEmptyList() {
- List<KinesisRecord> records = Collections.emptyList();
- RecordFilter underTest = new RecordFilter();
+ @Test
+ public void shouldNotFailOnEmptyList() {
+ List<KinesisRecord> records = Collections.emptyList();
+ RecordFilter underTest = new RecordFilter();
- List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+ List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
- Assertions.assertThat(retainedRecords).isEmpty();
- }
+ Assertions.assertThat(retainedRecords).isEmpty();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index f032eea..e4abce4 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -22,36 +22,38 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.List;
+
import org.junit.Test;
/**
* Tests {@link RoundRobin}.
*/
public class RoundRobinTest {
- @Test(expected = IllegalArgumentException.class)
- public void doesNotAllowCreationWithEmptyCollection() {
- new RoundRobin<>(Collections.emptyList());
- }
- @Test
- public void goesThroughElementsInCycle() {
- List<String> input = newArrayList("a", "b", "c");
+ @Test(expected = IllegalArgumentException.class)
+ public void doesNotAllowCreationWithEmptyCollection() {
+ new RoundRobin<>(Collections.emptyList());
+ }
- RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+ @Test
+ public void goesThroughElementsInCycle() {
+ List<String> input = newArrayList("a", "b", "c");
- input.addAll(input); // duplicate the input
- for (String element : input) {
- assertThat(roundRobin.getCurrent()).isEqualTo(element);
- assertThat(roundRobin.getCurrent()).isEqualTo(element);
- roundRobin.moveForward();
- }
+ RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+
+ input.addAll(input); // duplicate the input
+ for (String element : input) {
+ assertThat(roundRobin.getCurrent()).isEqualTo(element);
+ assertThat(roundRobin.getCurrent()).isEqualTo(element);
+ roundRobin.moveForward();
}
+ }
- @Test
- public void usualIteratorGoesThroughElementsOnce() {
- List<String> input = newArrayList("a", "b", "c");
+ @Test
+ public void usualIteratorGoesThroughElementsOnce() {
+ List<String> input = newArrayList("a", "b", "c");
- RoundRobin<String> roundRobin = new RoundRobin<>(input);
- assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
- }
+ RoundRobin<String> roundRobin = new RoundRobin<>(input);
+ assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
index 39ab36f..d4784c4 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -32,7 +32,9 @@ import static org.mockito.Mockito.when;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
import java.io.IOException;
+
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Before;
@@ -46,104 +48,105 @@ import org.mockito.runners.MockitoJUnitRunner;
*/
@RunWith(MockitoJUnitRunner.class)
public class ShardCheckpointTest {
- private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
- private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
- private static final String STREAM_NAME = "STREAM";
- private static final String SHARD_ID = "SHARD_ID";
- @Mock
- private SimplifiedKinesisClient client;
-
- @Before
- public void setUp() throws IOException, TransientKinesisException {
- when(client.getShardIterator(
- eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
- anyString(), isNull(Instant.class))).
- thenReturn(AT_SEQUENCE_SHARD_IT);
- when(client.getShardIterator(
- eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
- anyString(), isNull(Instant.class))).
- thenReturn(AFTER_SEQUENCE_SHARD_IT);
- }
-
- @Test
- public void testProvidingShardIterator() throws IOException, TransientKinesisException {
- assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
- .isEqualTo(AT_SEQUENCE_SHARD_IT);
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
- .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
- assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
- (AT_SEQUENCE_SHARD_IT);
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
- .isEqualTo(AT_SEQUENCE_SHARD_IT);
- }
-
- @Test
- public void testComparisonWithExtendedSequenceNumber() {
- assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isTrue();
-
- assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isTrue();
-
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isTrue();
-
- assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isTrue();
-
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isFalse();
-
- assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isFalse();
-
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("99", 1L))
- )).isFalse();
- }
-
- @Test
- public void testComparisonWithTimestamp() {
- DateTime referenceTimestamp = DateTime.now();
-
- assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
- .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
- ).isFalse();
-
- assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
- .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
- ).isTrue();
-
- assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
- .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
- ).isTrue();
- }
-
- private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
- KinesisRecord record = mock(KinesisRecord.class);
- given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
- return record;
- }
-
- private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
- Long subSequenceNumber) {
- return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
- subSequenceNumber);
- }
-
- private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
- KinesisRecord record = mock(KinesisRecord.class);
- given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
- return record;
- }
-
- private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
- return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
- }
+
+ private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+ private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+ private static final String STREAM_NAME = "STREAM";
+ private static final String SHARD_ID = "SHARD_ID";
+ @Mock
+ private SimplifiedKinesisClient client;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(client.getShardIterator(
+ eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
+ anyString(), isNull(Instant.class))).
+ thenReturn(AT_SEQUENCE_SHARD_IT);
+ when(client.getShardIterator(
+ eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
+ anyString(), isNull(Instant.class))).
+ thenReturn(AFTER_SEQUENCE_SHARD_IT);
+ }
+
+ @Test
+ public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+ .isEqualTo(AT_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+ .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
+ (AT_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
+ .isEqualTo(AT_SEQUENCE_SHARD_IT);
+ }
+
+ @Test
+ public void testComparisonWithExtendedSequenceNumber() {
+ assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isFalse();
+
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isFalse();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("99", 1L))
+ )).isFalse();
+ }
+
+ @Test
+ public void testComparisonWithTimestamp() {
+ DateTime referenceTimestamp = DateTime.now();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
+ ).isFalse();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
+ ).isTrue();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
+ ).isTrue();
+ }
+
+ private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+ KinesisRecord record = mock(KinesisRecord.class);
+ given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+ return record;
+ }
+
+ private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+ Long subSequenceNumber) {
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
+ subSequenceNumber);
+ }
+
+ private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+ KinesisRecord record = mock(KinesisRecord.class);
+ given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+ return record;
+ }
+
+ private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+ }
}
[4/4] beam git commit: This closes #3389
Posted by jb...@apache.org.
This closes #3389
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/138641f1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/138641f1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/138641f1
Branch: refs/heads/master
Commit: 138641f140b037841f9a1d2cc8e523ad29dc9518
Parents: af08f53 21fd302
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Jul 11 15:47:53 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jul 11 15:47:53 2017 +0200
----------------------------------------------------------------------
.../sdk/io/kinesis/CheckpointGenerator.java | 6 +-
.../beam/sdk/io/kinesis/CustomOptional.java | 111 ++--
.../io/kinesis/DynamicCheckpointGenerator.java | 52 +-
.../sdk/io/kinesis/GetKinesisRecordsResult.java | 49 +-
.../sdk/io/kinesis/KinesisClientProvider.java | 4 +-
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 279 +++++-----
.../beam/sdk/io/kinesis/KinesisReader.java | 206 +++----
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 97 ++--
.../beam/sdk/io/kinesis/KinesisRecord.java | 177 +++---
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 68 +--
.../beam/sdk/io/kinesis/KinesisSource.java | 147 ++---
.../beam/sdk/io/kinesis/RecordFilter.java | 18 +-
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 37 +-
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 241 ++++-----
.../sdk/io/kinesis/ShardRecordsIterator.java | 106 ++--
.../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 ++++----
.../beam/sdk/io/kinesis/StartingPoint.java | 84 +--
.../io/kinesis/StaticCheckpointGenerator.java | 27 +-
.../io/kinesis/TransientKinesisException.java | 7 +-
.../beam/sdk/io/kinesis/AmazonKinesisMock.java | 539 ++++++++++---------
.../beam/sdk/io/kinesis/CustomOptionalTest.java | 27 +-
.../kinesis/DynamicCheckpointGeneratorTest.java | 33 +-
.../sdk/io/kinesis/KinesisMockReadTest.java | 97 ++--
.../io/kinesis/KinesisReaderCheckpointTest.java | 52 +-
.../beam/sdk/io/kinesis/KinesisReaderIT.java | 127 ++---
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 166 +++---
.../sdk/io/kinesis/KinesisRecordCoderTest.java | 34 +-
.../beam/sdk/io/kinesis/KinesisTestOptions.java | 43 +-
.../beam/sdk/io/kinesis/KinesisUploader.java | 70 +--
.../beam/sdk/io/kinesis/RecordFilterTest.java | 52 +-
.../beam/sdk/io/kinesis/RoundRobinTest.java | 42 +-
.../sdk/io/kinesis/ShardCheckpointTest.java | 203 +++----
.../io/kinesis/ShardRecordsIteratorTest.java | 216 ++++----
.../io/kinesis/SimplifiedKinesisClientTest.java | 351 ++++++------
34 files changed, 2031 insertions(+), 1952 deletions(-)
----------------------------------------------------------------------