You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/11 13:48:05 UTC

[1/4] beam git commit: Reformatting Kinesis IO to comply with official code style

Repository: beam
Updated Branches:
  refs/heads/master af08f5352 -> 138641f14


http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 49e806d..4b2190f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -25,8 +25,10 @@ import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.when;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+
 import java.io.IOException;
 import java.util.Collections;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,112 +42,114 @@ import org.mockito.stubbing.Answer;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardRecordsIteratorTest {
-    private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
-    private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
-    private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
-    private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
-    private static final String STREAM_NAME = "STREAM_NAME";
-    private static final String SHARD_ID = "SHARD_ID";
-
-    @Mock
-    private SimplifiedKinesisClient kinesisClient;
-    @Mock
-    private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
-    @Mock
-    private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
-    @Mock
-    private KinesisRecord a, b, c, d;
-    @Mock
-    private RecordFilter recordFilter;
-
-    private ShardRecordsIterator iterator;
-
-    @Before
-    public void setUp() throws IOException, TransientKinesisException {
-        when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
-        when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-        when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
-        when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
-        when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
-        when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
-        when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-        when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenReturn(firstResult);
-        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenReturn(secondResult);
-        when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenReturn(thirdResult);
-
-        when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
-        when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-        when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-
-        when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-        when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-        when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-
-        when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
-                .class))).thenAnswer(new IdentityAnswer());
-
-        iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
-    }
-
-    @Test
-    public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    }
-
-    @Test
-    public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
-        when(firstResult.getRecords()).thenReturn(asList(a, b, c));
-        when(secondResult.getRecords()).thenReturn(singletonList(d));
-
-        assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-        assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-        assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
-        assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
-        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-    }
-
-    @Test
-    public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
-        when(firstResult.getRecords()).thenReturn(singletonList(a));
-        when(secondResult.getRecords()).thenReturn(singletonList(b));
-
-        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenThrow(ExpiredIteratorException.class);
-        when(aCheckpoint.getShardIterator(kinesisClient))
-                .thenReturn(SECOND_REFRESHED_ITERATOR);
-        when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
-                .thenReturn(secondResult);
-
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    }
 
-    private static class IdentityAnswer implements Answer<Object> {
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            return invocation.getArguments()[0];
-        }
+  private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+  private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+  private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+  private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+  private static final String STREAM_NAME = "STREAM_NAME";
+  private static final String SHARD_ID = "SHARD_ID";
+
+  @Mock
+  private SimplifiedKinesisClient kinesisClient;
+  @Mock
+  private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+  @Mock
+  private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+  @Mock
+  private KinesisRecord a, b, c, d;
+  @Mock
+  private RecordFilter recordFilter;
+
+  private ShardRecordsIterator iterator;
+
+  @Before
+  public void setUp() throws IOException, TransientKinesisException {
+    when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+    when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+    when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+    when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+    when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+    when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+    when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+    when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(firstResult);
+    when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(secondResult);
+    when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(thirdResult);
+
+    when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+    when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+    when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+    when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+    when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+    when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+    when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+        .class))).thenAnswer(new IdentityAnswer());
+
+    iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+  }
+
+  @Test
+  public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+  }
+
+  @Test
+  public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+    when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+    when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+    assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+    assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+    assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+    assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+  }
+
+  @Test
+  public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+    when(firstResult.getRecords()).thenReturn(singletonList(a));
+    when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+    when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenThrow(ExpiredIteratorException.class);
+    when(aCheckpoint.getShardIterator(kinesisClient))
+        .thenReturn(SECOND_REFRESHED_ITERATOR);
+    when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(secondResult);
+
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+  }
+
+  private static class IdentityAnswer implements Answer<Object> {
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      return invocation.getArguments()[0];
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 96434fd..2f8757c 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -34,7 +34,9 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
+
 import java.util.List;
+
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,179 +48,180 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class SimplifiedKinesisClientTest {
-    private static final String STREAM = "stream";
-    private static final String SHARD_1 = "shard-01";
-    private static final String SHARD_2 = "shard-02";
-    private static final String SHARD_3 = "shard-03";
-    private static final String SHARD_ITERATOR = "iterator";
-    private static final String SEQUENCE_NUMBER = "abc123";
-
-    @Mock
-    private AmazonKinesis kinesis;
-    @InjectMocks
-    private SimplifiedKinesisClient underTest;
-
-    @Test
-    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
-        given(kinesis.getShardIterator(new GetShardIteratorRequest()
-                .withStreamName(STREAM)
-                .withShardId(SHARD_1)
-                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-                .withStartingSequenceNumber(SEQUENCE_NUMBER)
-        )).willReturn(new GetShardIteratorResult()
-                .withShardIterator(SHARD_ITERATOR));
-
-        String stream = underTest.getShardIterator(STREAM, SHARD_1,
-                ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
-
-        assertThat(stream).isEqualTo(SHARD_ITERATOR);
-    }
-
-    @Test
-    public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
-        Instant timestamp = Instant.now();
-        given(kinesis.getShardIterator(new GetShardIteratorRequest()
-                .withStreamName(STREAM)
-                .withShardId(SHARD_1)
-                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-                .withTimestamp(timestamp.toDate())
-        )).willReturn(new GetShardIteratorResult()
-                .withShardIterator(SHARD_ITERATOR));
-
-        String stream = underTest.getShardIterator(STREAM, SHARD_1,
-                ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
-
-        assertThat(stream).isEqualTo(SHARD_ITERATOR);
-    }
-
-    @Test
-    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
-                ExpiredIteratorException.class);
-    }
-
-    @Test
-    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new LimitExceededException(""),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleServiceErrorForGetShardIterator() {
-        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleClientErrorForGetShardIterator() {
-        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
-                RuntimeException.class);
-    }
-
-    @Test
-    public void shouldHandleUnexpectedExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new NullPointerException(),
-                RuntimeException.class);
-    }
-
-    private void shouldHandleGetShardIteratorError(
-            Exception thrownException,
-            Class<? extends Exception> expectedExceptionClass) {
-        GetShardIteratorRequest request = new GetShardIteratorRequest()
-                .withStreamName(STREAM)
-                .withShardId(SHARD_1)
-                .withShardIteratorType(ShardIteratorType.LATEST);
-
-        given(kinesis.getShardIterator(request)).willThrow(thrownException);
-
-        try {
-            underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
-            failBecauseExceptionWasNotThrown(expectedExceptionClass);
-        } catch (Exception e) {
-            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
-        } finally {
-            reset(kinesis);
-        }
-    }
-
-    @Test
-    public void shouldListAllShards() throws Exception {
-        Shard shard1 = new Shard().withShardId(SHARD_1);
-        Shard shard2 = new Shard().withShardId(SHARD_2);
-        Shard shard3 = new Shard().withShardId(SHARD_3);
-        given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
-                .withStreamDescription(new StreamDescription()
-                        .withShards(shard1, shard2)
-                        .withHasMoreShards(true)));
-        given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
-                .withStreamDescription(new StreamDescription()
-                        .withShards(shard3)
-                        .withHasMoreShards(false)));
-
-        List<Shard> shards = underTest.listShards(STREAM);
-
-        assertThat(shards).containsOnly(shard1, shard2, shard3);
-    }
-
-    @Test
-    public void shouldHandleExpiredIterationExceptionForShardListing() {
-        shouldHandleShardListingError(new ExpiredIteratorException(""),
-                ExpiredIteratorException.class);
-    }
-
-    @Test
-    public void shouldHandleLimitExceededExceptionForShardListing() {
-        shouldHandleShardListingError(new LimitExceededException(""),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
-        shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
-                TransientKinesisException.class);
-    }
 
-    @Test
-    public void shouldHandleServiceErrorForShardListing() {
-        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
-                TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleClientErrorForShardListing() {
-        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
-                RuntimeException.class);
-    }
-
-    @Test
-    public void shouldHandleUnexpectedExceptionForShardListing() {
-        shouldHandleShardListingError(new NullPointerException(),
-                RuntimeException.class);
-    }
-
-    private void shouldHandleShardListingError(
-            Exception thrownException,
-            Class<? extends Exception> expectedExceptionClass) {
-        given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
-        try {
-            underTest.listShards(STREAM);
-            failBecauseExceptionWasNotThrown(expectedExceptionClass);
-        } catch (Exception e) {
-            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
-        } finally {
-            reset(kinesis);
-        }
-    }
-
-    private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
-        AmazonServiceException exception = new AmazonServiceException("");
-        exception.setErrorType(errorType);
-        return exception;
-    }
+  private static final String STREAM = "stream";
+  private static final String SHARD_1 = "shard-01";
+  private static final String SHARD_2 = "shard-02";
+  private static final String SHARD_3 = "shard-03";
+  private static final String SHARD_ITERATOR = "iterator";
+  private static final String SEQUENCE_NUMBER = "abc123";
+
+  @Mock
+  private AmazonKinesis kinesis;
+  @InjectMocks
+  private SimplifiedKinesisClient underTest;
+
+  @Test
+  public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+    given(kinesis.getShardIterator(new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+        .withStartingSequenceNumber(SEQUENCE_NUMBER)
+    )).willReturn(new GetShardIteratorResult()
+        .withShardIterator(SHARD_ITERATOR));
+
+    String stream = underTest.getShardIterator(STREAM, SHARD_1,
+        ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+    assertThat(stream).isEqualTo(SHARD_ITERATOR);
+  }
+
+  @Test
+  public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+    Instant timestamp = Instant.now();
+    given(kinesis.getShardIterator(new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+        .withTimestamp(timestamp.toDate())
+    )).willReturn(new GetShardIteratorResult()
+        .withShardIterator(SHARD_ITERATOR));
+
+    String stream = underTest.getShardIterator(STREAM, SHARD_1,
+        ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
+
+    assertThat(stream).isEqualTo(SHARD_ITERATOR);
+  }
+
+  @Test
+  public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+        ExpiredIteratorException.class);
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new LimitExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleServiceErrorForGetShardIterator() {
+    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleClientErrorForGetShardIterator() {
+    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class);
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new NullPointerException(),
+        RuntimeException.class);
+  }
+
+  private void shouldHandleGetShardIteratorError(
+      Exception thrownException,
+      Class<? extends Exception> expectedExceptionClass) {
+    GetShardIteratorRequest request = new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.LATEST);
+
+    given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+    try {
+      underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(kinesis);
+    }
+  }
+
+  @Test
+  public void shouldListAllShards() throws Exception {
+    Shard shard1 = new Shard().withShardId(SHARD_1);
+    Shard shard2 = new Shard().withShardId(SHARD_2);
+    Shard shard3 = new Shard().withShardId(SHARD_3);
+    given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+        .withStreamDescription(new StreamDescription()
+            .withShards(shard1, shard2)
+            .withHasMoreShards(true)));
+    given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+        .withStreamDescription(new StreamDescription()
+            .withShards(shard3)
+            .withHasMoreShards(false)));
+
+    List<Shard> shards = underTest.listShards(STREAM);
+
+    assertThat(shards).containsOnly(shard1, shard2, shard3);
+  }
+
+  @Test
+  public void shouldHandleExpiredIterationExceptionForShardListing() {
+    shouldHandleShardListingError(new ExpiredIteratorException(""),
+        ExpiredIteratorException.class);
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForShardListing() {
+    shouldHandleShardListingError(new LimitExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+    shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleServiceErrorForShardListing() {
+    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleClientErrorForShardListing() {
+    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class);
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForShardListing() {
+    shouldHandleShardListingError(new NullPointerException(),
+        RuntimeException.class);
+  }
+
+  private void shouldHandleShardListingError(
+      Exception thrownException,
+      Class<? extends Exception> expectedExceptionClass) {
+    given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+    try {
+      underTest.listShards(STREAM);
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(kinesis);
+    }
+  }
+
+  private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+    AmazonServiceException exception = new AmazonServiceException("");
+    exception.setErrorType(errorType);
+    return exception;
+  }
 }


[3/4] beam git commit: Reformatting Kinesis IO to comply with official code style

Posted by jb...@apache.org.
Reformatting Kinesis IO to comply with official code style


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21fd3028
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21fd3028
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21fd3028

Branch: refs/heads/master
Commit: 21fd30283eb4f6b829b06830a3ef04df0a377b06
Parents: af08f53
Author: Pawel Kaczmarczyk <p....@ocado.com>
Authored: Mon Jun 19 11:10:25 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jul 11 15:24:19 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/kinesis/CheckpointGenerator.java     |   6 +-
 .../beam/sdk/io/kinesis/CustomOptional.java     | 111 ++--
 .../io/kinesis/DynamicCheckpointGenerator.java  |  52 +-
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |  49 +-
 .../sdk/io/kinesis/KinesisClientProvider.java   |   4 +-
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   | 279 +++++-----
 .../beam/sdk/io/kinesis/KinesisReader.java      | 206 +++----
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  97 ++--
 .../beam/sdk/io/kinesis/KinesisRecord.java      | 177 +++---
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  68 +--
 .../beam/sdk/io/kinesis/KinesisSource.java      | 147 ++---
 .../beam/sdk/io/kinesis/RecordFilter.java       |  18 +-
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  37 +-
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    | 241 ++++-----
 .../sdk/io/kinesis/ShardRecordsIterator.java    | 106 ++--
 .../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 ++++----
 .../beam/sdk/io/kinesis/StartingPoint.java      |  84 +--
 .../io/kinesis/StaticCheckpointGenerator.java   |  27 +-
 .../io/kinesis/TransientKinesisException.java   |   7 +-
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  | 539 ++++++++++---------
 .../beam/sdk/io/kinesis/CustomOptionalTest.java |  27 +-
 .../kinesis/DynamicCheckpointGeneratorTest.java |  33 +-
 .../sdk/io/kinesis/KinesisMockReadTest.java     |  97 ++--
 .../io/kinesis/KinesisReaderCheckpointTest.java |  52 +-
 .../beam/sdk/io/kinesis/KinesisReaderIT.java    | 127 ++---
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  | 166 +++---
 .../sdk/io/kinesis/KinesisRecordCoderTest.java  |  34 +-
 .../beam/sdk/io/kinesis/KinesisTestOptions.java |  43 +-
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  70 +--
 .../beam/sdk/io/kinesis/RecordFilterTest.java   |  52 +-
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  42 +-
 .../sdk/io/kinesis/ShardCheckpointTest.java     | 203 +++----
 .../io/kinesis/ShardRecordsIteratorTest.java    | 216 ++++----
 .../io/kinesis/SimplifiedKinesisClientTest.java | 351 ++++++------
 34 files changed, 2031 insertions(+), 1952 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
index 919d85a..2629c57 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import java.io.Serializable;
 
 /**
@@ -25,6 +24,7 @@ import java.io.Serializable;
  * How exactly the checkpoint is generated is up to implementing class.
  */
 interface CheckpointGenerator extends Serializable {
-    KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
-            throws TransientKinesisException;
+
+  KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
+      throws TransientKinesisException;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
index 4bed0e3..5a28214 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -24,76 +24,79 @@ import java.util.Objects;
  * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
  */
 abstract class CustomOptional<T> {
-    @SuppressWarnings("unchecked")
-    public static <T> CustomOptional<T> absent() {
-        return (Absent<T>) Absent.INSTANCE;
-    }
 
-    public static <T> CustomOptional<T> of(T v) {
-        return new Present<>(v);
-    }
+  @SuppressWarnings("unchecked")
+  public static <T> CustomOptional<T> absent() {
+    return (Absent<T>) Absent.INSTANCE;
+  }
 
-    public abstract boolean isPresent();
+  public static <T> CustomOptional<T> of(T v) {
+    return new Present<>(v);
+  }
 
-    public abstract T get();
+  public abstract boolean isPresent();
 
-    private static class Present<T> extends CustomOptional<T> {
-        private final T value;
+  public abstract T get();
 
-        private Present(T value) {
-            this.value = value;
-        }
+  private static class Present<T> extends CustomOptional<T> {
 
-        @Override
-        public boolean isPresent() {
-            return true;
-        }
+    private final T value;
 
-        @Override
-        public T get() {
-            return value;
-        }
+    private Present(T value) {
+      this.value = value;
+    }
 
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof Present)) {
-                return false;
-            }
+    @Override
+    public boolean isPresent() {
+      return true;
+    }
 
-            Present<?> present = (Present<?>) o;
-            return Objects.equals(value, present.value);
-        }
+    @Override
+    public T get() {
+      return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Present)) {
+        return false;
+      }
 
-        @Override
-        public int hashCode() {
-            return Objects.hash(value);
-        }
+      Present<?> present = (Present<?>) o;
+      return Objects.equals(value, present.value);
     }
 
-    private static class Absent<T> extends CustomOptional<T> {
-        private static final Absent<Object> INSTANCE = new Absent<>();
+    @Override
+    public int hashCode() {
+      return Objects.hash(value);
+    }
+  }
 
-        private Absent() {
-        }
+  private static class Absent<T> extends CustomOptional<T> {
 
-        @Override
-        public boolean isPresent() {
-            return false;
-        }
+    private static final Absent<Object> INSTANCE = new Absent<>();
 
-        @Override
-        public T get() {
-            throw new NoSuchElementException();
-        }
+    private Absent() {
+    }
+
+    @Override
+    public boolean isPresent() {
+      return false;
+    }
 
-        @Override
-        public boolean equals(Object o) {
-            return o instanceof Absent;
-        }
+    @Override
+    public T get() {
+      throw new NoSuchElementException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return o instanceof Absent;
+    }
 
-        @Override
-        public int hashCode() {
-            return 0;
-        }
+    @Override
+    public int hashCode() {
+      return 0;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index 2ec293c..9933019 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -28,29 +28,31 @@ import com.google.common.base.Function;
  * List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
  */
 class DynamicCheckpointGenerator implements CheckpointGenerator {
-    private final String streamName;
-    private final StartingPoint startingPoint;
-
-    public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
-        this.streamName = checkNotNull(streamName, "streamName");
-        this.startingPoint = checkNotNull(startingPoint, "startingPoint");
-    }
-
-    @Override
-    public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
-            throws TransientKinesisException {
-        return new KinesisReaderCheckpoint(
-                transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
-                    @Override
-                    public ShardCheckpoint apply(Shard shard) {
-                        return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
-                    }
-                })
-        );
-    }
-
-    @Override
-    public String toString() {
-        return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
-    }
+
+  private final String streamName;
+  private final StartingPoint startingPoint;
+
+  public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
+    this.streamName = checkNotNull(streamName, "streamName");
+    this.startingPoint = checkNotNull(startingPoint, "startingPoint");
+  }
+
+  @Override
+  public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
+      throws TransientKinesisException {
+    return new KinesisReaderCheckpoint(
+        transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
+
+          @Override
+          public ShardCheckpoint apply(Shard shard) {
+            return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
+          }
+        })
+    );
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index 5a34d7d..f605f55 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -21,6 +21,7 @@ import static com.google.common.collect.Lists.transform;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import com.google.common.base.Function;
+
 import java.util.List;
 import javax.annotation.Nullable;
 
@@ -28,27 +29,29 @@ import javax.annotation.Nullable;
  * Represents the output of 'get' operation on Kinesis stream.
  */
 class GetKinesisRecordsResult {
-    private final List<KinesisRecord> records;
-    private final String nextShardIterator;
-
-    public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
-                                   final String streamName, final String shardId) {
-        this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
-            @Nullable
-            @Override
-            public KinesisRecord apply(@Nullable UserRecord input) {
-                assert input != null;  // to make FindBugs happy
-                return new KinesisRecord(input, streamName, shardId);
-            }
-        });
-        this.nextShardIterator = nextShardIterator;
-    }
-
-    public List<KinesisRecord> getRecords() {
-        return records;
-    }
-
-    public String getNextShardIterator() {
-        return nextShardIterator;
-    }
+
+  private final List<KinesisRecord> records;
+  private final String nextShardIterator;
+
+  public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
+      final String streamName, final String shardId) {
+    this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
+
+      @Nullable
+      @Override
+      public KinesisRecord apply(@Nullable UserRecord input) {
+        assert input != null;  // to make FindBugs happy
+        return new KinesisRecord(input, streamName, shardId);
+      }
+    });
+    this.nextShardIterator = nextShardIterator;
+  }
+
+  public List<KinesisRecord> getRecords() {
+    return records;
+  }
+
+  public String getNextShardIterator() {
+    return nextShardIterator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
index c7fd7f6..b5b721e 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
+
 import java.io.Serializable;
 
 /**
@@ -27,5 +28,6 @@ import java.io.Serializable;
  * {@link Serializable} to ensure it can be sent to worker machines.
  */
 interface KinesisClientProvider extends Serializable {
-    AmazonKinesis get();
+
+  AmazonKinesis get();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index b85eb63..bc8ada1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -29,7 +28,9 @@ import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.auto.value.AutoValue;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -102,142 +103,148 @@ import org.joda.time.Instant;
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public final class KinesisIO {
-    /** Returns a new {@link Read} transform for reading from Kinesis. */
-    public static Read read() {
-        return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+
+  /** Returns a new {@link Read} transform for reading from Kinesis. */
+  public static Read read() {
+    return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+  }
+
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
+
+    @Nullable
+    abstract String getStreamName();
+
+    @Nullable
+    abstract StartingPoint getInitialPosition();
+
+    @Nullable
+    abstract KinesisClientProvider getClientProvider();
+
+    abstract int getMaxNumRecords();
+
+    @Nullable
+    abstract Duration getMaxReadTime();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setStreamName(String streamName);
+
+      abstract Builder setInitialPosition(StartingPoint startingPoint);
+
+      abstract Builder setClientProvider(KinesisClientProvider clientProvider);
+
+      abstract Builder setMaxNumRecords(int maxNumRecords);
+
+      abstract Builder setMaxReadTime(Duration maxReadTime);
+
+      abstract Read build();
     }
 
-    /** Implementation of {@link #read}. */
-    @AutoValue
-    public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
-        @Nullable
-        abstract String getStreamName();
-
-        @Nullable
-        abstract StartingPoint getInitialPosition();
-
-        @Nullable
-        abstract KinesisClientProvider getClientProvider();
-
-        abstract int getMaxNumRecords();
-
-        @Nullable
-        abstract Duration getMaxReadTime();
-
-        abstract Builder toBuilder();
-
-        @AutoValue.Builder
-        abstract static class Builder {
-            abstract Builder setStreamName(String streamName);
-            abstract Builder setInitialPosition(StartingPoint startingPoint);
-            abstract Builder setClientProvider(KinesisClientProvider clientProvider);
-            abstract Builder setMaxNumRecords(int maxNumRecords);
-            abstract Builder setMaxReadTime(Duration maxReadTime);
-
-            abstract Read build();
-        }
-
-        /**
-         * Specify reading from streamName at some initial position.
-         */
-        public Read from(String streamName, InitialPositionInStream initialPosition) {
-            return toBuilder()
-                .setStreamName(streamName)
-                .setInitialPosition(
-                    new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
-                .build();
-        }
-
-        /**
-         * Specify reading from streamName beginning at given {@link Instant}.
-         * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
-         */
-        public Read from(String streamName, Instant initialTimestamp) {
-            return toBuilder()
-                .setStreamName(streamName)
-                .setInitialPosition(
-                    new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
-                .build();
-        }
-
-        /**
-         * Allows to specify custom {@link KinesisClientProvider}.
-         * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
-         * used for communication with Kinesis.
-         * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
-         * does not suit your needs.
-         */
-        public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
-            return toBuilder().setClientProvider(kinesisClientProvider).build();
-        }
-
-        /**
-         * Specify credential details and region to be used to read from Kinesis.
-         * If you need more sophisticated credential protocol, then you should look at
-         * {@link Read#withClientProvider(KinesisClientProvider)}.
-         */
-        public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
-            return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
-        }
-
-        /** Specifies to read at most a given number of records. */
-        public Read withMaxNumRecords(int maxNumRecords) {
-            checkArgument(
-                maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
-            return toBuilder().setMaxNumRecords(maxNumRecords).build();
-        }
-
-        /** Specifies to read at most a given number of records. */
-        public Read withMaxReadTime(Duration maxReadTime) {
-            checkNotNull(maxReadTime, "maxReadTime");
-            return toBuilder().setMaxReadTime(maxReadTime).build();
-        }
-
-        @Override
-        public PCollection<KinesisRecord> expand(PBegin input) {
-            org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
-                org.apache.beam.sdk.io.Read.from(
-                    new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
-            if (getMaxNumRecords() > 0) {
-                BoundedReadFromUnboundedSource<KinesisRecord> bounded =
-                    read.withMaxNumRecords(getMaxNumRecords());
-                return getMaxReadTime() == null
-                    ? input.apply(bounded)
-                    : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
-            } else {
-                return getMaxReadTime() == null
-                    ? input.apply(read)
-                    : input.apply(read.withMaxReadTime(getMaxReadTime()));
-            }
-        }
-
-        private static final class BasicKinesisProvider implements KinesisClientProvider {
-
-            private final String accessKey;
-            private final String secretKey;
-            private final Regions region;
-
-            private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
-                this.accessKey = checkNotNull(accessKey, "accessKey");
-                this.secretKey = checkNotNull(secretKey, "secretKey");
-                this.region = checkNotNull(region, "region");
-            }
-
-
-            private AWSCredentialsProvider getCredentialsProvider() {
-                return new StaticCredentialsProvider(new BasicAWSCredentials(
-                        accessKey,
-                        secretKey
-                ));
-
-            }
-
-            @Override
-            public AmazonKinesis get() {
-                AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
-                client.withRegion(region);
-                return client;
-            }
-        }
+    /**
+     * Specify reading from streamName at some initial position.
+     */
+    public Read from(String streamName, InitialPositionInStream initialPosition) {
+      return toBuilder()
+          .setStreamName(streamName)
+          .setInitialPosition(
+              new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
+          .build();
+    }
+
+    /**
+     * Specify reading from streamName beginning at given {@link Instant}.
+     * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
+     */
+    public Read from(String streamName, Instant initialTimestamp) {
+      return toBuilder()
+          .setStreamName(streamName)
+          .setInitialPosition(
+              new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
+          .build();
+    }
+
+    /**
+     * Allows specifying a custom {@link KinesisClientProvider}.
+     * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
+     * used for communication with Kinesis.
+     * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
+     * does not suit your needs.
+     */
+    public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
+      return toBuilder().setClientProvider(kinesisClientProvider).build();
+    }
+
+    /**
+     * Specify credential details and region to be used to read from Kinesis.
+     * If you need more sophisticated credential protocol, then you should look at
+     * {@link Read#withClientProvider(KinesisClientProvider)}.
+     */
+    public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
+      return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
+    }
+
+    /** Specifies to read at most a given number of records. */
+    public Read withMaxNumRecords(int maxNumRecords) {
+      checkArgument(
+          maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
+      return toBuilder().setMaxNumRecords(maxNumRecords).build();
+    }
+
+    /** Specifies to read records for at most the given amount of time. */
+    public Read withMaxReadTime(Duration maxReadTime) {
+      checkNotNull(maxReadTime, "maxReadTime");
+      return toBuilder().setMaxReadTime(maxReadTime).build();
+    }
+
+    @Override
+    public PCollection<KinesisRecord> expand(PBegin input) {
+      org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
+          org.apache.beam.sdk.io.Read.from(
+              new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
+      if (getMaxNumRecords() > 0) {
+        BoundedReadFromUnboundedSource<KinesisRecord> bounded =
+            read.withMaxNumRecords(getMaxNumRecords());
+        return getMaxReadTime() == null
+            ? input.apply(bounded)
+            : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
+      } else {
+        return getMaxReadTime() == null
+            ? input.apply(read)
+            : input.apply(read.withMaxReadTime(getMaxReadTime()));
+      }
+    }
+
+    private static final class BasicKinesisProvider implements KinesisClientProvider {
+
+      private final String accessKey;
+      private final String secretKey;
+      private final Regions region;
+
+      private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
+        this.accessKey = checkNotNull(accessKey, "accessKey");
+        this.secretKey = checkNotNull(secretKey, "secretKey");
+        this.region = checkNotNull(region, "region");
+      }
+
+      private AWSCredentialsProvider getCredentialsProvider() {
+        return new StaticCredentialsProvider(new BasicAWSCredentials(
+            accessKey,
+            secretKey
+        ));
+
+      }
+
+      @Override
+      public AmazonKinesis get() {
+        AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
+        client.withRegion(region);
+        return client;
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 2138094..e5c32d2 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -17,129 +17,129 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
+
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Reads data from multiple kinesis shards in a single thread.
  * It uses simple round robin algorithm when fetching data from shards.
  */
 class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
-    private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
-
-    private final SimplifiedKinesisClient kinesis;
-    private final UnboundedSource<KinesisRecord, ?> source;
-    private final CheckpointGenerator initialCheckpointGenerator;
-    private RoundRobin<ShardRecordsIterator> shardIterators;
-    private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
-
-    public KinesisReader(SimplifiedKinesisClient kinesis,
-                         CheckpointGenerator initialCheckpointGenerator,
-                         UnboundedSource<KinesisRecord, ?> source) {
-        this.kinesis = checkNotNull(kinesis, "kinesis");
-        this.initialCheckpointGenerator =
-                checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
-        this.source = source;
-    }
-
-    /**
-     * Generates initial checkpoint and instantiates iterators for shards.
-     */
-    @Override
-    public boolean start() throws IOException {
-        LOG.info("Starting reader using {}", initialCheckpointGenerator);
-
-        try {
-            KinesisReaderCheckpoint initialCheckpoint =
-                    initialCheckpointGenerator.generate(kinesis);
-            List<ShardRecordsIterator> iterators = newArrayList();
-            for (ShardCheckpoint checkpoint : initialCheckpoint) {
-                iterators.add(checkpoint.getShardRecordsIterator(kinesis));
-            }
-            shardIterators = new RoundRobin<>(iterators);
-        } catch (TransientKinesisException e) {
-            throw new IOException(e);
-        }
 
-        return advance();
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
+
+  private final SimplifiedKinesisClient kinesis;
+  private final UnboundedSource<KinesisRecord, ?> source;
+  private final CheckpointGenerator initialCheckpointGenerator;
+  private RoundRobin<ShardRecordsIterator> shardIterators;
+  private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
+
+  public KinesisReader(SimplifiedKinesisClient kinesis,
+      CheckpointGenerator initialCheckpointGenerator,
+      UnboundedSource<KinesisRecord, ?> source) {
+    this.kinesis = checkNotNull(kinesis, "kinesis");
+    this.initialCheckpointGenerator =
+        checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+    this.source = source;
+  }
+
+  /**
+   * Generates initial checkpoint and instantiates iterators for shards.
+   */
+  @Override
+  public boolean start() throws IOException {
+    LOG.info("Starting reader using {}", initialCheckpointGenerator);
+
+    try {
+      KinesisReaderCheckpoint initialCheckpoint =
+          initialCheckpointGenerator.generate(kinesis);
+      List<ShardRecordsIterator> iterators = newArrayList();
+      for (ShardCheckpoint checkpoint : initialCheckpoint) {
+        iterators.add(checkpoint.getShardRecordsIterator(kinesis));
+      }
+      shardIterators = new RoundRobin<>(iterators);
+    } catch (TransientKinesisException e) {
+      throw new IOException(e);
     }
 
-    /**
-     * Moves to the next record in one of the shards.
-     * If current shard iterator can be move forward (i.e. there's a record present) then we do it.
-     * If not, we iterate over shards in a round-robin manner.
-     */
-    @Override
-    public boolean advance() throws IOException {
-        try {
-            for (int i = 0; i < shardIterators.size(); ++i) {
-                currentRecord = shardIterators.getCurrent().next();
-                if (currentRecord.isPresent()) {
-                    return true;
-                } else {
-                    shardIterators.moveForward();
-                }
-            }
-        } catch (TransientKinesisException e) {
-            LOG.warn("Transient exception occurred", e);
+    return advance();
+  }
+
+  /**
+   * Moves to the next record in one of the shards.
+   * If current shard iterator can be moved forward (i.e. there's a record present) then we do it.
+   * If not, we iterate over shards in a round-robin manner.
+   */
+  @Override
+  public boolean advance() throws IOException {
+    try {
+      for (int i = 0; i < shardIterators.size(); ++i) {
+        currentRecord = shardIterators.getCurrent().next();
+        if (currentRecord.isPresent()) {
+          return true;
+        } else {
+          shardIterators.moveForward();
         }
-        return false;
-    }
-
-    @Override
-    public byte[] getCurrentRecordId() throws NoSuchElementException {
-        return currentRecord.get().getUniqueId();
-    }
-
-    @Override
-    public KinesisRecord getCurrent() throws NoSuchElementException {
-        return currentRecord.get();
-    }
-
-    /**
-     * When {@link KinesisReader} was advanced to the current record.
-     * We cannot use approximate arrival timestamp given for each record by Kinesis as it
-     * is not guaranteed to be accurate - this could lead to mark some records as "late"
-     * even if they were not.
-     */
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return currentRecord.get().getReadTime();
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    /**
-     * Current time.
-     * We cannot give better approximation of the watermark with current semantics of
-     * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
-     * {@link KinesisReader#advance()} will be called.
-     */
-    @Override
-    public Instant getWatermark() {
-        return Instant.now();
-    }
-
-    @Override
-    public UnboundedSource.CheckpointMark getCheckpointMark() {
-        return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
-    }
-
-    @Override
-    public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
-        return source;
+      }
+    } catch (TransientKinesisException e) {
+      LOG.warn("Transient exception occurred", e);
     }
+    return false;
+  }
+
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    return currentRecord.get().getUniqueId();
+  }
+
+  @Override
+  public KinesisRecord getCurrent() throws NoSuchElementException {
+    return currentRecord.get();
+  }
+
+  /**
+   * When {@link KinesisReader} was advanced to the current record.
+   * We cannot use approximate arrival timestamp given for each record by Kinesis as it
+   * is not guaranteed to be accurate - this could lead to marking some records as "late"
+   * even if they were not.
+   */
+  @Override
+  public Instant getCurrentTimestamp() throws NoSuchElementException {
+    return currentRecord.get().getReadTime();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  /**
+   * Current time.
+   * We cannot give better approximation of the watermark with current semantics of
+   * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
+   * {@link KinesisReader#advance()} will be called.
+   */
+  @Override
+  public Instant getWatermark() {
+    return Instant.now();
+  }
+
+  @Override
+  public UnboundedSource.CheckpointMark getCheckpointMark() {
+    return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+  }
+
+  @Override
+  public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
+    return source;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index f0fa45d..d995e75 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -23,11 +23,13 @@ import static com.google.common.collect.Lists.partition;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.io.UnboundedSource;
 
 /**
@@ -37,60 +39,61 @@ import org.apache.beam.sdk.io.UnboundedSource;
  * This class is immutable.
  */
 class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource
-        .CheckpointMark, Serializable {
-    private final List<ShardCheckpoint> shardCheckpoints;
+    .CheckpointMark, Serializable {
 
-    public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
-        this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
-    }
+  private final List<ShardCheckpoint> shardCheckpoints;
 
-    public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
-                                                                   iterators) {
-        return new KinesisReaderCheckpoint(transform(iterators,
-                new Function<ShardRecordsIterator, ShardCheckpoint>() {
-
-                    @Nullable
-                    @Override
-                    public ShardCheckpoint apply(@Nullable
-                                                 ShardRecordsIterator shardRecordsIterator) {
-                        assert shardRecordsIterator != null;
-                        return shardRecordsIterator.getCheckpoint();
-                    }
-                }));
-    }
+  public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
+    this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
+  }
 
-    /**
-     * Splits given multi-shard checkpoint into partitions of approximately equal size.
-     *
-     * @param desiredNumSplits - upper limit for number of partitions to generate.
-     * @return list of checkpoints covering consecutive partitions of current checkpoint.
-     */
-    public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
-        int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
-
-        List<KinesisReaderCheckpoint> checkpoints = newArrayList();
-        for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
-            checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
-        }
-        return checkpoints;
-    }
+  public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
+      iterators) {
+    return new KinesisReaderCheckpoint(transform(iterators,
+        new Function<ShardRecordsIterator, ShardCheckpoint>() {
 
-    private int divideAndRoundUp(int nominator, int denominator) {
-        return (nominator + denominator - 1) / denominator;
-    }
+          @Nullable
+          @Override
+          public ShardCheckpoint apply(@Nullable
+              ShardRecordsIterator shardRecordsIterator) {
+            assert shardRecordsIterator != null;
+            return shardRecordsIterator.getCheckpoint();
+          }
+        }));
+  }
 
-    @Override
-    public void finalizeCheckpoint() throws IOException {
+  /**
+   * Splits the given multi-shard checkpoint into partitions of approximately equal size.
+   *
+   * @param desiredNumSplits - upper limit for the number of partitions to generate.
+   * @return list of checkpoints covering consecutive partitions of the current checkpoint.
+   */
+  public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
+    int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
 
+    List<KinesisReaderCheckpoint> checkpoints = newArrayList();
+    for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
+      checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
     }
+    return checkpoints;
+  }
 
-    @Override
-    public String toString() {
-        return shardCheckpoints.toString();
-    }
+  private int divideAndRoundUp(int nominator, int denominator) {
+    return (nominator + denominator - 1) / denominator;
+  }
 
-    @Override
-    public Iterator<ShardCheckpoint> iterator() {
-        return shardCheckpoints.iterator();
-    }
+  @Override
+  public void finalizeCheckpoint() throws IOException {
+
+  }
+
+  @Override
+  public String toString() {
+    return shardCheckpoints.toString();
+  }
+
+  @Override
+  public Iterator<ShardCheckpoint> iterator() {
+    return shardCheckpoints.iterator();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
index 02b5370..057b7bb 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -22,7 +22,9 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import com.google.common.base.Charsets;
+
 import java.nio.ByteBuffer;
+
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.joda.time.Instant;
 
@@ -30,91 +32,92 @@ import org.joda.time.Instant;
  * {@link UserRecord} enhanced with utility methods.
  */
 public class KinesisRecord {
-    private Instant readTime;
-    private String streamName;
-    private String shardId;
-    private long subSequenceNumber;
-    private String sequenceNumber;
-    private Instant approximateArrivalTimestamp;
-    private ByteBuffer data;
-    private String partitionKey;
-
-    public KinesisRecord(UserRecord record, String streamName, String shardId) {
-        this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
-                record.getPartitionKey(),
-                new Instant(record.getApproximateArrivalTimestamp()),
-                Instant.now(),
-                streamName, shardId);
-    }
-
-    public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
-                         String partitionKey, Instant approximateArrivalTimestamp,
-                         Instant readTime,
-                         String streamName, String shardId) {
-        this.data = data;
-        this.sequenceNumber = sequenceNumber;
-        this.subSequenceNumber = subSequenceNumber;
-        this.partitionKey = partitionKey;
-        this.approximateArrivalTimestamp = approximateArrivalTimestamp;
-        this.readTime = readTime;
-        this.streamName = streamName;
-        this.shardId = shardId;
-    }
-
-    public ExtendedSequenceNumber getExtendedSequenceNumber() {
-        return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
-    }
-
-    /***
-     * @return unique id of the record based on its position in the stream
-     */
-    public byte[] getUniqueId() {
-        return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
-    }
-
-    public Instant getReadTime() {
-        return readTime;
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public String getShardId() {
-        return shardId;
-    }
-
-    public byte[] getDataAsBytes() {
-        return getData().array();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        return EqualsBuilder.reflectionEquals(this, obj);
-    }
-
-    @Override
-    public int hashCode() {
-        return reflectionHashCode(this);
-    }
-
-    public long getSubSequenceNumber() {
-        return subSequenceNumber;
-    }
-
-    public String getSequenceNumber() {
-        return sequenceNumber;
-    }
-
-    public Instant getApproximateArrivalTimestamp() {
-        return approximateArrivalTimestamp;
-    }
-
-    public ByteBuffer getData() {
-        return data;
-    }
-
-    public String getPartitionKey() {
-        return partitionKey;
-    }
+
+  private Instant readTime;
+  private String streamName;
+  private String shardId;
+  private long subSequenceNumber;
+  private String sequenceNumber;
+  private Instant approximateArrivalTimestamp;
+  private ByteBuffer data;
+  private String partitionKey;
+
+  public KinesisRecord(UserRecord record, String streamName, String shardId) {
+    this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
+        record.getPartitionKey(),
+        new Instant(record.getApproximateArrivalTimestamp()),
+        Instant.now(),
+        streamName, shardId);
+  }
+
+  public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
+      String partitionKey, Instant approximateArrivalTimestamp,
+      Instant readTime,
+      String streamName, String shardId) {
+    this.data = data;
+    this.sequenceNumber = sequenceNumber;
+    this.subSequenceNumber = subSequenceNumber;
+    this.partitionKey = partitionKey;
+    this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+    this.readTime = readTime;
+    this.streamName = streamName;
+    this.shardId = shardId;
+  }
+
+  public ExtendedSequenceNumber getExtendedSequenceNumber() {
+    return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
+  }
+
+  /**
+   * @return unique id of the record based on its position in the stream
+   */
+  public byte[] getUniqueId() {
+    return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
+  }
+
+  public Instant getReadTime() {
+    return readTime;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getShardId() {
+    return shardId;
+  }
+
+  public byte[] getDataAsBytes() {
+    return getData().array();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return EqualsBuilder.reflectionEquals(this, obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return reflectionHashCode(this);
+  }
+
+  public long getSubSequenceNumber() {
+    return subSequenceNumber;
+  }
+
+  public String getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  public Instant getApproximateArrivalTimestamp() {
+    return approximateArrivalTimestamp;
+  }
+
+  public ByteBuffer getData() {
+    return data;
+  }
+
+  public String getPartitionKey() {
+    return partitionKey;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index f233e27..dcf564d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,40 +34,41 @@ import org.joda.time.Instant;
  * A {@link Coder} for {@link KinesisRecord}.
  */
 class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
-    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
-    private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
-    private static final InstantCoder INSTANT_CODER = InstantCoder.of();
-    private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
 
-    public static KinesisRecordCoder of() {
-        return new KinesisRecordCoder();
-    }
+  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+  private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+  private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+  private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
+
+  public static KinesisRecordCoder of() {
+    return new KinesisRecordCoder();
+  }
 
-    @Override
-    public void encode(KinesisRecord value, OutputStream outStream) throws
-            IOException {
-        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
-        STRING_CODER.encode(value.getSequenceNumber(), outStream);
-        STRING_CODER.encode(value.getPartitionKey(), outStream);
-        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
-        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
-        INSTANT_CODER.encode(value.getReadTime(), outStream);
-        STRING_CODER.encode(value.getStreamName(), outStream);
-        STRING_CODER.encode(value.getShardId(), outStream);
-    }
+  @Override
+  public void encode(KinesisRecord value, OutputStream outStream) throws
+      IOException {
+    BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+    STRING_CODER.encode(value.getSequenceNumber(), outStream);
+    STRING_CODER.encode(value.getPartitionKey(), outStream);
+    INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+    VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+    INSTANT_CODER.encode(value.getReadTime(), outStream);
+    STRING_CODER.encode(value.getStreamName(), outStream);
+    STRING_CODER.encode(value.getShardId(), outStream);
+  }
 
-    @Override
-    public KinesisRecord decode(InputStream inStream) throws IOException {
-        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
-        String sequenceNumber = STRING_CODER.decode(inStream);
-        String partitionKey = STRING_CODER.decode(inStream);
-        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
-        long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
-        Instant readTimestamp = INSTANT_CODER.decode(inStream);
-        String streamName = STRING_CODER.decode(inStream);
-        String shardId = STRING_CODER.decode(inStream);
-        return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
-                approximateArrivalTimestamp, readTimestamp, streamName, shardId
-        );
-    }
+  @Override
+  public KinesisRecord decode(InputStream inStream) throws IOException {
+    ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+    String sequenceNumber = STRING_CODER.decode(inStream);
+    String partitionKey = STRING_CODER.decode(inStream);
+    Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+    long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+    Instant readTimestamp = INSTANT_CODER.decode(inStream);
+    String streamName = STRING_CODER.decode(inStream);
+    String shardId = STRING_CODER.decode(inStream);
+    return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
+        approximateArrivalTimestamp, readTimestamp, streamName, shardId
+    );
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 7e67d07..362792b 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 
 import java.util.List;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -28,85 +29,85 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Represents source for single stream in Kinesis.
  */
 class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
-    private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
-
-    private final KinesisClientProvider kinesis;
-    private CheckpointGenerator initialCheckpointGenerator;
 
-    public KinesisSource(KinesisClientProvider kinesis, String streamName,
-                         StartingPoint startingPoint) {
-        this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
+
+  private final KinesisClientProvider kinesis;
+  private CheckpointGenerator initialCheckpointGenerator;
+
+  public KinesisSource(KinesisClientProvider kinesis, String streamName,
+      StartingPoint startingPoint) {
+    this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+  }
+
+  private KinesisSource(KinesisClientProvider kinesisClientProvider,
+      CheckpointGenerator initialCheckpoint) {
+    this.kinesis = kinesisClientProvider;
+    this.initialCheckpointGenerator = initialCheckpoint;
+    validate();
+  }
+
+  /**
+   * Generates splits for reading from the stream.
+   * Basically, it'll try to evenly split the set of shards in the stream into
+   * {@code desiredNumSplits} partitions. Each partition is then a split.
+   */
+  @Override
+  public List<KinesisSource> split(int desiredNumSplits,
+      PipelineOptions options) throws Exception {
+    KinesisReaderCheckpoint checkpoint =
+        initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
+
+    List<KinesisSource> sources = newArrayList();
+
+    for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
+      sources.add(new KinesisSource(
+          kinesis,
+          new StaticCheckpointGenerator(partition)));
     }
-
-    private KinesisSource(KinesisClientProvider kinesisClientProvider,
-                          CheckpointGenerator initialCheckpoint) {
-        this.kinesis = kinesisClientProvider;
-        this.initialCheckpointGenerator = initialCheckpoint;
-        validate();
+    return sources;
+  }
+
+  /**
+   * Creates a reader based on the given {@link KinesisReaderCheckpoint}.
+   * If {@link KinesisReaderCheckpoint} is not given, then we use
+   * {@code initialCheckpointGenerator} to generate a new checkpoint.
+   */
+  @Override
+  public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
+      KinesisReaderCheckpoint checkpointMark) {
+
+    CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
+
+    if (checkpointMark != null) {
+      checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
     }
 
-    /**
-     * Generate splits for reading from the stream.
-     * Basically, it'll try to evenly split set of shards in the stream into
-     * {@code desiredNumSplits} partitions. Each partition is then a split.
-     */
-    @Override
-    public List<KinesisSource> split(int desiredNumSplits,
-                                                     PipelineOptions options) throws Exception {
-        KinesisReaderCheckpoint checkpoint =
-                initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
-
-        List<KinesisSource> sources = newArrayList();
-
-        for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
-            sources.add(new KinesisSource(
-                    kinesis,
-                    new StaticCheckpointGenerator(partition)));
-        }
-        return sources;
-    }
-
-    /**
-     * Creates reader based on given {@link KinesisReaderCheckpoint}.
-     * If {@link KinesisReaderCheckpoint} is not given, then we use
-     * {@code initialCheckpointGenerator} to generate new checkpoint.
-     */
-    @Override
-    public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
-                                                KinesisReaderCheckpoint checkpointMark) {
-
-        CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
-
-        if (checkpointMark != null) {
-            checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
-        }
-
-        LOG.info("Creating new reader using {}", checkpointGenerator);
-
-        return new KinesisReader(
-                SimplifiedKinesisClient.from(kinesis),
-                checkpointGenerator,
-                this);
-    }
-
-    @Override
-    public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
-        return SerializableCoder.of(KinesisReaderCheckpoint.class);
-    }
-
-    @Override
-    public void validate() {
-        checkNotNull(kinesis);
-        checkNotNull(initialCheckpointGenerator);
-    }
-
-    @Override
-    public Coder<KinesisRecord> getDefaultOutputCoder() {
-        return KinesisRecordCoder.of();
-    }
+    LOG.info("Creating new reader using {}", checkpointGenerator);
+
+    return new KinesisReader(
+        SimplifiedKinesisClient.from(kinesis),
+        checkpointGenerator,
+        this);
+  }
+
+  @Override
+  public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
+    return SerializableCoder.of(KinesisReaderCheckpoint.class);
+  }
+
+  @Override
+  public void validate() {
+    checkNotNull(kinesis);
+    checkNotNull(initialCheckpointGenerator);
+  }
+
+  @Override
+  public Coder<KinesisRecord> getDefaultOutputCoder() {
+    return KinesisRecordCoder.of();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
index 40e65fc..eca725c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
@@ -21,7 +21,6 @@ import static com.google.common.collect.Lists.newArrayList;
 
 import java.util.List;
 
-
 /**
  * Filters out records, which were already processed and checkpointed.
  *
@@ -29,13 +28,14 @@ import java.util.List;
  * accuracy, not with "subSequenceNumber" accuracy.
  */
 class RecordFilter {
-    public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
-        List<KinesisRecord> filteredRecords = newArrayList();
-        for (KinesisRecord record : records) {
-            if (checkpoint.isBeforeOrAt(record)) {
-                filteredRecords.add(record);
-            }
-        }
-        return filteredRecords;
+
+  public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
+    List<KinesisRecord> filteredRecords = newArrayList();
+    for (KinesisRecord record : records) {
+      if (checkpoint.isBeforeOrAt(record)) {
+        filteredRecords.add(record);
+      }
     }
+    return filteredRecords;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
index e4ff541..806d982 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -27,27 +27,28 @@ import java.util.Iterator;
  * Very simple implementation of round robin algorithm.
  */
 class RoundRobin<T> implements Iterable<T> {
-    private final Deque<T> deque;
 
-    public RoundRobin(Iterable<T> collection) {
-        this.deque = newArrayDeque(collection);
-        checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
-    }
+  private final Deque<T> deque;
 
-    public T getCurrent() {
-        return deque.getFirst();
-    }
+  public RoundRobin(Iterable<T> collection) {
+    this.deque = newArrayDeque(collection);
+    checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
+  }
 
-    public void moveForward() {
-        deque.addLast(deque.removeFirst());
-    }
+  public T getCurrent() {
+    return deque.getFirst();
+  }
 
-    public int size() {
-        return deque.size();
-    }
+  public void moveForward() {
+    deque.addLast(deque.removeFirst());
+  }
 
-    @Override
-    public Iterator<T> iterator() {
-        return deque.iterator();
-    }
+  public int size() {
+    return deque.size();
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return deque.iterator();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 6aa3504..95f97b8 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
@@ -27,9 +26,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import java.io.Serializable;
-import org.joda.time.Instant;
 
+import org.joda.time.Instant;
 
 /**
  * Checkpoint mark for single shard in the stream.
@@ -45,131 +45,132 @@ import org.joda.time.Instant;
  * This class is immutable.
  */
 class ShardCheckpoint implements Serializable {
-    private final String streamName;
-    private final String shardId;
-    private final String sequenceNumber;
-    private final ShardIteratorType shardIteratorType;
-    private final Long subSequenceNumber;
-    private final Instant timestamp;
-
-    public ShardCheckpoint(String streamName, String shardId, StartingPoint
-            startingPoint) {
-        this(streamName, shardId,
-                ShardIteratorType.fromValue(startingPoint.getPositionName()),
-                startingPoint.getTimestamp());
-    }
-
-    public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
-            shardIteratorType, Instant timestamp) {
-        this(streamName, shardId, shardIteratorType, null, null, timestamp);
-    }
-
-    public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
-            shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
-        this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
-    }
-
-    private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
-                            String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
-        this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
-        this.streamName = checkNotNull(streamName, "streamName");
-        this.shardId = checkNotNull(shardId, "shardId");
-        if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
-            checkNotNull(sequenceNumber,
-                    "You must provide sequence number for AT_SEQUENCE_NUMBER"
-                            + " or AFTER_SEQUENCE_NUMBER");
-        } else {
-            checkArgument(sequenceNumber == null,
-                    "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
-        }
-        if (shardIteratorType == AT_TIMESTAMP) {
-            checkNotNull(timestamp,
-                    "You must provide timestamp for AT_SEQUENCE_NUMBER"
-                            + " or AFTER_SEQUENCE_NUMBER");
-        } else {
-            checkArgument(timestamp == null,
-                    "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
-        }
-
-        this.subSequenceNumber = subSequenceNumber;
-        this.sequenceNumber = sequenceNumber;
-        this.timestamp = timestamp;
-    }
-
-    /**
-     * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
-     * on the the underlying shardIteratorType, it will either compare the timestamp or the
-     * {@link ExtendedSequenceNumber}.
-     *
-     * @param other
-     * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
-     */
-    public boolean isBeforeOrAt(KinesisRecord other) {
-        if (shardIteratorType == AT_TIMESTAMP) {
-            return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
-        }
-        int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
-        if (result == 0) {
-            return shardIteratorType == AT_SEQUENCE_NUMBER;
-        }
-        return result < 0;
-    }
-
-    private ExtendedSequenceNumber extendedSequenceNumber() {
-        String fullSequenceNumber = sequenceNumber;
-        if (fullSequenceNumber == null) {
-            fullSequenceNumber = shardIteratorType.toString();
-        }
-        return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
-    }
 
-    @Override
-    public String toString() {
-        return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
-                streamName, shardId,
-                sequenceNumber);
+  private final String streamName;
+  private final String shardId;
+  private final String sequenceNumber;
+  private final ShardIteratorType shardIteratorType;
+  private final Long subSequenceNumber;
+  private final Instant timestamp;
+
+  public ShardCheckpoint(String streamName, String shardId, StartingPoint
+      startingPoint) {
+    this(streamName, shardId,
+        ShardIteratorType.fromValue(startingPoint.getPositionName()),
+        startingPoint.getTimestamp());
+  }
+
+  public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+      shardIteratorType, Instant timestamp) {
+    this(streamName, shardId, shardIteratorType, null, null, timestamp);
+  }
+
+  public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+      shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
+    this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
+  }
+
+  private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
+      String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
+    this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
+    this.streamName = checkNotNull(streamName, "streamName");
+    this.shardId = checkNotNull(shardId, "shardId");
+    if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
+      checkNotNull(sequenceNumber,
+          "You must provide sequence number for AT_SEQUENCE_NUMBER"
+              + " or AFTER_SEQUENCE_NUMBER");
+    } else {
+      checkArgument(sequenceNumber == null,
+          "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
     }
-
-    public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
-            throws TransientKinesisException {
-        return new ShardRecordsIterator(this, kinesis);
+    if (shardIteratorType == AT_TIMESTAMP) {
+      checkNotNull(timestamp,
+          "You must provide timestamp for AT_SEQUENCE_NUMBER"
+              + " or AFTER_SEQUENCE_NUMBER");
+    } else {
+      checkArgument(timestamp == null,
+          "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
     }
 
-    public String getShardIterator(SimplifiedKinesisClient kinesisClient)
-            throws TransientKinesisException {
-        if (checkpointIsInTheMiddleOfAUserRecord()) {
-            return kinesisClient.getShardIterator(streamName,
-                    shardId, AT_SEQUENCE_NUMBER,
-                    sequenceNumber, null);
-        }
-        return kinesisClient.getShardIterator(streamName,
-                shardId, shardIteratorType,
-                sequenceNumber, timestamp);
+    this.subSequenceNumber = subSequenceNumber;
+    this.sequenceNumber = sequenceNumber;
+    this.timestamp = timestamp;
+  }
+
+  /**
+   * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
+   * on the underlying shardIteratorType, it will either compare the timestamp or the
+   * {@link ExtendedSequenceNumber}.
+   *
+   * @param other
+   * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
+   */
+  public boolean isBeforeOrAt(KinesisRecord other) {
+    if (shardIteratorType == AT_TIMESTAMP) {
+      return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
     }
-
-    private boolean checkpointIsInTheMiddleOfAUserRecord() {
-        return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+    int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
+    if (result == 0) {
+      return shardIteratorType == AT_SEQUENCE_NUMBER;
     }
+    return result < 0;
+  }
 
-    /**
-     * Used to advance checkpoint mark to position after given {@link Record}.
-     *
-     * @param record
-     * @return new checkpoint object pointing directly after given {@link Record}
-     */
-    public ShardCheckpoint moveAfter(KinesisRecord record) {
-        return new ShardCheckpoint(
-                streamName, shardId,
-                AFTER_SEQUENCE_NUMBER,
-                record.getSequenceNumber(),
-                record.getSubSequenceNumber());
+  private ExtendedSequenceNumber extendedSequenceNumber() {
+    String fullSequenceNumber = sequenceNumber;
+    if (fullSequenceNumber == null) {
+      fullSequenceNumber = shardIteratorType.toString();
     }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public String getShardId() {
-        return shardId;
+    return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
+        streamName, shardId,
+        sequenceNumber);
+  }
+
+  public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
+      throws TransientKinesisException {
+    return new ShardRecordsIterator(this, kinesis);
+  }
+
+  public String getShardIterator(SimplifiedKinesisClient kinesisClient)
+      throws TransientKinesisException {
+    if (checkpointIsInTheMiddleOfAUserRecord()) {
+      return kinesisClient.getShardIterator(streamName,
+          shardId, AT_SEQUENCE_NUMBER,
+          sequenceNumber, null);
     }
+    return kinesisClient.getShardIterator(streamName,
+        shardId, shardIteratorType,
+        sequenceNumber, timestamp);
+  }
+
+  private boolean checkpointIsInTheMiddleOfAUserRecord() {
+    return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+  }
+
+  /**
+   * Used to advance checkpoint mark to position after given {@link Record}.
+   *
+   * @param record
+   * @return new checkpoint object pointing directly after given {@link Record}
+   */
+  public ShardCheckpoint moveAfter(KinesisRecord record) {
+    return new ShardCheckpoint(
+        streamName, shardId,
+        AFTER_SEQUENCE_NUMBER,
+        record.getSequenceNumber(),
+        record.getSubSequenceNumber());
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getShardId() {
+    return shardId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index 872f604..a69c6c1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -21,7 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Queues.newArrayDeque;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+
 import java.util.Deque;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,68 +33,68 @@ import org.slf4j.LoggerFactory;
  * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
  */
 class ShardRecordsIterator {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
 
-    private final SimplifiedKinesisClient kinesis;
-    private final RecordFilter filter;
-    private ShardCheckpoint checkpoint;
-    private String shardIterator;
-    private Deque<KinesisRecord> data = newArrayDeque();
+  private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
 
-    public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
-                                SimplifiedKinesisClient simplifiedKinesisClient) throws
-            TransientKinesisException {
-        this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
-    }
+  private final SimplifiedKinesisClient kinesis;
+  private final RecordFilter filter;
+  private ShardCheckpoint checkpoint;
+  private String shardIterator;
+  private Deque<KinesisRecord> data = newArrayDeque();
 
-    public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
-                                SimplifiedKinesisClient simplifiedKinesisClient,
-                                RecordFilter filter) throws
-            TransientKinesisException {
+  public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+      SimplifiedKinesisClient simplifiedKinesisClient) throws
+      TransientKinesisException {
+    this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
+  }
 
-        this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
-        this.filter = checkNotNull(filter, "filter");
-        this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
-        shardIterator = checkpoint.getShardIterator(kinesis);
-    }
+  public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+      SimplifiedKinesisClient simplifiedKinesisClient,
+      RecordFilter filter) throws
+      TransientKinesisException {
 
-    /**
-     * Returns record if there's any present.
-     * Returns absent() if there are no new records at this time in the shard.
-     */
-    public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
-        readMoreIfNecessary();
+    this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+    this.filter = checkNotNull(filter, "filter");
+    this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
+    shardIterator = checkpoint.getShardIterator(kinesis);
+  }
 
-        if (data.isEmpty()) {
-            return CustomOptional.absent();
-        } else {
-            KinesisRecord record = data.removeFirst();
-            checkpoint = checkpoint.moveAfter(record);
-            return CustomOptional.of(record);
-        }
-    }
+  /**
+   * Returns record if there's any present.
+   * Returns absent() if there are no new records at this time in the shard.
+   */
+  public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
+    readMoreIfNecessary();
 
-    private void readMoreIfNecessary() throws TransientKinesisException {
-        if (data.isEmpty()) {
-            GetKinesisRecordsResult response;
-            try {
-                response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
-                        checkpoint.getShardId());
-            } catch (ExpiredIteratorException e) {
-                LOG.info("Refreshing expired iterator", e);
-                shardIterator = checkpoint.getShardIterator(kinesis);
-                response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
-                        checkpoint.getShardId());
-            }
-            LOG.debug("Fetched {} new records", response.getRecords().size());
-            shardIterator = response.getNextShardIterator();
-            data.addAll(filter.apply(response.getRecords(), checkpoint));
-        }
+    if (data.isEmpty()) {
+      return CustomOptional.absent();
+    } else {
+      KinesisRecord record = data.removeFirst();
+      checkpoint = checkpoint.moveAfter(record);
+      return CustomOptional.of(record);
     }
+  }
 
-    public ShardCheckpoint getCheckpoint() {
-        return checkpoint;
+  private void readMoreIfNecessary() throws TransientKinesisException {
+    if (data.isEmpty()) {
+      GetKinesisRecordsResult response;
+      try {
+        response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+            checkpoint.getShardId());
+      } catch (ExpiredIteratorException e) {
+        LOG.info("Refreshing expired iterator", e);
+        shardIterator = checkpoint.getShardIterator(kinesis);
+        response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+            checkpoint.getShardId());
+      }
+      LOG.debug("Fetched {} new records", response.getRecords().size());
+      shardIterator = response.getNextShardIterator();
+      data.addAll(filter.apply(response.getRecords(), checkpoint));
     }
+  }
 
+  public ShardCheckpoint getCheckpoint() {
+    return checkpoint;
+  }
 
 }


[2/4] beam git commit: Reformatting Kinesis IO to comply with official code style

Posted by jb...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index 3e3984a..80c950f 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@@ -31,9 +30,11 @@ import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.google.common.collect.Lists;
+
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Callable;
+
 import org.joda.time.Instant;
 
 /**
@@ -41,117 +42,121 @@ import org.joda.time.Instant;
  * proper error handling.
  */
 class SimplifiedKinesisClient {
-    private final AmazonKinesis kinesis;
 
-    public SimplifiedKinesisClient(AmazonKinesis kinesis) {
-        this.kinesis = kinesis;
-    }
+  private final AmazonKinesis kinesis;
 
-    public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
-        return new SimplifiedKinesisClient(provider.get());
-    }
+  public SimplifiedKinesisClient(AmazonKinesis kinesis) {
+    this.kinesis = kinesis;
+  }
 
-    public String getShardIterator(final String streamName, final String shardId,
-                                   final ShardIteratorType shardIteratorType,
-                                   final String startingSequenceNumber, final Instant timestamp)
-            throws TransientKinesisException {
-        final Date date = timestamp != null ? timestamp.toDate() : null;
-        return wrapExceptions(new Callable<String>() {
-            @Override
-            public String call() throws Exception {
-                return kinesis.getShardIterator(new GetShardIteratorRequest()
-                        .withStreamName(streamName)
-                        .withShardId(shardId)
-                        .withShardIteratorType(shardIteratorType)
-                        .withStartingSequenceNumber(startingSequenceNumber)
-                        .withTimestamp(date)
-                ).getShardIterator();
-            }
-        });
-    }
+  public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
+    return new SimplifiedKinesisClient(provider.get());
+  }
 
-    public List<Shard> listShards(final String streamName) throws TransientKinesisException {
-        return wrapExceptions(new Callable<List<Shard>>() {
-            @Override
-            public List<Shard> call() throws Exception {
-                List<Shard> shards = Lists.newArrayList();
-                String lastShardId = null;
-
-                StreamDescription description;
-                do {
-                    description = kinesis.describeStream(streamName, lastShardId)
-                            .getStreamDescription();
-
-                    shards.addAll(description.getShards());
-                    lastShardId = shards.get(shards.size() - 1).getShardId();
-                } while (description.getHasMoreShards());
-
-                return shards;
-            }
-        });
-    }
+  public String getShardIterator(final String streamName, final String shardId,
+      final ShardIteratorType shardIteratorType,
+      final String startingSequenceNumber, final Instant timestamp)
+      throws TransientKinesisException {
+    final Date date = timestamp != null ? timestamp.toDate() : null;
+    return wrapExceptions(new Callable<String>() {
 
-    /**
-     * Gets records from Kinesis and deaggregates them if needed.
-     *
-     * @return list of deaggregated records
-     * @throws TransientKinesisException - in case of recoverable situation
-     */
-    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
-                                              String shardId) throws TransientKinesisException {
-        return getRecords(shardIterator, streamName, shardId, null);
-    }
+      @Override
+      public String call() throws Exception {
+        return kinesis.getShardIterator(new GetShardIteratorRequest()
+            .withStreamName(streamName)
+            .withShardId(shardId)
+            .withShardIteratorType(shardIteratorType)
+            .withStartingSequenceNumber(startingSequenceNumber)
+            .withTimestamp(date)
+        ).getShardIterator();
+      }
+    });
+  }
 
-    /**
-     * Gets records from Kinesis and deaggregates them if needed.
-     *
-     * @return list of deaggregated records
-     * @throws TransientKinesisException - in case of recoverable situation
-     */
-    public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
-                                              final String shardId, final Integer limit)
-            throws
-            TransientKinesisException {
-        return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
-            @Override
-            public GetKinesisRecordsResult call() throws Exception {
-                GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
-                        .withShardIterator(shardIterator)
-                        .withLimit(limit));
-                return new GetKinesisRecordsResult(
-                        UserRecord.deaggregate(response.getRecords()),
-                        response.getNextShardIterator(),
-                        streamName, shardId);
-            }
-        });
-    }
+  public List<Shard> listShards(final String streamName) throws TransientKinesisException {
+    return wrapExceptions(new Callable<List<Shard>>() {
+
+      @Override
+      public List<Shard> call() throws Exception {
+        List<Shard> shards = Lists.newArrayList();
+        String lastShardId = null;
+
+        StreamDescription description;
+        do {
+          description = kinesis.describeStream(streamName, lastShardId)
+              .getStreamDescription();
+
+          shards.addAll(description.getShards());
+          lastShardId = shards.get(shards.size() - 1).getShardId();
+        } while (description.getHasMoreShards());
+
+        return shards;
+      }
+    });
+  }
+
+  /**
+   * Gets records from Kinesis and deaggregates them if needed.
+   *
+   * @return list of deaggregated records
+   * @throws TransientKinesisException - in case of recoverable situation
+   */
+  public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
+      String shardId) throws TransientKinesisException {
+    return getRecords(shardIterator, streamName, shardId, null);
+  }
+
+  /**
+   * Gets records from Kinesis and deaggregates them if needed.
+   *
+   * @return list of deaggregated records
+   * @throws TransientKinesisException - in case of recoverable situation
+   */
+  public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
+      final String shardId, final Integer limit)
+      throws
+      TransientKinesisException {
+    return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+
+      @Override
+      public GetKinesisRecordsResult call() throws Exception {
+        GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
+            .withShardIterator(shardIterator)
+            .withLimit(limit));
+        return new GetKinesisRecordsResult(
+            UserRecord.deaggregate(response.getRecords()),
+            response.getNextShardIterator(),
+            streamName, shardId);
+      }
+    });
+  }
 
-    /**
-     * Wraps Amazon specific exceptions into more friendly format.
-     *
-     * @throws TransientKinesisException              - in case of recoverable situation, i.e.
-     *                                  the request rate is too high, Kinesis remote service
-     *                                  failed, network issue, etc.
-     * @throws ExpiredIteratorException - if iterator needs to be refreshed
-     * @throws RuntimeException         - in all other cases
-     */
-    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
-        try {
-            return callable.call();
-        } catch (ExpiredIteratorException e) {
-            throw e;
-        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
-            throw new TransientKinesisException(
-                    "Too many requests to Kinesis. Wait some time and retry.", e);
-        } catch (AmazonServiceException e) {
-            if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
-                throw new TransientKinesisException(
-                        "Kinesis backend failed. Wait some time and retry.", e);
-            }
-            throw new RuntimeException("Kinesis client side failure", e);
-        } catch (Exception e) {
-            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
-        }
+  /**
+   * Wraps Amazon specific exceptions into more friendly format.
+   *
+   * @throws TransientKinesisException              - in case of recoverable situation, i.e.
+   *                                  the request rate is too high, Kinesis remote service
+   *                                  failed, network issue, etc.
+   * @throws ExpiredIteratorException - if iterator needs to be refreshed
+   * @throws RuntimeException         - in all other cases
+   */
+  private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
+    try {
+      return callable.call();
+    } catch (ExpiredIteratorException e) {
+      throw e;
+    } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
+      throw new TransientKinesisException(
+          "Too many requests to Kinesis. Wait some time and retry.", e);
+    } catch (AmazonServiceException e) {
+      if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
+        throw new TransientKinesisException(
+            "Kinesis backend failed. Wait some time and retry.", e);
+      }
+      throw new RuntimeException("Kinesis client side failure", e);
+    } catch (Exception e) {
+      throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
index d8842c4..f9298fa 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import java.io.Serializable;
 import java.util.Objects;
+
 import org.joda.time.Instant;
 
 /**
@@ -32,54 +33,55 @@ import org.joda.time.Instant;
  * in which case the reader will start reading at the specified point in time.
  */
 class StartingPoint implements Serializable {
-    private final InitialPositionInStream position;
-    private final Instant timestamp;
 
-    public StartingPoint(InitialPositionInStream position) {
-        this.position = checkNotNull(position, "position");
-        this.timestamp = null;
-    }
+  private final InitialPositionInStream position;
+  private final Instant timestamp;
 
-    public StartingPoint(Instant timestamp) {
-        this.timestamp = checkNotNull(timestamp, "timestamp");
-        this.position = null;
-    }
+  public StartingPoint(InitialPositionInStream position) {
+    this.position = checkNotNull(position, "position");
+    this.timestamp = null;
+  }
 
-    public InitialPositionInStream getPosition() {
-        return position;
-    }
+  public StartingPoint(Instant timestamp) {
+    this.timestamp = checkNotNull(timestamp, "timestamp");
+    this.position = null;
+  }
 
-    public String getPositionName() {
-        return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
-    }
+  public InitialPositionInStream getPosition() {
+    return position;
+  }
 
-    public Instant getTimestamp() {
-        return timestamp != null ? timestamp : null;
-    }
+  public String getPositionName() {
+    return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
+  }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        StartingPoint that = (StartingPoint) o;
-        return position == that.position && Objects.equals(timestamp, that.timestamp);
-    }
+  public Instant getTimestamp() {
+    return timestamp != null ? timestamp : null;
+  }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(position, timestamp);
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
     }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StartingPoint that = (StartingPoint) o;
+    return position == that.position && Objects.equals(timestamp, that.timestamp);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(position, timestamp);
+  }
 
-    @Override
-    public String toString() {
-        if (timestamp == null) {
-            return position.toString();
-        } else {
-            return "Starting at timestamp " + timestamp;
-        }
+  @Override
+  public String toString() {
+    if (timestamp == null) {
+      return position.toString();
+    } else {
+      return "Starting at timestamp " + timestamp;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
index 22dc973..1ec865d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
@@ -23,20 +23,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * Always returns the same instance of checkpoint.
  */
 class StaticCheckpointGenerator implements CheckpointGenerator {
-    private final KinesisReaderCheckpoint checkpoint;
 
-    public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
-        checkNotNull(checkpoint, "checkpoint");
-        this.checkpoint = checkpoint;
-    }
+  private final KinesisReaderCheckpoint checkpoint;
 
-    @Override
-    public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
-        return checkpoint;
-    }
+  public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
+    checkNotNull(checkpoint, "checkpoint");
+    this.checkpoint = checkpoint;
+  }
 
-    @Override
-    public String toString() {
-        return checkpoint.toString();
-    }
+  @Override
+  public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
+    return checkpoint;
+  }
+
+  @Override
+  public String toString() {
+    return checkpoint.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
index 57ad8a8..68ca0d7 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -23,7 +23,8 @@ import com.amazonaws.AmazonServiceException;
  * A transient exception thrown by Kinesis.
  */
 class TransientKinesisException extends Exception {
-    public TransientKinesisException(String s, AmazonServiceException e) {
-        super(s, e);
-    }
+
+  public TransientKinesisException(String s, AmazonServiceException e) {
+    super(s, e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 046c9d9..994d6e3 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -66,10 +66,12 @@ import com.amazonaws.services.kinesis.model.SplitShardRequest;
 import com.amazonaws.services.kinesis.model.SplitShardResult;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.google.common.base.Function;
+
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import javax.annotation.Nullable;
+
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.joda.time.Instant;
 
@@ -78,298 +80,301 @@ import org.joda.time.Instant;
  */
 class AmazonKinesisMock implements AmazonKinesis {
 
-    static class TestData implements Serializable {
-        private final String data;
-        private final Instant arrivalTimestamp;
-        private final String sequenceNumber;
-
-        public TestData(KinesisRecord record) {
-            this(new String(record.getData().array()),
-                    record.getApproximateArrivalTimestamp(),
-                    record.getSequenceNumber());
-        }
-
-        public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
-            this.data = data;
-            this.arrivalTimestamp = arrivalTimestamp;
-            this.sequenceNumber = sequenceNumber;
-        }
-
-        public Record convertToRecord() {
-            return new Record().
-                    withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
-                    withData(ByteBuffer.wrap(data.getBytes())).
-                    withSequenceNumber(sequenceNumber).
-                    withPartitionKey("");
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            return EqualsBuilder.reflectionEquals(this, obj);
-        }
-
-        @Override
-        public int hashCode() {
-            return reflectionHashCode(this);
-        }
-    }
-
-    static class Provider implements KinesisClientProvider {
-
-        private final List<List<TestData>> shardedData;
-        private final int numberOfRecordsPerGet;
-
-        public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
-            this.shardedData = shardedData;
-            this.numberOfRecordsPerGet = numberOfRecordsPerGet;
-        }
-
-        @Override
-        public AmazonKinesis get() {
-            return new AmazonKinesisMock(transform(shardedData,
-                    new Function<List<TestData>, List<Record>>() {
-                        @Override
-                        public List<Record> apply(@Nullable List<TestData> testDatas) {
-                            return transform(testDatas, new Function<TestData, Record>() {
-                                @Override
-                                public Record apply(@Nullable TestData testData) {
-                                    return testData.convertToRecord();
-                                }
-                            });
-                        }
-                    }), numberOfRecordsPerGet);
-        }
-    }
-
-    private final List<List<Record>> shardedData;
-    private final int numberOfRecordsPerGet;
-
-    public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
-        this.shardedData = shardedData;
-        this.numberOfRecordsPerGet = numberOfRecordsPerGet;
-    }
-
-    @Override
-    public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
-        String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
-        int shardId = parseInt(shardIteratorParts[0]);
-        int startingRecord = parseInt(shardIteratorParts[1]);
-        List<Record> shardData = shardedData.get(shardId);
-
-        int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
-        int fromIndex = min(startingRecord, toIndex);
-        return new GetRecordsResult().
-                withRecords(shardData.subList(fromIndex, toIndex)).
-                withNextShardIterator(String.format("%s:%s", shardId, toIndex));
-    }
-
-    @Override
-    public GetShardIteratorResult getShardIterator(
-            GetShardIteratorRequest getShardIteratorRequest) {
-        ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
-                getShardIteratorRequest.getShardIteratorType());
-
-        String shardIterator;
-        if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
-            shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
-        } else {
-            throw new RuntimeException("Not implemented");
-        }
-
-        return new GetShardIteratorResult().withShardIterator(shardIterator);
-    }
-
-    @Override
-    public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
-        int nextShardId = 0;
-        if (exclusiveStartShardId != null) {
-            nextShardId = parseInt(exclusiveStartShardId) + 1;
-        }
-        boolean hasMoreShards = nextShardId + 1 < shardedData.size();
-
-        List<Shard> shards = newArrayList();
-        if (nextShardId < shardedData.size()) {
-            shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
-        }
-
-        return new DescribeStreamResult().withStreamDescription(
-                new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
-        );
-    }
-
-    @Override
-    public void setEndpoint(String endpoint) {
-
-    }
-
-    @Override
-    public void setRegion(Region region) {
-
-    }
-
-    @Override
-    public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public CreateStreamResult createStream(String streamName, Integer shardCount) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
-            DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DeleteStreamResult deleteStream(String streamName) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DescribeStreamResult describeStream(String streamName) {
-
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DescribeStreamResult describeStream(String streamName,
-                                               Integer limit, String exclusiveStartShardId) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
-            DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
-            EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public GetShardIteratorResult getShardIterator(String streamName,
-                                                   String shardId,
-                                                   String shardIteratorType) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public GetShardIteratorResult getShardIterator(String streamName,
-                                                   String shardId,
-                                                   String shardIteratorType,
-                                                   String startingSequenceNumber) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
-            IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+  static class TestData implements Serializable {
 
-    @Override
-    public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+    private final String data;
+    private final Instant arrivalTimestamp;
+    private final String sequenceNumber;
 
-    @Override
-    public ListStreamsResult listStreams() {
-        throw new RuntimeException("Not implemented");
+    public TestData(KinesisRecord record) {
+      this(new String(record.getData().array()),
+          record.getApproximateArrivalTimestamp(),
+          record.getSequenceNumber());
     }
 
-    @Override
-    public ListStreamsResult listStreams(String exclusiveStartStreamName) {
-        throw new RuntimeException("Not implemented");
+    public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+      this.data = data;
+      this.arrivalTimestamp = arrivalTimestamp;
+      this.sequenceNumber = sequenceNumber;
     }
 
-    @Override
-    public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
-        throw new RuntimeException("Not implemented");
+    public Record convertToRecord() {
+      return new Record().
+          withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
+          withData(ByteBuffer.wrap(data.getBytes())).
+          withSequenceNumber(sequenceNumber).
+          withPartitionKey("");
     }
 
     @Override
-    public ListTagsForStreamResult listTagsForStream(
-            ListTagsForStreamRequest listTagsForStreamRequest) {
-        throw new RuntimeException("Not implemented");
+    public boolean equals(Object obj) {
+      return EqualsBuilder.reflectionEquals(this, obj);
     }
 
     @Override
-    public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
-        throw new RuntimeException("Not implemented");
+    public int hashCode() {
+      return reflectionHashCode(this);
     }
+  }
 
-    @Override
-    public MergeShardsResult mergeShards(String streamName,
-                                         String shardToMerge, String adjacentShardToMerge) {
-        throw new RuntimeException("Not implemented");
-    }
+  static class Provider implements KinesisClientProvider {
 
-    @Override
-    public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+    private final List<List<TestData>> shardedData;
+    private final int numberOfRecordsPerGet;
 
-    @Override
-    public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
-        throw new RuntimeException("Not implemented");
+    public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+      this.shardedData = shardedData;
+      this.numberOfRecordsPerGet = numberOfRecordsPerGet;
     }
 
     @Override
-    public PutRecordResult putRecord(String streamName, ByteBuffer data,
-                                     String partitionKey, String sequenceNumberForOrdering) {
-        throw new RuntimeException("Not implemented");
-    }
+    public AmazonKinesis get() {
+      return new AmazonKinesisMock(transform(shardedData,
+          new Function<List<TestData>, List<Record>>() {
 
-    @Override
-    public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+            @Override
+            public List<Record> apply(@Nullable List<TestData> testDatas) {
+              return transform(testDatas, new Function<TestData, Record>() {
 
-    @Override
-    public RemoveTagsFromStreamResult removeTagsFromStream(
-            RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
-        throw new RuntimeException("Not implemented");
+                @Override
+                public Record apply(@Nullable TestData testData) {
+                  return testData.convertToRecord();
+                }
+              });
+            }
+          }), numberOfRecordsPerGet);
     }
+  }
 
-    @Override
-    public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+  private final List<List<Record>> shardedData;
+  private final int numberOfRecordsPerGet;
 
-    @Override
-    public SplitShardResult splitShard(String streamName,
-                                       String shardToSplit, String newStartingHashKey) {
-        throw new RuntimeException("Not implemented");
-    }
+  public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+    this.shardedData = shardedData;
+    this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+  }
 
-    @Override
-    public void shutdown() {
+  @Override
+  public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+    String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
+    int shardId = parseInt(shardIteratorParts[0]);
+    int startingRecord = parseInt(shardIteratorParts[1]);
+    List<Record> shardData = shardedData.get(shardId);
 
-    }
+    int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
+    int fromIndex = min(startingRecord, toIndex);
+    return new GetRecordsResult().
+        withRecords(shardData.subList(fromIndex, toIndex)).
+        withNextShardIterator(String.format("%s:%s", shardId, toIndex));
+  }
 
-    @Override
-    public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
-        throw new RuntimeException("Not implemented");
-    }
+  @Override
+  public GetShardIteratorResult getShardIterator(
+      GetShardIteratorRequest getShardIteratorRequest) {
+    ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
+        getShardIteratorRequest.getShardIteratorType());
+
+    String shardIterator;
+    if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
+      shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+    } else {
+      throw new RuntimeException("Not implemented");
+    }
+
+    return new GetShardIteratorResult().withShardIterator(shardIterator);
+  }
+
+  @Override
+  public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+    int nextShardId = 0;
+    if (exclusiveStartShardId != null) {
+      nextShardId = parseInt(exclusiveStartShardId) + 1;
+    }
+    boolean hasMoreShards = nextShardId + 1 < shardedData.size();
+
+    List<Shard> shards = newArrayList();
+    if (nextShardId < shardedData.size()) {
+      shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+    }
+
+    return new DescribeStreamResult().withStreamDescription(
+        new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
+    );
+  }
+
+  @Override
+  public void setEndpoint(String endpoint) {
+
+  }
+
+  @Override
+  public void setRegion(Region region) {
+
+  }
+
+  @Override
+  public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public CreateStreamResult createStream(String streamName, Integer shardCount) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+      DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DeleteStreamResult deleteStream(String streamName) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DescribeStreamResult describeStream(String streamName) {
+
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DescribeStreamResult describeStream(String streamName,
+      Integer limit, String exclusiveStartShardId) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+      DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+      EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetShardIteratorResult getShardIterator(String streamName,
+      String shardId,
+      String shardIteratorType) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetShardIteratorResult getShardIterator(String streamName,
+      String shardId,
+      String shardIteratorType,
+      String startingSequenceNumber) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+      IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListStreamsResult listStreams() {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListTagsForStreamResult listTagsForStream(
+      ListTagsForStreamRequest listTagsForStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public MergeShardsResult mergeShards(String streamName,
+      String shardToMerge, String adjacentShardToMerge) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PutRecordResult putRecord(String streamName, ByteBuffer data,
+      String partitionKey, String sequenceNumberForOrdering) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public RemoveTagsFromStreamResult removeTagsFromStream(
+      RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SplitShardResult splitShard(String streamName,
+      String shardToSplit, String newStartingHashKey) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public void shutdown() {
+
+  }
+
+  @Override
+  public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+    throw new RuntimeException("Not implemented");
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index 00acffe..0b16bb7 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -18,24 +18,27 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import com.google.common.testing.EqualsTester;
+
 import java.util.NoSuchElementException;
+
 import org.junit.Test;
 
 /**
  * Tests {@link CustomOptional}.
  */
 public class CustomOptionalTest {
-    @Test(expected = NoSuchElementException.class)
-    public void absentThrowsNoSuchElementExceptionOnGet() {
-        CustomOptional.absent().get();
-    }
 
-    @Test
-    public void testEqualsAndHashCode() {
-        new EqualsTester()
-            .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
-            .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
-            .addEqualityGroup(CustomOptional.of(11))
-            .addEqualityGroup(CustomOptional.of("3")).testEquals();
-    }
+  @Test(expected = NoSuchElementException.class)
+  public void absentThrowsNoSuchElementExceptionOnGet() {
+    CustomOptional.absent().get();
+  }
+
+  @Test
+  public void testEqualsAndHashCode() {
+    new EqualsTester()
+        .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
+        .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
+        .addEqualityGroup(CustomOptional.of(11))
+        .addEqualityGroup(CustomOptional.of("3")).testEquals();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
index c92ac9a..1bb9717 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -28,30 +28,29 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-
 /***
  */
 @RunWith(MockitoJUnitRunner.class)
 public class DynamicCheckpointGeneratorTest {
 
-    @Mock
-    private SimplifiedKinesisClient kinesisClient;
-    @Mock
-    private Shard shard1, shard2, shard3;
+  @Mock
+  private SimplifiedKinesisClient kinesisClient;
+  @Mock
+  private Shard shard1, shard2, shard3;
 
-    @Test
-    public void shouldMapAllShardsToCheckpoints() throws Exception {
-        given(shard1.getShardId()).willReturn("shard-01");
-        given(shard2.getShardId()).willReturn("shard-02");
-        given(shard3.getShardId()).willReturn("shard-03");
-        given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+  @Test
+  public void shouldMapAllShardsToCheckpoints() throws Exception {
+    given(shard1.getShardId()).willReturn("shard-01");
+    given(shard2.getShardId()).willReturn("shard-02");
+    given(shard3.getShardId()).willReturn("shard-03");
+    given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
 
-        StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
-        DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
-                startingPoint);
+    StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
+    DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
+        startingPoint);
 
-        KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
+    KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
 
-        assertThat(checkpoint).hasSize(3);
-    }
+    assertThat(checkpoint).hasSize(3);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 567e25f..44ad67d 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -21,7 +21,9 @@ import static com.google.common.collect.Lists.newArrayList;
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.common.collect.Iterables;
+
 import java.util.List;
+
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -36,59 +38,60 @@ import org.junit.Test;
  */
 public class KinesisMockReadTest {
 
-    @Rule
-    public final transient TestPipeline p = TestPipeline.create();
-
-    @Test
-    public void readsDataFromMockKinesis() {
-        int noOfShards = 3;
-        int noOfEventsPerShard = 100;
-        List<List<AmazonKinesisMock.TestData>> testData =
-                provideTestData(noOfShards, noOfEventsPerShard);
-
-        PCollection<AmazonKinesisMock.TestData> result = p
-                .apply(
-                        KinesisIO.read()
-                                .from("stream", InitialPositionInStream.TRIM_HORIZON)
-                                .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
-                                .withMaxNumRecords(noOfShards * noOfEventsPerShard))
-                .apply(ParDo.of(new KinesisRecordToTestData()));
-        PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
-        p.run();
-    }
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
 
-    private static class KinesisRecordToTestData extends
-            DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-            c.output(new AmazonKinesisMock.TestData(c.element()));
-        }
-    }
+  @Test
+  public void readsDataFromMockKinesis() {
+    int noOfShards = 3;
+    int noOfEventsPerShard = 100;
+    List<List<AmazonKinesisMock.TestData>> testData =
+        provideTestData(noOfShards, noOfEventsPerShard);
 
-    private List<List<AmazonKinesisMock.TestData>> provideTestData(
-            int noOfShards,
-            int noOfEventsPerShard) {
+    PCollection<AmazonKinesisMock.TestData> result = p
+        .apply(
+            KinesisIO.read()
+                .from("stream", InitialPositionInStream.TRIM_HORIZON)
+                .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
+                .withMaxNumRecords(noOfShards * noOfEventsPerShard))
+        .apply(ParDo.of(new KinesisRecordToTestData()));
+    PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
+    p.run();
+  }
 
-        int seqNumber = 0;
+  private static class KinesisRecordToTestData extends
+      DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
 
-        List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
-        for (int i = 0; i < noOfShards; ++i) {
-            List<AmazonKinesisMock.TestData> shardData = newArrayList();
-            shardedData.add(shardData);
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(new AmazonKinesisMock.TestData(c.element()));
+    }
+  }
+
+  private List<List<AmazonKinesisMock.TestData>> provideTestData(
+      int noOfShards,
+      int noOfEventsPerShard) {
 
-            DateTime arrival = DateTime.now();
-            for (int j = 0; j < noOfEventsPerShard; ++j) {
-                arrival = arrival.plusSeconds(1);
+    int seqNumber = 0;
 
-                seqNumber++;
-                shardData.add(new AmazonKinesisMock.TestData(
-                        Integer.toString(seqNumber),
-                        arrival.toInstant(),
-                        Integer.toString(seqNumber))
-                );
-            }
-        }
+    List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
+    for (int i = 0; i < noOfShards; ++i) {
+      List<AmazonKinesisMock.TestData> shardData = newArrayList();
+      shardedData.add(shardData);
 
-        return shardedData;
+      DateTime arrival = DateTime.now();
+      for (int j = 0; j < noOfEventsPerShard; ++j) {
+        arrival = arrival.plusSeconds(1);
+
+        seqNumber++;
+        shardData.add(new AmazonKinesisMock.TestData(
+            Integer.toString(seqNumber),
+            arrival.toInstant(),
+            Integer.toString(seqNumber))
+        );
+      }
     }
+
+    return shardedData;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
index 8c8da64..1038a47 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.google.common.collect.Iterables;
+
 import java.util.Iterator;
 import java.util.List;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,33 +36,34 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class KinesisReaderCheckpointTest {
-    @Mock
-    private ShardCheckpoint a, b, c;
 
-    private KinesisReaderCheckpoint checkpoint;
+  @Mock
+  private ShardCheckpoint a, b, c;
+
+  private KinesisReaderCheckpoint checkpoint;
 
-    @Before
-    public void setUp() {
-        checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
-    }
+  @Before
+  public void setUp() {
+    checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+  }
 
-    @Test
-    public void splitsCheckpointAccordingly() {
-        verifySplitInto(1);
-        verifySplitInto(2);
-        verifySplitInto(3);
-        verifySplitInto(4);
-    }
+  @Test
+  public void splitsCheckpointAccordingly() {
+    verifySplitInto(1);
+    verifySplitInto(2);
+    verifySplitInto(3);
+    verifySplitInto(4);
+  }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void isImmutable() {
-        Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
-        iterator.remove();
-    }
+  @Test(expected = UnsupportedOperationException.class)
+  public void isImmutable() {
+    Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
+    iterator.remove();
+  }
 
-    private void verifySplitInto(int size) {
-        List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
-        assertThat(Iterables.concat(split)).containsOnly(a, b, c);
-        assertThat(split).hasSize(Math.min(size, 3));
-    }
+  private void verifySplitInto(int size) {
+    List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
+    assertThat(Iterables.concat(split)).containsOnly(a, b, c);
+    assertThat(split).hasSize(Math.min(size, 3));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 8eb6546..5781033 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.amazonaws.regions.Regions;
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -50,72 +52,75 @@ import org.junit.Test;
  * You need to provide all {@link KinesisTestOptions} in order to run this.
  */
 public class KinesisReaderIT {
-    private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
-    private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
-
-    @Rule
-    public final transient TestPipeline p = TestPipeline.create();
-
-    @Ignore
-    @Test
-    public void readsDataFromRealKinesisStream()
-            throws IOException, InterruptedException, ExecutionException {
-        KinesisTestOptions options = readKinesisOptions();
-        List<String> testData = prepareTestData(1000);
-
-        Future<?> future = startTestPipeline(testData, options);
-        KinesisUploader.uploadAll(testData, options);
-        future.get();
-    }
 
-    private List<String> prepareTestData(int count) {
-        List<String> data = newArrayList();
-        for (int i = 0; i < count; ++i) {
-            data.add(RandomStringUtils.randomAlphabetic(32));
-        }
-        return data;
-    }
+  private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+  private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
 
-    private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
-            throws InterruptedException {
-
-        PCollection<String> result = p.
-                apply(KinesisIO.read()
-                        .from(options.getAwsKinesisStream(), Instant.now())
-                        .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
-                                Regions.fromName(options.getAwsKinesisRegion()))
-                        .withMaxReadTime(Duration.standardMinutes(3))
-                ).
-                apply(ParDo.of(new RecordDataToString()));
-        PAssert.that(result).containsInAnyOrder(testData);
-
-        Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                PipelineResult result = p.run();
-                PipelineResult.State state = result.getState();
-                while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
-                    Thread.sleep(1000);
-                    state = result.getState();
-                }
-                assertThat(state).isEqualTo(PipelineResult.State.DONE);
-                return null;
-            }
-        });
-        Thread.sleep(PIPELINE_STARTUP_TIME);
-        return future;
-    }
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Ignore
+  @Test
+  public void readsDataFromRealKinesisStream()
+      throws IOException, InterruptedException, ExecutionException {
+    KinesisTestOptions options = readKinesisOptions();
+    List<String> testData = prepareTestData(1000);
 
-    private KinesisTestOptions readKinesisOptions() {
-        PipelineOptionsFactory.register(KinesisTestOptions.class);
-        return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+    Future<?> future = startTestPipeline(testData, options);
+    KinesisUploader.uploadAll(testData, options);
+    future.get();
+  }
+
+  private List<String> prepareTestData(int count) {
+    List<String> data = newArrayList();
+    for (int i = 0; i < count; ++i) {
+      data.add(RandomStringUtils.randomAlphabetic(32));
     }
+    return data;
+  }
+
+  private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+      throws InterruptedException {
+
+    PCollection<String> result = p.
+        apply(KinesisIO.read()
+            .from(options.getAwsKinesisStream(), Instant.now())
+            .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
+                Regions.fromName(options.getAwsKinesisRegion()))
+            .withMaxReadTime(Duration.standardMinutes(3))
+        ).
+        apply(ParDo.of(new RecordDataToString()));
+    PAssert.that(result).containsInAnyOrder(testData);
+
+    Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
 
-    private static class RecordDataToString extends DoFn<KinesisRecord, String> {
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-            checkNotNull(c.element(), "Null record given");
-            c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+      @Override
+      public Void call() throws Exception {
+        PipelineResult result = p.run();
+        PipelineResult.State state = result.getState();
+        while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+          Thread.sleep(1000);
+          state = result.getState();
         }
+        assertThat(state).isEqualTo(PipelineResult.State.DONE);
+        return null;
+      }
+    });
+    Thread.sleep(PIPELINE_STARTUP_TIME);
+    return future;
+  }
+
+  private KinesisTestOptions readKinesisOptions() {
+    PipelineOptionsFactory.register(KinesisTestOptions.class);
+    return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+  }
+
+  private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      checkNotNull(c.element(), "Null record given");
+      c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 3111029..a26501a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,87 +35,88 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class KinesisReaderTest {
-    @Mock
-    private SimplifiedKinesisClient kinesis;
-    @Mock
-    private CheckpointGenerator generator;
-    @Mock
-    private ShardCheckpoint firstCheckpoint, secondCheckpoint;
-    @Mock
-    private ShardRecordsIterator firstIterator, secondIterator;
-    @Mock
-    private KinesisRecord a, b, c, d;
-
-    private KinesisReader reader;
-
-    @Before
-    public void setUp() throws IOException, TransientKinesisException {
-        when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
-                asList(firstCheckpoint, secondCheckpoint)
-        ));
-        when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
-        when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
-        when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-        when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-
-        reader = new KinesisReader(kinesis, generator, null);
-    }
-
-    @Test
-    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
-        assertThat(reader.start()).isFalse();
-    }
-
-    @Test(expected = NoSuchElementException.class)
-    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
-        reader.start();
-        reader.getCurrent();
-    }
-
-    @Test
-    public void startReturnsTrueIfSomeDataAvailable() throws IOException,
-            TransientKinesisException {
-        when(firstIterator.next()).
-                thenReturn(CustomOptional.of(a)).
-                thenReturn(CustomOptional.<KinesisRecord>absent());
-
-        assertThat(reader.start()).isTrue();
-    }
-
-    @Test
-    public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
-            throws IOException, TransientKinesisException {
-        reader.start();
-
-        when(firstIterator.next()).thenThrow(TransientKinesisException.class);
-
-        assertThat(reader.advance()).isFalse();
-    }
-
-    @Test
-    public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
-        when(firstIterator.next()).
-                thenReturn(CustomOptional.<KinesisRecord>absent()).
-                thenReturn(CustomOptional.of(a)).
-                thenReturn(CustomOptional.<KinesisRecord>absent()).
-                thenReturn(CustomOptional.of(b)).
-                thenReturn(CustomOptional.<KinesisRecord>absent());
-
-        when(secondIterator.next()).
-                thenReturn(CustomOptional.of(c)).
-                thenReturn(CustomOptional.<KinesisRecord>absent()).
-                thenReturn(CustomOptional.of(d)).
-                thenReturn(CustomOptional.<KinesisRecord>absent());
-
-        assertThat(reader.start()).isTrue();
-        assertThat(reader.getCurrent()).isEqualTo(c);
-        assertThat(reader.advance()).isTrue();
-        assertThat(reader.getCurrent()).isEqualTo(a);
-        assertThat(reader.advance()).isTrue();
-        assertThat(reader.getCurrent()).isEqualTo(d);
-        assertThat(reader.advance()).isTrue();
-        assertThat(reader.getCurrent()).isEqualTo(b);
-        assertThat(reader.advance()).isFalse();
-    }
+
+  @Mock
+  private SimplifiedKinesisClient kinesis;
+  @Mock
+  private CheckpointGenerator generator;
+  @Mock
+  private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+  @Mock
+  private ShardRecordsIterator firstIterator, secondIterator;
+  @Mock
+  private KinesisRecord a, b, c, d;
+
+  private KinesisReader reader;
+
+  @Before
+  public void setUp() throws IOException, TransientKinesisException {
+    when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+        asList(firstCheckpoint, secondCheckpoint)
+    ));
+    when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+    when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+    when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+    when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+    reader = new KinesisReader(kinesis, generator, null);
+  }
+
+  @Test
+  public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+    assertThat(reader.start()).isFalse();
+  }
+
+  @Test(expected = NoSuchElementException.class)
+  public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+    reader.start();
+    reader.getCurrent();
+  }
+
+  @Test
+  public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+      TransientKinesisException {
+    when(firstIterator.next()).
+        thenReturn(CustomOptional.of(a)).
+        thenReturn(CustomOptional.<KinesisRecord>absent());
+
+    assertThat(reader.start()).isTrue();
+  }
+
+  @Test
+  public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+      throws IOException, TransientKinesisException {
+    reader.start();
+
+    when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+    assertThat(reader.advance()).isFalse();
+  }
+
+  @Test
+  public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+    when(firstIterator.next()).
+        thenReturn(CustomOptional.<KinesisRecord>absent()).
+        thenReturn(CustomOptional.of(a)).
+        thenReturn(CustomOptional.<KinesisRecord>absent()).
+        thenReturn(CustomOptional.of(b)).
+        thenReturn(CustomOptional.<KinesisRecord>absent());
+
+    when(secondIterator.next()).
+        thenReturn(CustomOptional.of(c)).
+        thenReturn(CustomOptional.<KinesisRecord>absent()).
+        thenReturn(CustomOptional.of(d)).
+        thenReturn(CustomOptional.<KinesisRecord>absent());
+
+    assertThat(reader.start()).isTrue();
+    assertThat(reader.getCurrent()).isEqualTo(c);
+    assertThat(reader.advance()).isTrue();
+    assertThat(reader.getCurrent()).isEqualTo(a);
+    assertThat(reader.advance()).isTrue();
+    assertThat(reader.getCurrent()).isEqualTo(d);
+    assertThat(reader.advance()).isTrue();
+    assertThat(reader.getCurrent()).isEqualTo(b);
+    assertThat(reader.advance()).isFalse();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index 8771c86..c9f01bb 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import java.nio.ByteBuffer;
+
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -26,20 +27,21 @@ import org.junit.Test;
  * Tests {@link KinesisRecordCoder}.
  */
 public class KinesisRecordCoderTest {
-    @Test
-    public void encodingAndDecodingWorks() throws Exception {
-        KinesisRecord record = new KinesisRecord(
-                ByteBuffer.wrap("data".getBytes()),
-                "sequence",
-                128L,
-                "partition",
-                Instant.now(),
-                Instant.now(),
-                "stream",
-                "shard"
-        );
-        CoderProperties.coderDecodeEncodeEqual(
-                new KinesisRecordCoder(), record
-        );
-    }
+
+  @Test
+  public void encodingAndDecodingWorks() throws Exception {
+    KinesisRecord record = new KinesisRecord(
+        ByteBuffer.wrap("data".getBytes()),
+        "sequence",
+        128L,
+        "partition",
+        Instant.now(),
+        Instant.now(),
+        "stream",
+        "shard"
+    );
+    CoderProperties.coderDecodeEncodeEqual(
+        new KinesisRecordCoder(), record
+    );
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
index 324de46..76bcb27 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -25,23 +25,28 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
  * Options for Kinesis integration tests.
  */
 public interface KinesisTestOptions extends TestPipelineOptions {
-    @Description("AWS region where Kinesis stream resided")
-    @Default.String("aws-kinesis-region")
-    String getAwsKinesisRegion();
-    void setAwsKinesisRegion(String value);
-
-    @Description("Kinesis stream name")
-    @Default.String("aws-kinesis-stream")
-    String getAwsKinesisStream();
-    void setAwsKinesisStream(String value);
-
-    @Description("AWS secret key")
-    @Default.String("aws-secret-key")
-    String getAwsSecretKey();
-    void setAwsSecretKey(String value);
-
-    @Description("AWS access key")
-    @Default.String("aws-access-key")
-    String getAwsAccessKey();
-    void setAwsAccessKey(String value);
+
+  @Description("AWS region where Kinesis stream resided")
+  @Default.String("aws-kinesis-region")
+  String getAwsKinesisRegion();
+
+  void setAwsKinesisRegion(String value);
+
+  @Description("Kinesis stream name")
+  @Default.String("aws-kinesis-stream")
+  String getAwsKinesisStream();
+
+  void setAwsKinesisStream(String value);
+
+  @Description("AWS secret key")
+  @Default.String("aws-secret-key")
+  String getAwsSecretKey();
+
+  void setAwsSecretKey(String value);
+
+  @Description("AWS access key")
+  @Default.String("aws-access-key")
+  String getAwsAccessKey();
+
+  void setAwsAccessKey(String value);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index 7518ff7..7a7cb02 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -29,6 +29,7 @@ import com.amazonaws.services.kinesis.model.PutRecordsResult;
 import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -37,47 +38,46 @@ import java.util.List;
  */
 public class KinesisUploader {
 
-    public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
-
-    public static void uploadAll(List<String> data, KinesisTestOptions options) {
-        AmazonKinesisClient client = new AmazonKinesisClient(
-                new StaticCredentialsProvider(
-                        new BasicAWSCredentials(
-                                options.getAwsAccessKey(), options.getAwsSecretKey()))
-        ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+  public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
 
-        List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+  public static void uploadAll(List<String> data, KinesisTestOptions options) {
+    AmazonKinesisClient client = new AmazonKinesisClient(
+        new StaticCredentialsProvider(
+            new BasicAWSCredentials(
+                options.getAwsAccessKey(), options.getAwsSecretKey()))
+    ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
 
+    List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
 
-        for (List<String> partition : partitions) {
-            List<PutRecordsRequestEntry> allRecords = newArrayList();
-            for (String row : partition) {
-                allRecords.add(new PutRecordsRequestEntry().
-                        withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
-                        withPartitionKey(Integer.toString(row.hashCode()))
+    for (List<String> partition : partitions) {
+      List<PutRecordsRequestEntry> allRecords = newArrayList();
+      for (String row : partition) {
+        allRecords.add(new PutRecordsRequestEntry().
+            withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+            withPartitionKey(Integer.toString(row.hashCode()))
 
-                );
-            }
+        );
+      }
 
-            PutRecordsResult result;
-            do {
-                result = client.putRecords(
-                        new PutRecordsRequest().
-                                withStreamName(options.getAwsKinesisStream()).
-                                withRecords(allRecords));
-                List<PutRecordsRequestEntry> failedRecords = newArrayList();
-                int i = 0;
-                for (PutRecordsResultEntry row : result.getRecords()) {
-                    if (row.getErrorCode() != null) {
-                        failedRecords.add(allRecords.get(i));
-                    }
-                    ++i;
-                }
-                allRecords = failedRecords;
-            }
-
-            while (result.getFailedRecordCount() > 0);
+      PutRecordsResult result;
+      do {
+        result = client.putRecords(
+            new PutRecordsRequest().
+                withStreamName(options.getAwsKinesisStream()).
+                withRecords(allRecords));
+        List<PutRecordsRequestEntry> failedRecords = newArrayList();
+        int i = 0;
+        for (PutRecordsResultEntry row : result.getRecords()) {
+          if (row.getErrorCode() != null) {
+            failedRecords.add(allRecords.get(i));
+          }
+          ++i;
         }
+        allRecords = failedRecords;
+      }
+
+      while (result.getFailedRecordCount() > 0);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
index f979c01..cb32562 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -20,47 +20,49 @@ package org.apache.beam.sdk.io.kinesis;
 import static org.mockito.BDDMockito.given;
 
 import com.google.common.collect.Lists;
+
 import java.util.Collections;
 import java.util.List;
+
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-
 /***
  */
 @RunWith(MockitoJUnitRunner.class)
 public class RecordFilterTest {
-    @Mock
-    private ShardCheckpoint checkpoint;
-    @Mock
-    private KinesisRecord record1, record2, record3, record4, record5;
 
-    @Test
-    public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
-        given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
-        given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
-        given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
-        given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
-        given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
-        List<KinesisRecord> records = Lists.newArrayList(record1, record2,
-                record3, record4, record5);
-        RecordFilter underTest = new RecordFilter();
+  @Mock
+  private ShardCheckpoint checkpoint;
+  @Mock
+  private KinesisRecord record1, record2, record3, record4, record5;
+
+  @Test
+  public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+    given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+    given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+    given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+    given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+    given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+    List<KinesisRecord> records = Lists.newArrayList(record1, record2,
+        record3, record4, record5);
+    RecordFilter underTest = new RecordFilter();
 
-        List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+    List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
 
-        Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
-    }
+    Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
+  }
 
-    @Test
-    public void shouldNotFailOnEmptyList() {
-        List<KinesisRecord> records = Collections.emptyList();
-        RecordFilter underTest = new RecordFilter();
+  @Test
+  public void shouldNotFailOnEmptyList() {
+    List<KinesisRecord> records = Collections.emptyList();
+    RecordFilter underTest = new RecordFilter();
 
-        List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+    List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
 
-        Assertions.assertThat(retainedRecords).isEmpty();
-    }
+    Assertions.assertThat(retainedRecords).isEmpty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index f032eea..e4abce4 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -22,36 +22,38 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Collections;
 import java.util.List;
+
 import org.junit.Test;
 
 /**
  * Tests {@link RoundRobin}.
  */
 public class RoundRobinTest {
-    @Test(expected = IllegalArgumentException.class)
-    public void doesNotAllowCreationWithEmptyCollection() {
-        new RoundRobin<>(Collections.emptyList());
-    }
 
-    @Test
-    public void goesThroughElementsInCycle() {
-        List<String> input = newArrayList("a", "b", "c");
+  @Test(expected = IllegalArgumentException.class)
+  public void doesNotAllowCreationWithEmptyCollection() {
+    new RoundRobin<>(Collections.emptyList());
+  }
 
-        RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+  @Test
+  public void goesThroughElementsInCycle() {
+    List<String> input = newArrayList("a", "b", "c");
 
-        input.addAll(input);  // duplicate the input
-        for (String element : input) {
-            assertThat(roundRobin.getCurrent()).isEqualTo(element);
-            assertThat(roundRobin.getCurrent()).isEqualTo(element);
-            roundRobin.moveForward();
-        }
+    RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+
+    input.addAll(input);  // duplicate the input
+    for (String element : input) {
+      assertThat(roundRobin.getCurrent()).isEqualTo(element);
+      assertThat(roundRobin.getCurrent()).isEqualTo(element);
+      roundRobin.moveForward();
     }
+  }
 
-    @Test
-    public void usualIteratorGoesThroughElementsOnce() {
-        List<String> input = newArrayList("a", "b", "c");
+  @Test
+  public void usualIteratorGoesThroughElementsOnce() {
+    List<String> input = newArrayList("a", "b", "c");
 
-        RoundRobin<String> roundRobin = new RoundRobin<>(input);
-        assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
-    }
+    RoundRobin<String> roundRobin = new RoundRobin<>(input);
+    assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21fd3028/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
index 39ab36f..d4784c4 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -32,7 +32,9 @@ import static org.mockito.Mockito.when;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import java.io.IOException;
+
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -46,104 +48,105 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardCheckpointTest {
-    private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
-    private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
-    private static final String STREAM_NAME = "STREAM";
-    private static final String SHARD_ID = "SHARD_ID";
-    @Mock
-    private SimplifiedKinesisClient client;
-
-    @Before
-    public void setUp() throws IOException, TransientKinesisException {
-        when(client.getShardIterator(
-                eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
-                anyString(), isNull(Instant.class))).
-                thenReturn(AT_SEQUENCE_SHARD_IT);
-        when(client.getShardIterator(
-                eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
-                anyString(), isNull(Instant.class))).
-                thenReturn(AFTER_SEQUENCE_SHARD_IT);
-    }
-
-    @Test
-    public void testProvidingShardIterator() throws IOException, TransientKinesisException {
-        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
-                .isEqualTo(AT_SEQUENCE_SHARD_IT);
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
-                .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
-        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
-                (AT_SEQUENCE_SHARD_IT);
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
-                .isEqualTo(AT_SEQUENCE_SHARD_IT);
-    }
-
-    @Test
-    public void testComparisonWithExtendedSequenceNumber() {
-        assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isTrue();
-
-        assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isTrue();
-
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isTrue();
-
-        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isTrue();
-
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isFalse();
-
-        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isFalse();
-
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("99", 1L))
-        )).isFalse();
-    }
-
-    @Test
-    public void testComparisonWithTimestamp() {
-        DateTime referenceTimestamp = DateTime.now();
-
-        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-                .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
-        ).isFalse();
-
-        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-                .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
-        ).isTrue();
-
-        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-                .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
-        ).isTrue();
-    }
-
-    private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
-        KinesisRecord record = mock(KinesisRecord.class);
-        given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
-        return record;
-    }
-
-    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
-                                       Long subSequenceNumber) {
-        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
-                subSequenceNumber);
-    }
-
-    private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
-        KinesisRecord record = mock(KinesisRecord.class);
-        given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
-        return record;
-    }
-
-    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
-        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
-    }
+
+  private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+  private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+  private static final String STREAM_NAME = "STREAM";
+  private static final String SHARD_ID = "SHARD_ID";
+  @Mock
+  private SimplifiedKinesisClient client;
+
+  @Before
+  public void setUp() throws IOException, TransientKinesisException {
+    when(client.getShardIterator(
+        eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
+        anyString(), isNull(Instant.class))).
+        thenReturn(AT_SEQUENCE_SHARD_IT);
+    when(client.getShardIterator(
+        eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
+        anyString(), isNull(Instant.class))).
+        thenReturn(AFTER_SEQUENCE_SHARD_IT);
+  }
+
+  @Test
+  public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+        .isEqualTo(AT_SEQUENCE_SHARD_IT);
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+        .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
+    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
+        (AT_SEQUENCE_SHARD_IT);
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
+        .isEqualTo(AT_SEQUENCE_SHARD_IT);
+  }
+
+  @Test
+  public void testComparisonWithExtendedSequenceNumber() {
+    assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isTrue();
+
+    assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isTrue();
+
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isTrue();
+
+    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isTrue();
+
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isFalse();
+
+    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isFalse();
+
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("99", 1L))
+    )).isFalse();
+  }
+
+  @Test
+  public void testComparisonWithTimestamp() {
+    DateTime referenceTimestamp = DateTime.now();
+
+    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+        .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
+    ).isFalse();
+
+    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+        .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
+    ).isTrue();
+
+    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+        .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
+    ).isTrue();
+  }
+
+  private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+    KinesisRecord record = mock(KinesisRecord.class);
+    given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+    return record;
+  }
+
+  private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+      Long subSequenceNumber) {
+    return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
+        subSequenceNumber);
+  }
+
+  private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+    KinesisRecord record = mock(KinesisRecord.class);
+    given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+    return record;
+  }
+
+  private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+    return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+  }
 }


[4/4] beam git commit: This closes #3389

Posted by jb...@apache.org.
This closes #3389


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/138641f1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/138641f1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/138641f1

Branch: refs/heads/master
Commit: 138641f140b037841f9a1d2cc8e523ad29dc9518
Parents: af08f53 21fd302
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Jul 11 15:47:53 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jul 11 15:47:53 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/kinesis/CheckpointGenerator.java     |   6 +-
 .../beam/sdk/io/kinesis/CustomOptional.java     | 111 ++--
 .../io/kinesis/DynamicCheckpointGenerator.java  |  52 +-
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |  49 +-
 .../sdk/io/kinesis/KinesisClientProvider.java   |   4 +-
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   | 279 +++++-----
 .../beam/sdk/io/kinesis/KinesisReader.java      | 206 +++----
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  97 ++--
 .../beam/sdk/io/kinesis/KinesisRecord.java      | 177 +++---
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  68 +--
 .../beam/sdk/io/kinesis/KinesisSource.java      | 147 ++---
 .../beam/sdk/io/kinesis/RecordFilter.java       |  18 +-
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  37 +-
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    | 241 ++++-----
 .../sdk/io/kinesis/ShardRecordsIterator.java    | 106 ++--
 .../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 ++++----
 .../beam/sdk/io/kinesis/StartingPoint.java      |  84 +--
 .../io/kinesis/StaticCheckpointGenerator.java   |  27 +-
 .../io/kinesis/TransientKinesisException.java   |   7 +-
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  | 539 ++++++++++---------
 .../beam/sdk/io/kinesis/CustomOptionalTest.java |  27 +-
 .../kinesis/DynamicCheckpointGeneratorTest.java |  33 +-
 .../sdk/io/kinesis/KinesisMockReadTest.java     |  97 ++--
 .../io/kinesis/KinesisReaderCheckpointTest.java |  52 +-
 .../beam/sdk/io/kinesis/KinesisReaderIT.java    | 127 ++---
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  | 166 +++---
 .../sdk/io/kinesis/KinesisRecordCoderTest.java  |  34 +-
 .../beam/sdk/io/kinesis/KinesisTestOptions.java |  43 +-
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  70 +--
 .../beam/sdk/io/kinesis/RecordFilterTest.java   |  52 +-
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  42 +-
 .../sdk/io/kinesis/ShardCheckpointTest.java     | 203 +++----
 .../io/kinesis/ShardRecordsIteratorTest.java    | 216 ++++----
 .../io/kinesis/SimplifiedKinesisClientTest.java | 351 ++++++------
 34 files changed, 2031 insertions(+), 1952 deletions(-)
----------------------------------------------------------------------