Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:32 UTC

[05/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 4b2190f..49e806d 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -25,10 +25,8 @@ import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.when;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-
 import java.io.IOException;
 import java.util.Collections;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,114 +40,112 @@ import org.mockito.stubbing.Answer;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardRecordsIteratorTest {
+    private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+    private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+    private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+    private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+    private static final String STREAM_NAME = "STREAM_NAME";
+    private static final String SHARD_ID = "SHARD_ID";
+
+    @Mock
+    private SimplifiedKinesisClient kinesisClient;
+    @Mock
+    private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+    @Mock
+    private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+    @Mock
+    private KinesisRecord a, b, c, d;
+    @Mock
+    private RecordFilter recordFilter;
+
+    private ShardRecordsIterator iterator;
+
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+        when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+        when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+        when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+        when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+        when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+        when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+        when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(firstResult);
+        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(secondResult);
+        when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(thirdResult);
+
+        when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+        when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+        when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+        when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+        when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+        when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+        when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+                .class))).thenAnswer(new IdentityAnswer());
+
+        iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+    }
+
+    @Test
+    public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    }
+
+    @Test
+    public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+        when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+        when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+        assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+        assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+        assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+        assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+    }
+
+    @Test
+    public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+        when(firstResult.getRecords()).thenReturn(singletonList(a));
+        when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenThrow(ExpiredIteratorException.class);
+        when(aCheckpoint.getShardIterator(kinesisClient))
+                .thenReturn(SECOND_REFRESHED_ITERATOR);
+        when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(secondResult);
+
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    }
 
-  private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
-  private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
-  private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
-  private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
-  private static final String STREAM_NAME = "STREAM_NAME";
-  private static final String SHARD_ID = "SHARD_ID";
-
-  @Mock
-  private SimplifiedKinesisClient kinesisClient;
-  @Mock
-  private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
-  @Mock
-  private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
-  @Mock
-  private KinesisRecord a, b, c, d;
-  @Mock
-  private RecordFilter recordFilter;
-
-  private ShardRecordsIterator iterator;
-
-  @Before
-  public void setUp() throws IOException, TransientKinesisException {
-    when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
-    when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-    when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-    when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
-    when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
-    when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-    when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
-    when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
-    when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-    when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
-    when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
-    when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-    when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
-    when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-    when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-    when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
-        .thenReturn(firstResult);
-    when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-        .thenReturn(secondResult);
-    when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
-        .thenReturn(thirdResult);
-
-    when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
-    when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-    when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-
-    when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-    when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-    when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-
-    when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
-        .class))).thenAnswer(new IdentityAnswer());
-
-    iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
-  }
-
-  @Test
-  public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-  }
-
-  @Test
-  public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
-    when(firstResult.getRecords()).thenReturn(asList(a, b, c));
-    when(secondResult.getRecords()).thenReturn(singletonList(d));
-
-    assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-    assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-    assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
-    assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
-    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-  }
-
-  @Test
-  public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
-    when(firstResult.getRecords()).thenReturn(singletonList(a));
-    when(secondResult.getRecords()).thenReturn(singletonList(b));
-
-    when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-        .thenThrow(ExpiredIteratorException.class);
-    when(aCheckpoint.getShardIterator(kinesisClient))
-        .thenReturn(SECOND_REFRESHED_ITERATOR);
-    when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
-        .thenReturn(secondResult);
-
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-  }
-
-  private static class IdentityAnswer implements Answer<Object> {
-
-    @Override
-    public Object answer(InvocationOnMock invocation) throws Throwable {
-      return invocation.getArguments()[0];
+    private static class IdentityAnswer implements Answer<Object> {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            return invocation.getArguments()[0];
+        }
     }
-  }
 }
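
The refreshesExpiredIterator test above pins down the reader's recovery behavior: when getRecords fails with ExpiredIteratorException, the iterator derives a fresh shard iterator from its current checkpoint and retries. A minimal sketch of that pattern, written against the same types the test mocks (SimplifiedKinesisClient, ShardCheckpoint and GetKinesisRecordsResult are Beam-internal; the control flow here is an approximation, not the exact ShardRecordsIterator source):

    import com.amazonaws.services.kinesis.model.ExpiredIteratorException;

    // Approximate sketch of the refresh-on-expiry behavior exercised above.
    class RefreshingFetcher {
      private final SimplifiedKinesisClient kinesis;
      private final ShardCheckpoint checkpoint;
      private String shardIterator;

      RefreshingFetcher(SimplifiedKinesisClient kinesis, ShardCheckpoint checkpoint)
          throws TransientKinesisException {
        this.kinesis = kinesis;
        this.checkpoint = checkpoint;
        this.shardIterator = checkpoint.getShardIterator(kinesis);
      }

      GetKinesisRecordsResult fetch() throws TransientKinesisException {
        try {
          return kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
              checkpoint.getShardId());
        } catch (ExpiredIteratorException e) {
          // The stored iterator aged out server-side: ask the checkpoint for a
          // fresh one and retry from the same position.
          shardIterator = checkpoint.getShardIterator(kinesis);
          return kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
              checkpoint.getShardId());
        }
      }
    }

In the real iterator the checkpoint also advances via moveAfter(record) as records are consumed, which is why the test stubs a full chain of checkpoint mocks.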

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 2f8757c..96434fd 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -34,9 +34,7 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
-
 import java.util.List;
-
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,180 +46,179 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class SimplifiedKinesisClientTest {
+    private static final String STREAM = "stream";
+    private static final String SHARD_1 = "shard-01";
+    private static final String SHARD_2 = "shard-02";
+    private static final String SHARD_3 = "shard-03";
+    private static final String SHARD_ITERATOR = "iterator";
+    private static final String SEQUENCE_NUMBER = "abc123";
+
+    @Mock
+    private AmazonKinesis kinesis;
+    @InjectMocks
+    private SimplifiedKinesisClient underTest;
+
+    @Test
+    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+        given(kinesis.getShardIterator(new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+                .withStartingSequenceNumber(SEQUENCE_NUMBER)
+        )).willReturn(new GetShardIteratorResult()
+                .withShardIterator(SHARD_ITERATOR));
+
+        String stream = underTest.getShardIterator(STREAM, SHARD_1,
+                ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+        assertThat(stream).isEqualTo(SHARD_ITERATOR);
+    }
+
+    @Test
+    public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+        Instant timestamp = Instant.now();
+        given(kinesis.getShardIterator(new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+                .withTimestamp(timestamp.toDate())
+        )).willReturn(new GetShardIteratorResult()
+                .withShardIterator(SHARD_ITERATOR));
+
+        String stream = underTest.getShardIterator(STREAM, SHARD_1,
+                ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
+
+        assertThat(stream).isEqualTo(SHARD_ITERATOR);
+    }
+
+    @Test
+    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+                ExpiredIteratorException.class);
+    }
+
+    @Test
+    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new LimitExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleServiceErrorForGetShardIterator() {
+        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleClientErrorForGetShardIterator() {
+        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+                RuntimeException.class);
+    }
+
+    @Test
+    public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new NullPointerException(),
+                RuntimeException.class);
+    }
+
+    private void shouldHandleGetShardIteratorError(
+            Exception thrownException,
+            Class<? extends Exception> expectedExceptionClass) {
+        GetShardIteratorRequest request = new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.LATEST);
+
+        given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+        try {
+            underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+            failBecauseExceptionWasNotThrown(expectedExceptionClass);
+        } catch (Exception e) {
+            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+        } finally {
+            reset(kinesis);
+        }
+    }
+
+    @Test
+    public void shouldListAllShards() throws Exception {
+        Shard shard1 = new Shard().withShardId(SHARD_1);
+        Shard shard2 = new Shard().withShardId(SHARD_2);
+        Shard shard3 = new Shard().withShardId(SHARD_3);
+        given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+                .withStreamDescription(new StreamDescription()
+                        .withShards(shard1, shard2)
+                        .withHasMoreShards(true)));
+        given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+                .withStreamDescription(new StreamDescription()
+                        .withShards(shard3)
+                        .withHasMoreShards(false)));
+
+        List<Shard> shards = underTest.listShards(STREAM);
+
+        assertThat(shards).containsOnly(shard1, shard2, shard3);
+    }
+
+    @Test
+    public void shouldHandleExpiredIterationExceptionForShardListing() {
+        shouldHandleShardListingError(new ExpiredIteratorException(""),
+                ExpiredIteratorException.class);
+    }
+
+    @Test
+    public void shouldHandleLimitExceededExceptionForShardListing() {
+        shouldHandleShardListingError(new LimitExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+        shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+                TransientKinesisException.class);
+    }
 
-  private static final String STREAM = "stream";
-  private static final String SHARD_1 = "shard-01";
-  private static final String SHARD_2 = "shard-02";
-  private static final String SHARD_3 = "shard-03";
-  private static final String SHARD_ITERATOR = "iterator";
-  private static final String SEQUENCE_NUMBER = "abc123";
-
-  @Mock
-  private AmazonKinesis kinesis;
-  @InjectMocks
-  private SimplifiedKinesisClient underTest;
-
-  @Test
-  public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
-    given(kinesis.getShardIterator(new GetShardIteratorRequest()
-        .withStreamName(STREAM)
-        .withShardId(SHARD_1)
-        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-        .withStartingSequenceNumber(SEQUENCE_NUMBER)
-    )).willReturn(new GetShardIteratorResult()
-        .withShardIterator(SHARD_ITERATOR));
-
-    String stream = underTest.getShardIterator(STREAM, SHARD_1,
-        ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
-
-    assertThat(stream).isEqualTo(SHARD_ITERATOR);
-  }
-
-  @Test
-  public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
-    Instant timestamp = Instant.now();
-    given(kinesis.getShardIterator(new GetShardIteratorRequest()
-        .withStreamName(STREAM)
-        .withShardId(SHARD_1)
-        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-        .withTimestamp(timestamp.toDate())
-    )).willReturn(new GetShardIteratorResult()
-        .withShardIterator(SHARD_ITERATOR));
-
-    String stream = underTest.getShardIterator(STREAM, SHARD_1,
-        ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
-
-    assertThat(stream).isEqualTo(SHARD_ITERATOR);
-  }
-
-  @Test
-  public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
-    shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
-        ExpiredIteratorException.class);
-  }
-
-  @Test
-  public void shouldHandleLimitExceededExceptionForGetShardIterator() {
-    shouldHandleGetShardIteratorError(new LimitExceededException(""),
-        TransientKinesisException.class);
-  }
-
-  @Test
-  public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
-    shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
-        TransientKinesisException.class);
-  }
-
-  @Test
-  public void shouldHandleServiceErrorForGetShardIterator() {
-    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
-        TransientKinesisException.class);
-  }
-
-  @Test
-  public void shouldHandleClientErrorForGetShardIterator() {
-    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
-        RuntimeException.class);
-  }
-
-  @Test
-  public void shouldHandleUnexpectedExceptionForGetShardIterator() {
-    shouldHandleGetShardIteratorError(new NullPointerException(),
-        RuntimeException.class);
-  }
-
-  private void shouldHandleGetShardIteratorError(
-      Exception thrownException,
-      Class<? extends Exception> expectedExceptionClass) {
-    GetShardIteratorRequest request = new GetShardIteratorRequest()
-        .withStreamName(STREAM)
-        .withShardId(SHARD_1)
-        .withShardIteratorType(ShardIteratorType.LATEST);
-
-    given(kinesis.getShardIterator(request)).willThrow(thrownException);
-
-    try {
-      underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
-      failBecauseExceptionWasNotThrown(expectedExceptionClass);
-    } catch (Exception e) {
-      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
-    } finally {
-      reset(kinesis);
-    }
-  }
-
-  @Test
-  public void shouldListAllShards() throws Exception {
-    Shard shard1 = new Shard().withShardId(SHARD_1);
-    Shard shard2 = new Shard().withShardId(SHARD_2);
-    Shard shard3 = new Shard().withShardId(SHARD_3);
-    given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
-        .withStreamDescription(new StreamDescription()
-            .withShards(shard1, shard2)
-            .withHasMoreShards(true)));
-    given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
-        .withStreamDescription(new StreamDescription()
-            .withShards(shard3)
-            .withHasMoreShards(false)));
-
-    List<Shard> shards = underTest.listShards(STREAM);
-
-    assertThat(shards).containsOnly(shard1, shard2, shard3);
-  }
-
-  @Test
-  public void shouldHandleExpiredIterationExceptionForShardListing() {
-    shouldHandleShardListingError(new ExpiredIteratorException(""),
-        ExpiredIteratorException.class);
-  }
-
-  @Test
-  public void shouldHandleLimitExceededExceptionForShardListing() {
-    shouldHandleShardListingError(new LimitExceededException(""),
-        TransientKinesisException.class);
-  }
-
-  @Test
-  public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
-    shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
-        TransientKinesisException.class);
-  }
-
-  @Test
-  public void shouldHandleServiceErrorForShardListing() {
-    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
-        TransientKinesisException.class);
-  }
-
-  @Test
-  public void shouldHandleClientErrorForShardListing() {
-    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
-        RuntimeException.class);
-  }
-
-  @Test
-  public void shouldHandleUnexpectedExceptionForShardListing() {
-    shouldHandleShardListingError(new NullPointerException(),
-        RuntimeException.class);
-  }
-
-  private void shouldHandleShardListingError(
-      Exception thrownException,
-      Class<? extends Exception> expectedExceptionClass) {
-    given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
-    try {
-      underTest.listShards(STREAM);
-      failBecauseExceptionWasNotThrown(expectedExceptionClass);
-    } catch (Exception e) {
-      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
-    } finally {
-      reset(kinesis);
-    }
-  }
-
-  private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
-    AmazonServiceException exception = new AmazonServiceException("");
-    exception.setErrorType(errorType);
-    return exception;
-  }
+    @Test
+    public void shouldHandleServiceErrorForShardListing() {
+        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleClientErrorForShardListing() {
+        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+                RuntimeException.class);
+    }
+
+    @Test
+    public void shouldHandleUnexpectedExceptionForShardListing() {
+        shouldHandleShardListingError(new NullPointerException(),
+                RuntimeException.class);
+    }
+
+    private void shouldHandleShardListingError(
+            Exception thrownException,
+            Class<? extends Exception> expectedExceptionClass) {
+        given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+        try {
+            underTest.listShards(STREAM);
+            failBecauseExceptionWasNotThrown(expectedExceptionClass);
+        } catch (Exception e) {
+            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+        } finally {
+            reset(kinesis);
+        }
+    }
+
+    private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+        AmazonServiceException exception = new AmazonServiceException("");
+        exception.setErrorType(errorType);
+        return exception;
+    }
 }
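
The shouldHandle...Error helpers above fix the client's error-translation contract: ExpiredIteratorException is rethrown untouched so callers can refresh their iterator, throttling and service-side failures surface as the retryable TransientKinesisException, and client-side or unexpected failures surface as plain RuntimeException. A sketch of a wrapper implementing that contract (the method name and the TransientKinesisException constructor are assumptions inferred from the tests, not a pinned API):

    import com.amazonaws.AmazonServiceException;
    import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    import com.amazonaws.services.kinesis.model.LimitExceededException;
    import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    import java.util.concurrent.Callable;

    final class KinesisCallWrapper {
      // Translate raw AWS SDK failures into the categories the tests assert on.
      static <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
        try {
          return callable.call();
        } catch (ExpiredIteratorException e) {
          throw e;  // propagated as-is: the caller refreshes the iterator
        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
          throw new TransientKinesisException("Too many requests", e);  // retryable
        } catch (AmazonServiceException e) {
          if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
            throw new TransientKinesisException("Kinesis backend failed", e);  // retryable
          }
          throw new RuntimeException("Kinesis client-side failure", e);  // not retryable
        } catch (Exception e) {
          throw new RuntimeException("Unknown Kinesis failure", e);  // not retryable
        }
      }

      private KinesisCallWrapper() {}
    }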

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index d93cc41..912e20c 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 5b5412c..b63775d 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -117,7 +117,7 @@ import org.joda.time.Instant;
  * to the file separated with line feeds.
  * </p>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class MongoDbGridFSIO {
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 3b14182..620df74 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -18,13 +18,12 @@
 package org.apache.beam.sdk.io.mongodb;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
@@ -94,27 +93,19 @@ import org.slf4j.LoggerFactory;
  *
  * }</pre>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class MongoDbIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(MongoDbIO.class);
 
   /** Read data from MongoDB. */
   public static Read read() {
-    return new AutoValue_MongoDbIO_Read.Builder()
-        .setKeepAlive(true)
-        .setMaxConnectionIdleTime(60000)
-        .setNumSplits(0)
-        .build();
+    return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build();
   }
 
   /** Write data to MongoDB. */
   public static Write write() {
-    return new AutoValue_MongoDbIO_Write.Builder()
-        .setKeepAlive(true)
-        .setMaxConnectionIdleTime(60000)
-        .setBatchSize(1024L)
-        .build();
+    return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build();
   }
 
   private MongoDbIO() {
@@ -126,20 +117,16 @@ public class MongoDbIO {
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<Document>> {
     @Nullable abstract String uri();
-    abstract boolean keepAlive();
-    abstract int maxConnectionIdleTime();
     @Nullable abstract String database();
     @Nullable abstract String collection();
     @Nullable abstract String filter();
     abstract int numSplits();
 
-    abstract Builder builder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setUri(String uri);
-      abstract Builder setKeepAlive(boolean keepAlive);
-      abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
       abstract Builder setDatabase(String database);
       abstract Builder setCollection(String collection);
       abstract Builder setFilter(String filter);
@@ -148,94 +135,31 @@ public class MongoDbIO {
     }
 
     /**
-     * Define the location of the MongoDB instances using a URI. The URI describes the hosts to
-     * be used and some options.
-     *
-     * <p>The format of the URI is:
-     *
-     * <pre>{@code
-     * mongodb://[username:password@]host1[:port1]...[,hostN[:portN]]][/[database][?options]]
-     * }</pre>
-     *
-     * <p>Where:
-     *   <ul>
-     *     <li>{@code mongodb://} is a required prefix to identify that this is a string in the
-     *     standard connection format.</li>
-     *     <li>{@code username:password@} are optional. If given, the driver will attempt to
-     *     login to a database after connecting to a database server. For some authentication
-     *     mechanisms, only the username is specified and the password is not, in which case
-     *     the ":" after the username is left off as well.</li>
-     *     <li>{@code host1} is the only required part of the URI. It identifies a server
-     *     address to connect to.</li>
-     *     <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
-     *     <li>{@code /database} is the name of the database to login to and thus is only
-     *     relevant if the {@code username:password@} syntax is used. If not specified, the
-     *     "admin" database will be used by default. It has to be equivalent with the database
-     *     you specific with {@link Read#withDatabase(String)}.</li>
-     *     <li>{@code ?options} are connection options. Note that if {@code database} is absent
-     *     there is still a {@code /} required between the last {@code host} and the {@code ?}
-     *     introducing the options. Options are name=value pairs and the pairs are separated by
-     *     "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI,
-     *     instead you have to use {@link Read#withKeepAlive(boolean)}. Same for the
-     *     {@code MaxConnectionIdleTime} connection option via
-     *     {@link Read#withMaxConnectionIdleTime(int)}.
-     *     </li>
-     *   </ul>
+     * Define the location of the MongoDB instances using a URI.
      */
     public Read withUri(String uri) {
-      checkArgument(uri != null, "MongoDbIO.read().withUri(uri) called with null uri");
-      return builder().setUri(uri).build();
-    }
-
-    /**
-     * Sets whether socket keep alive is enabled.
-     */
-    public Read withKeepAlive(boolean keepAlive) {
-      return builder().setKeepAlive(keepAlive).build();
-    }
-
-    /**
-     * Sets the maximum idle time for a pooled connection.
-     */
-    public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) {
-      return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
+      checkNotNull(uri);
+      return toBuilder().setUri(uri).build();
     }
 
-    /**
-     * Sets the database to use.
-     */
     public Read withDatabase(String database) {
-      checkArgument(database != null, "MongoDbIO.read().withDatabase(database) called with null"
-          + " database");
-      return builder().setDatabase(database).build();
+      checkNotNull(database);
+      return toBuilder().setDatabase(database).build();
     }
 
-    /**
-     * Sets the collection to consider in the database.
-     */
     public Read withCollection(String collection) {
-      checkArgument(collection != null, "MongoDbIO.read().withCollection(collection) called "
-          + "with null collection");
-      return builder().setCollection(collection).build();
+      checkNotNull(collection);
+      return toBuilder().setCollection(collection).build();
     }
 
-    /**
-     * Sets a filter on the documents in a collection.
-     */
     public Read withFilter(String filter) {
-      checkArgument(filter != null, "MongoDbIO.read().withFilter(filter) called with null "
-          + "filter");
-      return builder().setFilter(filter).build();
+      checkNotNull(filter);
+      return toBuilder().setFilter(filter).build();
     }
 
-    /**
-     * Sets the user defined number of splits.
-     */
     public Read withNumSplits(int numSplits) {
-      checkArgument(numSplits >= 0, "MongoDbIO.read().withNumSplits(numSplits) called with "
-          + "invalid number. The number of splits has to be a positive value (currently %d)",
-          numSplits);
-      return builder().setNumSplits(numSplits).build();
+      checkArgument(numSplits >= 0);
+      return toBuilder().setNumSplits(numSplits).build();
     }
 
     @Override
@@ -245,19 +169,15 @@ public class MongoDbIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      checkState(uri() != null, "MongoDbIO.read() requires an URI to be set via withUri(uri)");
-      checkState(database() != null, "MongoDbIO.read() requires a database to be set via "
-          + "withDatabase(database)");
-      checkState(collection() != null, "MongoDbIO.read() requires a collection to be set via "
-          + "withCollection(collection)");
+      checkNotNull(uri(), "uri");
+      checkNotNull(database(), "database");
+      checkNotNull(collection(), "collection");
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.add(DisplayData.item("uri", uri()));
-      builder.add(DisplayData.item("keepAlive", keepAlive()));
-      builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
       builder.add(DisplayData.item("database", database()));
       builder.add(DisplayData.item("collection", collection()));
       builder.addIfNotNull(DisplayData.item("filter", filter()));
@@ -298,71 +218,61 @@ public class MongoDbIO {
 
     @Override
     public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
-      try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
-        return getEstimatedSizeBytes(mongoClient, spec.database(), spec.collection());
-      }
-    }
-
-    private long getEstimatedSizeBytes(MongoClient mongoClient,
-                                       String database,
-                                       String collection) {
-      MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+      MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
+      MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
 
       // get the Mongo collStats object
       // it gives the size for the entire collection
       BasicDBObject stat = new BasicDBObject();
-      stat.append("collStats", collection);
+      stat.append("collStats", spec.collection());
       Document stats = mongoDatabase.runCommand(stat);
-
       return stats.get("size", Number.class).longValue();
     }
 
     @Override
     public List<BoundedSource<Document>> split(long desiredBundleSizeBytes,
                                                 PipelineOptions options) {
-      try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
-        MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
-
-        List<Document> splitKeys;
-        if (spec.numSplits() > 0) {
-          // the user defines his desired number of splits
-          // calculate the batch size
-          long estimatedSizeBytes = getEstimatedSizeBytes(mongoClient,
-              spec.database(), spec.collection());
-          desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
-        }
-
-        // the desired batch size is small, using default chunk size of 1MB
-        if (desiredBundleSizeBytes < 1024 * 1024) {
-          desiredBundleSizeBytes = 1 * 1024 * 1024;
-        }
-
-        // now we have the batch size (provided by user or provided by the runner)
-        // we use Mongo splitVector command to get the split keys
-        BasicDBObject splitVectorCommand = new BasicDBObject();
-        splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
-        splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
-        splitVectorCommand.append("force", false);
-        // maxChunkSize is the Mongo partition size in MB
-        LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
-        splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
-        Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
-        splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
-
-        List<BoundedSource<Document>> sources = new ArrayList<>();
-        if (splitKeys.size() < 1) {
-          LOG.debug("Split keys is low, using an unique source");
-          sources.add(this);
-          return sources;
-        }
+      MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
+      MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
+
+      List<Document> splitKeys;
+      if (spec.numSplits() > 0) {
+        // the user defines his desired number of splits
+        // calculate the batch size
+        long estimatedSizeBytes = getEstimatedSizeBytes(options);
+        desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
+      }
 
-        LOG.debug("Number of splits is {}", splitKeys.size());
-        for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
-          sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
-        }
+      // the desired batch size is small, using default chunk size of 1MB
+      if (desiredBundleSizeBytes < 1024 * 1024) {
+        desiredBundleSizeBytes = 1 * 1024 * 1024;
+      }
 
+      // now we have the batch size (provided by user or provided by the runner)
+      // we use Mongo splitVector command to get the split keys
+      BasicDBObject splitVectorCommand = new BasicDBObject();
+      splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
+      splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
+      splitVectorCommand.append("force", false);
+      // maxChunkSize is the Mongo partition size in MB
+      LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
+      splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
+      Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
+      splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
+
+      List<BoundedSource<Document>> sources = new ArrayList<>();
+      if (splitKeys.size() < 1) {
+        LOG.debug("Split keys is low, using an unique source");
+        sources.add(this);
         return sources;
       }
+
+      LOG.debug("Number of splits is {}", splitKeys.size());
+      for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
+        sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
+      }
+
+      return sources;
     }
 
     /**
@@ -457,10 +367,7 @@ public class MongoDbIO {
     @Override
     public boolean start() {
       Read spec = source.spec;
-      MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
-      optionsBuilder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
-      optionsBuilder.socketKeepAlive(spec.keepAlive());
-      client = new MongoClient(new MongoClientURI(spec.uri(), optionsBuilder));
+      client = new MongoClient(new MongoClientURI(spec.uri()));
 
       MongoDatabase mongoDatabase = client.getDatabase(spec.database());
 
@@ -519,106 +426,36 @@ public class MongoDbIO {
    */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<Document>, PDone> {
-
     @Nullable abstract String uri();
-    abstract boolean keepAlive();
-    abstract int maxConnectionIdleTime();
     @Nullable abstract String database();
     @Nullable abstract String collection();
     abstract long batchSize();
 
-    abstract Builder builder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setUri(String uri);
-      abstract Builder setKeepAlive(boolean keepAlive);
-      abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
       abstract Builder setDatabase(String database);
       abstract Builder setCollection(String collection);
       abstract Builder setBatchSize(long batchSize);
       abstract Write build();
     }
 
-    /**
-     * Define the location of the MongoDB instances using a URI. The URI describes the hosts to
-     * be used and some options.
-     *
-     * <p>The format of the URI is:
-     *
-     * <pre>{@code
-     * mongodb://[username:password@]host1[:port1],...[,hostN[:portN]]][/[database][?options]]
-     * }</pre>
-     *
-     * <p>Where:
-     *   <ul>
-     *     <li>{@code mongodb://} is a required prefix to identify that this is a string in the
-     *     standard connection format.</li>
-     *     <li>{@code username:password@} are optional. If given, the driver will attempt to
-     *     login to a database after connecting to a database server. For some authentication
-     *     mechanisms, only the username is specified and the password is not, in which case
-     *     the ":" after the username is left off as well.</li>
-     *     <li>{@code host1} is the only required part of the URI. It identifies a server
-     *     address to connect to.</li>
-     *     <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
-     *     <li>{@code /database} is the name of the database to login to and thus is only
-     *     relevant if the {@code username:password@} syntax is used. If not specified, the
-     *     "admin" database will be used by default. It has to be equivalent with the database
-     *     you specific with {@link Write#withDatabase(String)}.</li>
-     *     <li>{@code ?options} are connection options. Note that if {@code database} is absent
-     *     there is still a {@code /} required between the last {@code host} and the {@code ?}
-     *     introducing the options. Options are name=value pairs and the pairs are separated by
-     *     "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI, instead
-     *     you have to use {@link Write#withKeepAlive(boolean)}. Same for the
-     *     {@code MaxConnectionIdleTime} connection option via
-     *     {@link Write#withMaxConnectionIdleTime(int)}.
-     *     </li>
-     *   </ul>
-     */
     public Write withUri(String uri) {
-      checkArgument(uri != null, "MongoDbIO.write().withUri(uri) called with null uri");
-      return builder().setUri(uri).build();
-    }
-
-    /**
-     * Sets whether socket keep alive is enabled.
-     */
-    public Write withKeepAlive(boolean keepAlive) {
-      return builder().setKeepAlive(keepAlive).build();
-    }
-
-    /**
-     * Sets the maximum idle time for a pooled connection.
-     */
-    public Write withMaxConnectionIdleTime(int maxConnectionIdleTime) {
-      return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
+      return toBuilder().setUri(uri).build();
     }
 
-    /**
-     * Sets the database to use.
-     */
     public Write withDatabase(String database) {
-      checkArgument(database != null, "MongoDbIO.write().withDatabase(database) called with "
-          + "null database");
-      return builder().setDatabase(database).build();
+      return toBuilder().setDatabase(database).build();
     }
 
-    /**
-     * Sets the collection where to write data in the database.
-     */
     public Write withCollection(String collection) {
-      checkArgument(collection != null, "MongoDbIO.write().withCollection(collection) called "
-          + "with null collection");
-      return builder().setCollection(collection).build();
+      return toBuilder().setCollection(collection).build();
     }
 
-    /**
-     * Define the size of the batch to group write operations.
-     */
     public Write withBatchSize(long batchSize) {
-      checkArgument(batchSize >= 0, "MongoDbIO.write().withBatchSize(batchSize) called with "
-          + "invalid batch size. Batch size has to be >= 0 (currently %d)", batchSize);
-      return builder().setBatchSize(batchSize).build();
+      return toBuilder().setBatchSize(batchSize).build();
     }
 
     @Override
@@ -629,21 +466,10 @@ public class MongoDbIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      checkState(uri() != null, "MongoDbIO.write() requires an URI to be set via withUri(uri)");
-      checkState(database() != null, "MongoDbIO.write() requires a database to be set via "
-          + "withDatabase(database)");
-      checkState(collection() != null, "MongoDbIO.write() requires a collection to be set via "
-          + "withCollection(collection)");
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(DisplayData.item("uri", uri()));
-      builder.add(DisplayData.item("keepAlive", keepAlive()));
-      builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
-      builder.add(DisplayData.item("database", database()));
-      builder.add(DisplayData.item("collection", collection()));
-      builder.add(DisplayData.item("batchSize", batchSize()));
+      checkNotNull(uri(), "uri");
+      checkNotNull(database(), "database");
+      checkNotNull(collection(), "collection");
+      checkNotNull(batchSize(), "batchSize");
     }
 
     private static class WriteFn extends DoFn<Document, Void> {
@@ -657,10 +483,7 @@ public class MongoDbIO {
 
       @Setup
       public void createMongoClient() throws Exception {
-        MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
-        builder.socketKeepAlive(spec.keepAlive());
-        builder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
-        client = new MongoClient(new MongoClientURI(spec.uri(), builder));
+        client = new MongoClient(new MongoClientURI(spec.uri()));
       }
 
       @StartBundle
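
For reference, the two connection options this revert removes map directly onto the MongoDB 3.x Java driver's MongoClientOptions.Builder: socketKeepAlive(boolean) and maxConnectionIdleTime(int, in milliseconds), passed alongside the connection string via MongoClientURI. A standalone usage sketch (the URI and the values are placeholders):

    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientOptions;
    import com.mongodb.MongoClientURI;

    public class MongoClientOptionsExample {
      public static void main(String[] args) {
        // The same builder settings MongoDbIO applied before this revert.
        MongoClientOptions.Builder options = new MongoClientOptions.Builder()
            .socketKeepAlive(true)          // enable TCP keep-alive on sockets
            .maxConnectionIdleTime(60000);  // close pooled connections idle > 60 s
        MongoClient client = new MongoClient(
            new MongoClientURI("mongodb://localhost:27017", options));
        try {
          System.out.println(client.listDatabaseNames().first());
        } finally {
          client.close();
        }
      }
    }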

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 67dbca4..cd26b48 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.mongodb;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
@@ -190,42 +189,6 @@ public class MongoDbIOTest implements Serializable {
   }
 
   @Test
-  public void testReadWithCustomConnectionOptions() throws Exception {
-    MongoDbIO.Read read = MongoDbIO.read()
-        .withUri("mongodb://localhost:" + port)
-        .withKeepAlive(false)
-        .withMaxConnectionIdleTime(10)
-        .withDatabase(DATABASE)
-        .withCollection(COLLECTION);
-    assertFalse(read.keepAlive());
-    assertEquals(10, read.maxConnectionIdleTime());
-
-    PCollection<Document> documents = pipeline.apply(read);
-
-    PAssert.thatSingleton(documents.apply("Count All", Count.<Document>globally()))
-        .isEqualTo(1000L);
-
-    PAssert.that(documents
-        .apply("Map Scientist", MapElements.via(new SimpleFunction<Document, KV<String, Void>>() {
-          public KV<String, Void> apply(Document input) {
-            return KV.of(input.getString("scientist"), null);
-          }
-        }))
-        .apply("Count Scientist", Count.<String, Void>perKey())
-    ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
-      @Override
-      public Void apply(Iterable<KV<String, Long>> input) {
-        for (KV<String, Long> element : input) {
-          assertEquals(100L, element.getValue().longValue());
-        }
-        return null;
-      }
-    });
-
-    pipeline.run();
-  }
-
-  @Test
   public void testReadWithFilter() throws Exception {
 
     PCollection<Document> output = pipeline.apply(

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 9fa1dc0..baaf771 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index add5cb5..228a85d 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -97,7 +97,7 @@ import org.slf4j.LoggerFactory;
  *
  * }</pre>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class MqttIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(MqttIO.class);
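
The annotation change repeated across the MongoDB and MQTT modules in this commit is purely about metadata granularity: Beam's @Experimental annotation accepts an optional Kind element, so the form removed here declared the class experimental specifically in its role as a source/sink, while the restored form is the generic marker. A minimal sketch (assuming org.apache.beam.sdk.annotations.Experimental):

    import org.apache.beam.sdk.annotations.Experimental;

    @Experimental(Experimental.Kind.SOURCE_SINK)  // removed by the revert: scoped marker
    class ScopedExperimentalIO {}

    @Experimental                                 // restored by the revert: generic marker
    class GenericExperimentalIO {}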

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index b7909fa..44f3baa 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -32,8 +32,38 @@
   <description>Beam SDK Java IO provides different connectivity components
   (sources and sinks) to consume and produce data from systems.</description>
 
+  <properties>
+    <!--
+      This is the version of Hadoop used to compile the hadoop-common module.
+      This dependency is defined with a provided scope.
+      Users must supply their own Hadoop version at runtime.
+    -->
+    <hadoop.version>2.7.3</hadoop.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-common</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <modules>
-    <module>amqp</module>
     <module>cassandra</module>
     <module>common</module>
     <module>elasticsearch</module>
@@ -42,7 +72,6 @@
     <module>hadoop-file-system</module>
     <module>hadoop</module>
     <module>hbase</module>
-    <module>hcatalog</module>
     <module>jdbc</module>
     <module>jms</module>
     <module>kafka</module>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
index 7b5804e..cf7dd33 100644
--- a/sdks/java/io/xml/pom.xml
+++ b/sdks/java/io/xml/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 442fba5..7255a94 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -522,8 +521,7 @@ public class XmlIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity()));
+      return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 74e0bda..6ae83f2 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -25,7 +25,6 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Marshaller;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -35,18 +34,18 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** Implementation of {@link XmlIO#write}. */
-class XmlSink<T> extends FileBasedSink<T, Void> {
+class XmlSink<T> extends FileBasedSink<T> {
   private static final String XML_EXTENSION = ".xml";
 
   private final XmlIO.Write<T> spec;
 
-  private static <T> DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<T> spec) {
-    return DefaultFilenamePolicy.fromStandardParameters(
+  private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
+    return DefaultFilenamePolicy.constructUsingStandardParameters(
         spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false);
   }
 
   XmlSink(XmlIO.Write<T> spec) {
-    super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec)));
+    super(spec.getFilenamePrefix(), makeFilenamePolicy(spec));
     this.spec = spec;
   }
 
@@ -76,8 +75,10 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
     super.populateDisplayData(builder);
   }
 
-  /** {@link WriteOperation} for XML {@link FileBasedSink}s. */
-  protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> {
+  /**
+   * {@link WriteOperation} for XML {@link FileBasedSink}s.
+   */
+  protected static final class XmlWriteOperation<T> extends WriteOperation<T> {
     public XmlWriteOperation(XmlSink<T> sink) {
       super(sink);
     }
@@ -111,8 +112,10 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
     }
   }
 
-  /** A {@link Writer} that can write objects as XML elements. */
-  protected static final class XmlWriter<T> extends Writer<T, Void> {
+  /**
+   * A {@link Writer} that can write objects as XML elements.
+   */
+  protected static final class XmlWriter<T> extends Writer<T> {
     final Marshaller marshaller;
     private OutputStream os = null;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index d1584dc..aa0c1c3 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -197,8 +197,8 @@ public class XmlSinkTest {
         .withRecordClass(Integer.class);
 
     DisplayData displayData = DisplayData.from(write);
-    assertThat(
-        displayData, hasDisplayItem("filenamePattern", "/path/to/file-SSSSS-of-NNNNN" + ".xml"));
+
+    assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml"));
     assertThat(displayData, hasDisplayItem("rootElement", "bird"));
     assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index 2378014..b90a757 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 51109fb..54dae3a 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
 
@@ -99,16 +99,6 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-amqp</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-cassandra</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
     </dependency>
 
@@ -134,11 +124,6 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-hcatalog</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-jdbc</artifactId>
     </dependency>
 
@@ -211,11 +196,13 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
+      <version>${spark.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
+      <version>${spark.version}</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples-java8/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/pom.xml b/sdks/java/maven-archetypes/examples-java8/pom.xml
index b60a695..b57644d 100644
--- a/sdks/java/maven-archetypes/examples-java8/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
index 4517861..af4fbd3 100644
--- a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
@@ -242,6 +242,7 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index 2a02039..c1378cb 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index d039ddb..b8b9c9f 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -241,6 +241,7 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/pom.xml b/sdks/java/maven-archetypes/pom.xml
index d676b31..b7fe274 100644
--- a/sdks/java/maven-archetypes/pom.xml
+++ b/sdks/java/maven-archetypes/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 8024b52..06b41c8 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
index 6056fb0..60405e6 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
@@ -28,7 +28,7 @@
     <beam.version>@project.version@</beam.version>
 
     <maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
-    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
+    <maven-exec-plugin.version>1.4.0</maven-exec-plugin.version>
     <slf4j.version>1.7.14</slf4j.version>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 3144193..250c85a 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/pom.xml b/sdks/pom.xml
index aec8762..27b9610 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 2670250..10298bf 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -710,10 +710,6 @@ class WindowedValueCoderImpl(StreamCoderImpl):
       timestamp = MAX_TIMESTAMP.micros
     else:
       timestamp *= 1000
-      if timestamp > MAX_TIMESTAMP.micros:
-        timestamp = MAX_TIMESTAMP.micros
-      if timestamp < MIN_TIMESTAMP.micros:
-        timestamp = MIN_TIMESTAMP.micros
 
     windows = self._windows_coder.decode_from_stream(in_stream, True)
     # Read PaneInfo encoded byte.
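
The clamp this hunk removes follows a common scale-then-pin pattern: convert the decoded millisecond value to microseconds, then force the result back into the representable timestamp range. A standalone sketch of that pattern (the bounds below are illustrative placeholders, not Beam's actual MIN_TIMESTAMP/MAX_TIMESTAMP values):

    # Placeholder bounds for illustration only, standing in for
    # MIN_TIMESTAMP.micros / MAX_TIMESTAMP.micros.
    MAX_MICROS = 2 ** 63 - 1
    MIN_MICROS = -(2 ** 63)

    def millis_to_clamped_micros(millis):
      micros = millis * 1000                           # scale millis -> micros
      return max(MIN_MICROS, min(micros, MAX_MICROS))  # pin into range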

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index c56ef52..f40045d 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -25,7 +25,6 @@ import cPickle as pickle
 import google.protobuf
 
 from apache_beam.coders import coder_impl
-from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.utils import urns
 from apache_beam.utils import proto_utils
 
@@ -206,6 +205,7 @@ class Coder(object):
     """For internal use only; no backwards-compatibility guarantees.
     """
     # TODO(BEAM-115): Use specialized URNs and components.
+    from apache_beam.runners.api import beam_runner_api_pb2
     return beam_runner_api_pb2.Coder(
         spec=beam_runner_api_pb2.SdkFunctionSpec(
             spec=beam_runner_api_pb2.FunctionSpec(
@@ -286,11 +286,6 @@ class BytesCoder(FastCoder):
   def is_deterministic(self):
     return True
 
-  def as_cloud_object(self):
-    return {
-        '@type': 'kind:bytes',
-    }
-
   def __eq__(self, other):
     return type(self) == type(other)
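
The to_runner_api change above replaces a module-level import of beam_runner_api_pb2 with one inside the method body. Deferring an import into the function that needs it delays resolution until first call, a standard Python way to sidestep a circular import at module load time. A runnable sketch of the pattern, using a stdlib module in place of the protobuf bindings:

    def to_utc(seconds):
      # Deferred import: datetime is loaded on the first call, not when
      # this module is imported, so an import cycle involving this module
      # cannot trip over it at load time.
      import datetime
      return datetime.datetime.utcfromtimestamp(seconds)

    assert to_utc(0).year == 1970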
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 577c53a..c9b67b3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -23,8 +23,6 @@ import unittest
 
 import dill
 
-from apache_beam.transforms.window import GlobalWindow
-from apache_beam.utils.timestamp import MIN_TIMESTAMP
 import observable
 from apache_beam.transforms import window
 from apache_beam.utils import timestamp
@@ -289,12 +287,6 @@ class CodersTest(unittest.TestCase):
     # Test binary representation
     self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
                      coder.encode(window.GlobalWindows.windowed_value(1)))
-
-    # Test decoding large timestamp
-    self.assertEqual(
-        coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'),
-        windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),)))
-
     # Test unnested
     self.check_coder(
         coders.WindowedValueCoder(coders.VarIntCoder()),

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 31f71b3..9183d0d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -589,22 +589,6 @@ class SnippetsTest(unittest.TestCase):
     snippets.model_textio_compressed(
         {'read': gzip_file_name}, ['aa', 'bb', 'cc'])
 
-  def test_model_textio_gzip_concatenated(self):
-    temp_path_1 = self.create_temp_file('a\nb\nc\n')
-    temp_path_2 = self.create_temp_file('p\nq\nr\n')
-    temp_path_3 = self.create_temp_file('x\ny\nz')
-    gzip_file_name = temp_path_1 + '.gz'
-    with open(temp_path_1) as src, gzip.open(gzip_file_name, 'wb') as dst:
-      dst.writelines(src)
-    with open(temp_path_2) as src, gzip.open(gzip_file_name, 'ab') as dst:
-      dst.writelines(src)
-    with open(temp_path_3) as src, gzip.open(gzip_file_name, 'ab') as dst:
-      dst.writelines(src)
-      # Add the temporary gzip file to be cleaned up as well.
-      self.temp_files.append(gzip_file_name)
-    snippets.model_textio_compressed(
-        {'read': gzip_file_name}, ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'])
-
   @unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
   def test_model_datastoreio(self):
     # We cannot test datastoreio functionality in unit tests therefore we limit
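
The deleted test above built a single .gz file from three independently written gzip members (note the 'ab' append mode) and expected the reader to surface all of them. A self-contained sketch of writing such a file and then draining every member via zlib's unused_data, mirroring the CompressedFile change later in this diff (the path is a placeholder):

    import gzip
    import zlib

    path = '/tmp/concat.gz'  # placeholder path
    with gzip.open(path, 'wb') as dst:
      dst.write('a\nb\n')
    with gzip.open(path, 'ab') as dst:  # appends a second gzip member
      dst.write('c\n')

    # 16 + MAX_WBITS makes zlib expect a gzip header; once one member
    # ends, the bytes of the next member appear in .unused_data.
    data = open(path, 'rb').read()
    decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    chunks = []
    while data:
      chunks.append(decompressor.decompress(data))
      data = decompressor.unused_data
      if data:  # another gzip member follows; start a fresh decompressor
        decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    assert ''.join(chunks) == 'a\nb\nc\n'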

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 7696d77..ed8b5d0 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -25,44 +25,35 @@ from __future__ import absolute_import
 
 import argparse
 import logging
+import re
 
 
 import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import StandardOptions
 import apache_beam.transforms.window as window
 
 
-def split_fn(lines):
-  import re
-  return re.findall(r'[A-Za-z\']+', lines)
-
-
 def run(argv=None):
   """Build and run the pipeline."""
+
   parser = argparse.ArgumentParser()
   parser.add_argument(
       '--input_topic', required=True,
-      help=('Input PubSub topic of the form '
-            '"projects/<PROJECT>/topics/<TOPIC>".'))
+      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
   parser.add_argument(
       '--output_topic', required=True,
-      help=('Output PubSub topic of the form '
-            '"projects/<PROJECT>/topic/<TOPIC>".'))
+      help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
   known_args, pipeline_args = parser.parse_known_args(argv)
-  options = PipelineOptions(pipeline_args)
-  options.view_as(StandardOptions).streaming = True
 
-  with beam.Pipeline(options=options) as p:
+  with beam.Pipeline(argv=pipeline_args) as p:
 
-    # Read from PubSub into a PCollection.
+    # Read the text file[pattern] into a PCollection.
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
     # Capitalize the characters in each line.
     transformed = (lines
-                   # Use a pre-defined function that imports the re package.
                    | 'Split' >> (
-                       beam.FlatMap(split_fn).with_output_types(unicode))
+                       beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+                       .with_output_types(unicode))
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(15, 0))
                    | 'Group' >> beam.GroupByKey()
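
For reference, the reverted example's run() entry point can be driven directly with a list of arguments; parse_known_args splits off --input_topic/--output_topic and hands the rest to the pipeline. Everything below is a hypothetical invocation: the topic paths are placeholders in the "/topics/<PROJECT>/<TOPIC>" form the help text describes, and any runner flags the chosen runner requires (e.g. --streaming) would be appended to the same list:

    from apache_beam.examples import streaming_wordcount

    streaming_wordcount.run([
        '--input_topic', '/topics/my-project/my-input',    # placeholder
        '--output_topic', '/topics/my-project/my-output',  # placeholder
    ])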

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
deleted file mode 100644
index bd57847..0000000
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A streaming word-counting workflow.
-
-Important: streaming pipeline support in Python Dataflow is in development
-and is not yet available for use.
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-
-
-import apache_beam as beam
-import apache_beam.transforms.window as window
-
-TABLE_SCHEMA = ('word:STRING, count:INTEGER, '
-                'window_start:TIMESTAMP, window_end:TIMESTAMP')
-
-
-def find_words(element):
-  import re
-  return re.findall(r'[A-Za-z\']+', element)
-
-
-class FormatDoFn(beam.DoFn):
-  def process(self, element, window=beam.DoFn.WindowParam):
-    ts_format = '%Y-%m-%d %H:%M:%S.%f UTC'
-    window_start = window.start.to_utc_datetime().strftime(ts_format)
-    window_end = window.end.to_utc_datetime().strftime(ts_format)
-    return [{'word': element[0],
-             'count': element[1],
-             'window_start':window_start,
-             'window_end':window_end}]
-
-
-def run(argv=None):
-  """Build and run the pipeline."""
-
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '--input_topic', required=True,
-      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
-  parser.add_argument(
-      '--output_table', required=True,
-      help=
-      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
-       'or DATASET.TABLE.'))
-  known_args, pipeline_args = parser.parse_known_args(argv)
-
-  with beam.Pipeline(argv=pipeline_args) as p:
-
-    # Read the text from PubSub messages
-    lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
-
-    # Capitalize the characters in each line.
-    transformed = (lines
-                   | 'Split' >> (beam.FlatMap(find_words)
-                                 .with_output_types(unicode))
-                   | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
-                   | beam.WindowInto(window.FixedWindows(2*60, 0))
-                   | 'Group' >> beam.GroupByKey()
-                   | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
-                   | 'Format' >> beam.ParDo(FormatDoFn()))
-
-    # Write to BigQuery.
-    # pylint: disable=expression-not-assigned
-    transformed | 'Write' >> beam.io.WriteToBigQuery(
-        known_args.output_table,
-        schema=TABLE_SCHEMA,
-        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  run()

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 1f65d0a..db6a1d0 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -26,8 +26,6 @@ import zlib
 import logging
 import time
 
-from apache_beam.utils.plugin import BeamPlugin
-
 logger = logging.getLogger(__name__)
 
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -201,14 +199,6 @@ class CompressedFile(object):
             assert False, 'Possible file corruption.'
           except EOFError:
             pass  # All is as expected!
-        elif self._compression_type == CompressionTypes.GZIP:
-          # If Gzip file check if there is unused data generated by gzip concat
-          if self._decompressor.unused_data != '':
-            buf = self._decompressor.unused_data
-            self._decompressor = zlib.decompressobj(self._gzip_mask)
-            decompressed = self._decompressor.decompress(buf)
-            self._read_buffer.write(decompressed)
-            continue
         else:
           self._read_buffer.write(self._decompressor.flush())
 
@@ -419,7 +409,7 @@ class BeamIOError(IOError):
     self.exception_details = exception_details
 
 
-class FileSystem(BeamPlugin):
+class FileSystem(object):
   """A class that defines the functions that can be performed on a filesystem.
 
   All methods are abstract and they are for file system providers to
@@ -439,6 +429,16 @@ class FileSystem(BeamPlugin):
     return compression_type
 
   @classmethod
+  def get_all_subclasses(cls):
+    """Get all the subclasses of the FileSystem class
+    """
+    all_subclasses = []
+    for subclass in cls.__subclasses__():
+      all_subclasses.append(subclass)
+      all_subclasses.extend(subclass.get_all_subclasses())
+    return all_subclasses
+
+  @classmethod
   def scheme(cls):
     """URI scheme for the FileSystem
     """

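The get_all_subclasses classmethod restored above recurses because cls.__subclasses__() reports only direct children; walking it recursively also collects grandchildren and deeper descendants. The same pattern in isolation, with an illustrative class hierarchy:

    def all_subclasses(cls):
      # __subclasses__() lists direct children only, so recurse to reach
      # indirect descendants as well.
      found = []
      for sub in cls.__subclasses__():
        found.append(sub)
        found.extend(all_subclasses(sub))
      return found

    class Base(object): pass
    class Child(Base): pass
    class GrandChild(Child): pass

    assert all_subclasses(Base) == [Child, GrandChild]
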
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 643fbc7..d43c8ba 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -31,7 +31,6 @@ import re
 import threading
 import time
 import traceback
-import httplib2
 
 from apache_beam.utils import retry
 
@@ -69,10 +68,6 @@ except ImportError:
 # +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
-# This is the number of seconds the library will wait for GCS operations to
-# complete.
-DEFAULT_HTTP_TIMEOUT_SECONDS = 60
-
 # This is the number of seconds the library will wait for a partial-file read
 # operation from GCS to complete before retrying.
 DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
@@ -104,7 +99,6 @@ class GcsIO(object):
 
   def __new__(cls, storage_client=None):
     if storage_client:
-      # This path is only used for testing.
       return super(GcsIO, cls).__new__(cls, storage_client)
     else:
       # Create a single storage client for each thread.  We would like to avoid
@@ -114,9 +108,7 @@ class GcsIO(object):
       local_state = threading.local()
       if getattr(local_state, 'gcsio_instance', None) is None:
         credentials = auth.get_service_credentials()
-        storage_client = storage.StorageV1(
-            credentials=credentials,
-            http=httplib2.Http(timeout=DEFAULT_HTTP_TIMEOUT_SECONDS))
+        storage_client = storage.StorageV1(credentials=credentials)
         local_state.gcsio_instance = (
             super(GcsIO, cls).__new__(cls, storage_client))
         local_state.gcsio_instance.client = storage_client