Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:32 UTC
[05/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 4b2190f..49e806d 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -25,10 +25,8 @@ import static org.mockito.Matchers.anyListOf;
import static org.mockito.Mockito.when;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-
import java.io.IOException;
import java.util.Collections;
-
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,114 +40,112 @@ import org.mockito.stubbing.Answer;
*/
@RunWith(MockitoJUnitRunner.class)
public class ShardRecordsIteratorTest {
+ private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+ private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+ private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+ private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+ private static final String STREAM_NAME = "STREAM_NAME";
+ private static final String SHARD_ID = "SHARD_ID";
+
+ @Mock
+ private SimplifiedKinesisClient kinesisClient;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+ @Mock
+ private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+ @Mock
+ private KinesisRecord a, b, c, d;
+ @Mock
+ private RecordFilter recordFilter;
+
+ private ShardRecordsIterator iterator;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+ when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+ when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+ when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+ when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+ when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+ when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+ when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(firstResult);
+ when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(secondResult);
+ when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(thirdResult);
+
+ when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+ when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+ when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+ when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+ when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+ when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+ when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+ .class))).thenAnswer(new IdentityAnswer());
+
+ iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+ }
+
+ @Test
+ public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ }
+
+ @Test
+ public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+ when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+ assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+ assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+ assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+ }
+
+ @Test
+ public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(singletonList(a));
+ when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+ when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenThrow(ExpiredIteratorException.class);
+ when(aCheckpoint.getShardIterator(kinesisClient))
+ .thenReturn(SECOND_REFRESHED_ITERATOR);
+ when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(secondResult);
+
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ }
- private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
- private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
- private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
- private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
- private static final String STREAM_NAME = "STREAM_NAME";
- private static final String SHARD_ID = "SHARD_ID";
-
- @Mock
- private SimplifiedKinesisClient kinesisClient;
- @Mock
- private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
- @Mock
- private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
- @Mock
- private KinesisRecord a, b, c, d;
- @Mock
- private RecordFilter recordFilter;
-
- private ShardRecordsIterator iterator;
-
- @Before
- public void setUp() throws IOException, TransientKinesisException {
- when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
- when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
- when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
- when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
- when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
- when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
- when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
- when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
- when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
- when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
- when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
- when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenReturn(firstResult);
- when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenReturn(secondResult);
- when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenReturn(thirdResult);
-
- when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
- when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
- when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-
- when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
- when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
- when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-
- when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
- .class))).thenAnswer(new IdentityAnswer());
-
- iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
- }
-
- @Test
- public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- }
-
- @Test
- public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
- when(firstResult.getRecords()).thenReturn(asList(a, b, c));
- when(secondResult.getRecords()).thenReturn(singletonList(d));
-
- assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
- assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
- assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
- assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
- assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
- }
-
- @Test
- public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
- when(firstResult.getRecords()).thenReturn(singletonList(a));
- when(secondResult.getRecords()).thenReturn(singletonList(b));
-
- when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenThrow(ExpiredIteratorException.class);
- when(aCheckpoint.getShardIterator(kinesisClient))
- .thenReturn(SECOND_REFRESHED_ITERATOR);
- when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
- .thenReturn(secondResult);
-
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- }
-
- private static class IdentityAnswer implements Answer<Object> {
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- return invocation.getArguments()[0];
+ private static class IdentityAnswer implements Answer<Object> {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ return invocation.getArguments()[0];
+ }
}
- }
}
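
The refreshesExpiredIterator test above fixes the recovery contract for ShardRecordsIterator: when Kinesis rejects a shard iterator, a fresh iterator is obtained from the current checkpoint and the read is retried. A minimal sketch satisfying those stubs follows; the method names (getRecords, getShardIterator, getStreamName, getShardId) come straight from the stubs, while the class shape and single-retry policy are assumptions, not the actual ShardRecordsIterator source:

    import com.amazonaws.services.kinesis.model.ExpiredIteratorException;

    // Sketch only: satisfies the stubs in refreshesExpiredIterator(), assuming a
    // single refresh-and-retry when the shard iterator has expired.
    class ExpiredIteratorRetrySketch {
      private final SimplifiedKinesisClient kinesisClient;
      private ShardCheckpoint checkpoint; // advances via moveAfter(record) as records are emitted
      private String shardIterator;

      ExpiredIteratorRetrySketch(ShardCheckpoint checkpoint, SimplifiedKinesisClient client)
          throws TransientKinesisException {
        this.checkpoint = checkpoint;
        this.kinesisClient = client;
        this.shardIterator = checkpoint.getShardIterator(client); // INITIAL_ITERATOR above
      }

      GetKinesisRecordsResult fetch() throws TransientKinesisException {
        try {
          return kinesisClient.getRecords(
              shardIterator, checkpoint.getStreamName(), checkpoint.getShardId());
        } catch (ExpiredIteratorException e) {
          // The test stubs the checkpoint to hand back SECOND_REFRESHED_ITERATOR here,
          // so recovery is: re-derive the iterator from the checkpoint, retry once.
          shardIterator = checkpoint.getShardIterator(kinesisClient);
          return kinesisClient.getRecords(
              shardIterator, checkpoint.getStreamName(), checkpoint.getShardId());
        }
      }
    }
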
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 2f8757c..96434fd 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -34,9 +34,7 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
-
import java.util.List;
-
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -48,180 +46,179 @@ import org.mockito.runners.MockitoJUnitRunner;
*/
@RunWith(MockitoJUnitRunner.class)
public class SimplifiedKinesisClientTest {
+ private static final String STREAM = "stream";
+ private static final String SHARD_1 = "shard-01";
+ private static final String SHARD_2 = "shard-02";
+ private static final String SHARD_3 = "shard-03";
+ private static final String SHARD_ITERATOR = "iterator";
+ private static final String SEQUENCE_NUMBER = "abc123";
+
+ @Mock
+ private AmazonKinesis kinesis;
+ @InjectMocks
+ private SimplifiedKinesisClient underTest;
+
+ @Test
+ public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+ given(kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ .withStartingSequenceNumber(SEQUENCE_NUMBER)
+ )).willReturn(new GetShardIteratorResult()
+ .withShardIterator(SHARD_ITERATOR));
+
+ String stream = underTest.getShardIterator(STREAM, SHARD_1,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+ assertThat(stream).isEqualTo(SHARD_ITERATOR);
+ }
+
+ @Test
+ public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+ Instant timestamp = Instant.now();
+ given(kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ .withTimestamp(timestamp.toDate())
+ )).willReturn(new GetShardIteratorResult()
+ .withShardIterator(SHARD_ITERATOR));
+
+ String stream = underTest.getShardIterator(STREAM, SHARD_1,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
+
+ assertThat(stream).isEqualTo(SHARD_ITERATOR);
+ }
+
+ @Test
+ public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+ ExpiredIteratorException.class);
+ }
+
+ @Test
+ public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new LimitExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleServiceErrorForGetShardIterator() {
+ shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleClientErrorForGetShardIterator() {
+ shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+ RuntimeException.class);
+ }
+
+ @Test
+ public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new NullPointerException(),
+ RuntimeException.class);
+ }
+
+ private void shouldHandleGetShardIteratorError(
+ Exception thrownException,
+ Class<? extends Exception> expectedExceptionClass) {
+ GetShardIteratorRequest request = new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.LATEST);
+
+ given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+ try {
+ underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+ failBecauseExceptionWasNotThrown(expectedExceptionClass);
+ } catch (Exception e) {
+ assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+ } finally {
+ reset(kinesis);
+ }
+ }
+
+ @Test
+ public void shouldListAllShards() throws Exception {
+ Shard shard1 = new Shard().withShardId(SHARD_1);
+ Shard shard2 = new Shard().withShardId(SHARD_2);
+ Shard shard3 = new Shard().withShardId(SHARD_3);
+ given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+ .withStreamDescription(new StreamDescription()
+ .withShards(shard1, shard2)
+ .withHasMoreShards(true)));
+ given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+ .withStreamDescription(new StreamDescription()
+ .withShards(shard3)
+ .withHasMoreShards(false)));
+
+ List<Shard> shards = underTest.listShards(STREAM);
+
+ assertThat(shards).containsOnly(shard1, shard2, shard3);
+ }
+
+ @Test
+ public void shouldHandleExpiredIterationExceptionForShardListing() {
+ shouldHandleShardListingError(new ExpiredIteratorException(""),
+ ExpiredIteratorException.class);
+ }
+
+ @Test
+ public void shouldHandleLimitExceededExceptionForShardListing() {
+ shouldHandleShardListingError(new LimitExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+ shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+ TransientKinesisException.class);
+ }
- private static final String STREAM = "stream";
- private static final String SHARD_1 = "shard-01";
- private static final String SHARD_2 = "shard-02";
- private static final String SHARD_3 = "shard-03";
- private static final String SHARD_ITERATOR = "iterator";
- private static final String SEQUENCE_NUMBER = "abc123";
-
- @Mock
- private AmazonKinesis kinesis;
- @InjectMocks
- private SimplifiedKinesisClient underTest;
-
- @Test
- public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
- given(kinesis.getShardIterator(new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
- .withStartingSequenceNumber(SEQUENCE_NUMBER)
- )).willReturn(new GetShardIteratorResult()
- .withShardIterator(SHARD_ITERATOR));
-
- String stream = underTest.getShardIterator(STREAM, SHARD_1,
- ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
-
- assertThat(stream).isEqualTo(SHARD_ITERATOR);
- }
-
- @Test
- public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
- Instant timestamp = Instant.now();
- given(kinesis.getShardIterator(new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
- .withTimestamp(timestamp.toDate())
- )).willReturn(new GetShardIteratorResult()
- .withShardIterator(SHARD_ITERATOR));
-
- String stream = underTest.getShardIterator(STREAM, SHARD_1,
- ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
-
- assertThat(stream).isEqualTo(SHARD_ITERATOR);
- }
-
- @Test
- public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
- shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
- ExpiredIteratorException.class);
- }
-
- @Test
- public void shouldHandleLimitExceededExceptionForGetShardIterator() {
- shouldHandleGetShardIteratorError(new LimitExceededException(""),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
- shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleServiceErrorForGetShardIterator() {
- shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleClientErrorForGetShardIterator() {
- shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
- RuntimeException.class);
- }
-
- @Test
- public void shouldHandleUnexpectedExceptionForGetShardIterator() {
- shouldHandleGetShardIteratorError(new NullPointerException(),
- RuntimeException.class);
- }
-
- private void shouldHandleGetShardIteratorError(
- Exception thrownException,
- Class<? extends Exception> expectedExceptionClass) {
- GetShardIteratorRequest request = new GetShardIteratorRequest()
- .withStreamName(STREAM)
- .withShardId(SHARD_1)
- .withShardIteratorType(ShardIteratorType.LATEST);
-
- given(kinesis.getShardIterator(request)).willThrow(thrownException);
-
- try {
- underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
- failBecauseExceptionWasNotThrown(expectedExceptionClass);
- } catch (Exception e) {
- assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
- } finally {
- reset(kinesis);
- }
- }
-
- @Test
- public void shouldListAllShards() throws Exception {
- Shard shard1 = new Shard().withShardId(SHARD_1);
- Shard shard2 = new Shard().withShardId(SHARD_2);
- Shard shard3 = new Shard().withShardId(SHARD_3);
- given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
- .withStreamDescription(new StreamDescription()
- .withShards(shard1, shard2)
- .withHasMoreShards(true)));
- given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
- .withStreamDescription(new StreamDescription()
- .withShards(shard3)
- .withHasMoreShards(false)));
-
- List<Shard> shards = underTest.listShards(STREAM);
-
- assertThat(shards).containsOnly(shard1, shard2, shard3);
- }
-
- @Test
- public void shouldHandleExpiredIterationExceptionForShardListing() {
- shouldHandleShardListingError(new ExpiredIteratorException(""),
- ExpiredIteratorException.class);
- }
-
- @Test
- public void shouldHandleLimitExceededExceptionForShardListing() {
- shouldHandleShardListingError(new LimitExceededException(""),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
- shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleServiceErrorForShardListing() {
- shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
- TransientKinesisException.class);
- }
-
- @Test
- public void shouldHandleClientErrorForShardListing() {
- shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
- RuntimeException.class);
- }
-
- @Test
- public void shouldHandleUnexpectedExceptionForShardListing() {
- shouldHandleShardListingError(new NullPointerException(),
- RuntimeException.class);
- }
-
- private void shouldHandleShardListingError(
- Exception thrownException,
- Class<? extends Exception> expectedExceptionClass) {
- given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
- try {
- underTest.listShards(STREAM);
- failBecauseExceptionWasNotThrown(expectedExceptionClass);
- } catch (Exception e) {
- assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
- } finally {
- reset(kinesis);
- }
- }
-
- private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
- AmazonServiceException exception = new AmazonServiceException("");
- exception.setErrorType(errorType);
- return exception;
- }
+ @Test
+ public void shouldHandleServiceErrorForShardListing() {
+ shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleClientErrorForShardListing() {
+ shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+ RuntimeException.class);
+ }
+
+ @Test
+ public void shouldHandleUnexpectedExceptionForShardListing() {
+ shouldHandleShardListingError(new NullPointerException(),
+ RuntimeException.class);
+ }
+
+ private void shouldHandleShardListingError(
+ Exception thrownException,
+ Class<? extends Exception> expectedExceptionClass) {
+ given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+ try {
+ underTest.listShards(STREAM);
+ failBecauseExceptionWasNotThrown(expectedExceptionClass);
+ } catch (Exception e) {
+ assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+ } finally {
+ reset(kinesis);
+ }
+ }
+
+ private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+ AmazonServiceException exception = new AmazonServiceException("");
+ exception.setErrorType(errorType);
+ return exception;
+ }
}
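
Taken together, the shouldHandle*ForGetShardIterator and *ForShardListing cases above pin down a single exception-translation policy for SimplifiedKinesisClient: throttling and service-side faults become TransientKinesisException (retryable), client-side and unexpected faults become a plain RuntimeException, and ExpiredIteratorException passes through untouched. A hedged sketch of that policy as one helper; the wrapExceptions name and the TransientKinesisException(String, cause) constructor shape are illustrative assumptions, and only the class-to-class mapping is taken from the tests:

    import com.amazonaws.AmazonServiceException;
    import com.amazonaws.AmazonServiceException.ErrorType;
    import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    import com.amazonaws.services.kinesis.model.LimitExceededException;
    import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    import java.util.concurrent.Callable;

    // Sketch only: the exception mapping encoded by the tests above.
    final class KinesisErrorTranslationSketch {
      static <T> T wrapExceptions(Callable<T> call) throws TransientKinesisException {
        try {
          return call.call();
        } catch (ExpiredIteratorException e) {
          throw e; // rethrown as-is, per shouldHandleExpiredIterationException*
        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
          throw new TransientKinesisException("Kinesis is throttling, retry later", e);
        } catch (AmazonServiceException e) {
          if (e.getErrorType() == ErrorType.Service) {
            throw new TransientKinesisException("Transient service-side failure", e);
          }
          throw new RuntimeException(e); // ErrorType.Client: caller bug, do not retry
        } catch (Exception e) {
          throw new RuntimeException(e); // e.g. the NullPointerException case above
        }
      }
    }
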
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index d93cc41..912e20c 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 5b5412c..b63775d 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -117,7 +117,7 @@ import org.joda.time.Instant;
* to the file separated with line feeds.
* </p>
*/
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
public class MongoDbGridFSIO {
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 3b14182..620df74 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -18,13 +18,12 @@
package org.apache.beam.sdk.io.mongodb;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
@@ -94,27 +93,19 @@ import org.slf4j.LoggerFactory;
*
* }</pre>
*/
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
public class MongoDbIO {
private static final Logger LOG = LoggerFactory.getLogger(MongoDbIO.class);
/** Read data from MongoDB. */
public static Read read() {
- return new AutoValue_MongoDbIO_Read.Builder()
- .setKeepAlive(true)
- .setMaxConnectionIdleTime(60000)
- .setNumSplits(0)
- .build();
+ return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build();
}
/** Write data to MongoDB. */
public static Write write() {
- return new AutoValue_MongoDbIO_Write.Builder()
- .setKeepAlive(true)
- .setMaxConnectionIdleTime(60000)
- .setBatchSize(1024L)
- .build();
+ return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build();
}
private MongoDbIO() {
@@ -126,20 +117,16 @@ public class MongoDbIO {
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<Document>> {
@Nullable abstract String uri();
- abstract boolean keepAlive();
- abstract int maxConnectionIdleTime();
@Nullable abstract String database();
@Nullable abstract String collection();
@Nullable abstract String filter();
abstract int numSplits();
- abstract Builder builder();
+ abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setUri(String uri);
- abstract Builder setKeepAlive(boolean keepAlive);
- abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
abstract Builder setDatabase(String database);
abstract Builder setCollection(String collection);
abstract Builder setFilter(String filter);
@@ -148,94 +135,31 @@ public class MongoDbIO {
}
/**
- * Define the location of the MongoDB instances using an URI. The URI describes the hosts to
- * be used and some options.
- *
- * <p>The format of the URI is:
- *
- * <pre>{@code
- * mongodb://[username:password@]host1[:port1]...[,hostN[:portN]]][/[database][?options]]
- * }</pre>
- *
- * <p>Where:
- * <ul>
- * <li>{@code mongodb://} is a required prefix to identify that this is a string in the
- * standard connection format.</li>
- * <li>{@code username:password@} are optional. If given, the driver will attempt to
- * login to a database after connecting to a database server. For some authentication
- * mechanisms, only the username is specified and the password is not, in which case
- * the ":" after the username is left off as well.</li>
- * <li>{@code host1} is the only required part of the URI. It identifies a server
- * address to connect to.</li>
- * <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
- * <li>{@code /database} is the name of the database to login to and thus is only
- * relevant if the {@code username:password@} syntax is used. If not specified, the
- * "admin" database will be used by default. It has to be equivalent with the database
- * you specific with {@link Read#withDatabase(String)}.</li>
- * <li>{@code ?options} are connection options. Note that if {@code database} is absent
- * there is still a {@code /} required between the last {@code host} and the {@code ?}
- * introducing the options. Options are name=value pairs and the pairs are separated by
- * "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI,
- * instead you have to use {@link Read#withKeepAlive(boolean)}. Same for the
- * {@code MaxConnectionIdleTime} connection option via
- * {@link Read#withMaxConnectionIdleTime(int)}.
- * </li>
- * </ul>
+ * Example documentation for withUri.
*/
public Read withUri(String uri) {
- checkArgument(uri != null, "MongoDbIO.read().withUri(uri) called with null uri");
- return builder().setUri(uri).build();
- }
-
- /**
- * Sets whether socket keep alive is enabled.
- */
- public Read withKeepAlive(boolean keepAlive) {
- return builder().setKeepAlive(keepAlive).build();
- }
-
- /**
- * Sets the maximum idle time for a pooled connection.
- */
- public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) {
- return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
+ checkNotNull(uri);
+ return toBuilder().setUri(uri).build();
}
- /**
- * Sets the database to use.
- */
public Read withDatabase(String database) {
- checkArgument(database != null, "MongoDbIO.read().withDatabase(database) called with null"
- + " database");
- return builder().setDatabase(database).build();
+ checkNotNull(database);
+ return toBuilder().setDatabase(database).build();
}
- /**
- * Sets the collection to consider in the database.
- */
public Read withCollection(String collection) {
- checkArgument(collection != null, "MongoDbIO.read().withCollection(collection) called "
- + "with null collection");
- return builder().setCollection(collection).build();
+ checkNotNull(collection);
+ return toBuilder().setCollection(collection).build();
}
- /**
- * Sets a filter on the documents in a collection.
- */
public Read withFilter(String filter) {
- checkArgument(filter != null, "MongoDbIO.read().withFilter(filter) called with null "
- + "filter");
- return builder().setFilter(filter).build();
+ checkNotNull(filter);
+ return toBuilder().setFilter(filter).build();
}
- /**
- * Sets the user defined number of splits.
- */
public Read withNumSplits(int numSplits) {
- checkArgument(numSplits >= 0, "MongoDbIO.read().withNumSplits(numSplits) called with "
- + "invalid number. The number of splits has to be a positive value (currently %d)",
- numSplits);
- return builder().setNumSplits(numSplits).build();
+ checkArgument(numSplits >= 0);
+ return toBuilder().setNumSplits(numSplits).build();
}
@Override
@@ -245,19 +169,15 @@ public class MongoDbIO {
@Override
public void validate(PipelineOptions options) {
- checkState(uri() != null, "MongoDbIO.read() requires an URI to be set via withUri(uri)");
- checkState(database() != null, "MongoDbIO.read() requires a database to be set via "
- + "withDatabase(database)");
- checkState(collection() != null, "MongoDbIO.read() requires a collection to be set via "
- + "withCollection(collection)");
+ checkNotNull(uri(), "uri");
+ checkNotNull(database(), "database");
+ checkNotNull(collection(), "collection");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("uri", uri()));
- builder.add(DisplayData.item("keepAlive", keepAlive()));
- builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
builder.add(DisplayData.item("database", database()));
builder.add(DisplayData.item("collection", collection()));
builder.addIfNotNull(DisplayData.item("filter", filter()));
@@ -298,71 +218,61 @@ public class MongoDbIO {
@Override
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
- try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
- return getEstimatedSizeBytes(mongoClient, spec.database(), spec.collection());
- }
- }
-
- private long getEstimatedSizeBytes(MongoClient mongoClient,
- String database,
- String collection) {
- MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+ MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
// get the Mongo collStats object
// it gives the size for the entire collection
BasicDBObject stat = new BasicDBObject();
- stat.append("collStats", collection);
+ stat.append("collStats", spec.collection());
Document stats = mongoDatabase.runCommand(stat);
-
return stats.get("size", Number.class).longValue();
}
@Override
public List<BoundedSource<Document>> split(long desiredBundleSizeBytes,
PipelineOptions options) {
- try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
- MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
-
- List<Document> splitKeys;
- if (spec.numSplits() > 0) {
- // the user defines his desired number of splits
- // calculate the batch size
- long estimatedSizeBytes = getEstimatedSizeBytes(mongoClient,
- spec.database(), spec.collection());
- desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
- }
-
- // the desired batch size is small, using default chunk size of 1MB
- if (desiredBundleSizeBytes < 1024 * 1024) {
- desiredBundleSizeBytes = 1 * 1024 * 1024;
- }
-
- // now we have the batch size (provided by user or provided by the runner)
- // we use Mongo splitVector command to get the split keys
- BasicDBObject splitVectorCommand = new BasicDBObject();
- splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
- splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
- splitVectorCommand.append("force", false);
- // maxChunkSize is the Mongo partition size in MB
- LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
- splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
- Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
- splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
-
- List<BoundedSource<Document>> sources = new ArrayList<>();
- if (splitKeys.size() < 1) {
- LOG.debug("Split keys is low, using an unique source");
- sources.add(this);
- return sources;
- }
+ MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
+
+ List<Document> splitKeys;
+ if (spec.numSplits() > 0) {
+ // the user defines his desired number of splits
+ // calculate the batch size
+ long estimatedSizeBytes = getEstimatedSizeBytes(options);
+ desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
+ }
- LOG.debug("Number of splits is {}", splitKeys.size());
- for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
- sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
- }
+ // the desired batch size is small, using default chunk size of 1MB
+ if (desiredBundleSizeBytes < 1024 * 1024) {
+ desiredBundleSizeBytes = 1 * 1024 * 1024;
+ }
+ // now we have the batch size (provided by user or provided by the runner)
+ // we use Mongo splitVector command to get the split keys
+ BasicDBObject splitVectorCommand = new BasicDBObject();
+ splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
+ splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
+ splitVectorCommand.append("force", false);
+ // maxChunkSize is the Mongo partition size in MB
+ LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
+ splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
+ Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
+ splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
+
+ List<BoundedSource<Document>> sources = new ArrayList<>();
+ if (splitKeys.size() < 1) {
+ LOG.debug("Split keys is low, using an unique source");
+ sources.add(this);
return sources;
}
+
+ LOG.debug("Number of splits is {}", splitKeys.size());
+ for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
+ sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
+ }
+
+ return sources;
}
/**
@@ -457,10 +367,7 @@ public class MongoDbIO {
@Override
public boolean start() {
Read spec = source.spec;
- MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
- optionsBuilder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
- optionsBuilder.socketKeepAlive(spec.keepAlive());
- client = new MongoClient(new MongoClientURI(spec.uri(), optionsBuilder));
+ client = new MongoClient(new MongoClientURI(spec.uri()));
MongoDatabase mongoDatabase = client.getDatabase(spec.database());
@@ -519,106 +426,36 @@ public class MongoDbIO {
*/
@AutoValue
public abstract static class Write extends PTransform<PCollection<Document>, PDone> {
-
@Nullable abstract String uri();
- abstract boolean keepAlive();
- abstract int maxConnectionIdleTime();
@Nullable abstract String database();
@Nullable abstract String collection();
abstract long batchSize();
- abstract Builder builder();
+ abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setUri(String uri);
- abstract Builder setKeepAlive(boolean keepAlive);
- abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
abstract Builder setDatabase(String database);
abstract Builder setCollection(String collection);
abstract Builder setBatchSize(long batchSize);
abstract Write build();
}
- /**
- * Define the location of the MongoDB instances using an URI. The URI describes the hosts to
- * be used and some options.
- *
- * <p>The format of the URI is:
- *
- * <pre>{@code
- * mongodb://[username:password@]host1[:port1],...[,hostN[:portN]]][/[database][?options]]
- * }</pre>
- *
- * <p>Where:
- * <ul>
- * <li>{@code mongodb://} is a required prefix to identify that this is a string in the
- * standard connection format.</li>
- * <li>{@code username:password@} are optional. If given, the driver will attempt to
- * login to a database after connecting to a database server. For some authentication
- * mechanisms, only the username is specified and the password is not, in which case
- * the ":" after the username is left off as well.</li>
- * <li>{@code host1} is the only required part of the URI. It identifies a server
- * address to connect to.</li>
- * <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
- * <li>{@code /database} is the name of the database to login to and thus is only
- * relevant if the {@code username:password@} syntax is used. If not specified, the
- * "admin" database will be used by default. It has to be equivalent with the database
- * you specific with {@link Write#withDatabase(String)}.</li>
- * <li>{@code ?options} are connection options. Note that if {@code database} is absent
- * there is still a {@code /} required between the last {@code host} and the {@code ?}
- * introducing the options. Options are name=value pairs and the pairs are separated by
- * "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI, instead
- * you have to use {@link Write#withKeepAlive(boolean)}. Same for the
- * {@code MaxConnectionIdleTime} connection option via
- * {@link Write#withMaxConnectionIdleTime(int)}.
- * </li>
- * </ul>
- */
public Write withUri(String uri) {
- checkArgument(uri != null, "MongoDbIO.write().withUri(uri) called with null uri");
- return builder().setUri(uri).build();
- }
-
- /**
- * Sets whether socket keep alive is enabled.
- */
- public Write withKeepAlive(boolean keepAlive) {
- return builder().setKeepAlive(keepAlive).build();
- }
-
- /**
- * Sets the maximum idle time for a pooled connection.
- */
- public Write withMaxConnectionIdleTime(int maxConnectionIdleTime) {
- return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
+ return toBuilder().setUri(uri).build();
}
- /**
- * Sets the database to use.
- */
public Write withDatabase(String database) {
- checkArgument(database != null, "MongoDbIO.write().withDatabase(database) called with "
- + "null database");
- return builder().setDatabase(database).build();
+ return toBuilder().setDatabase(database).build();
}
- /**
- * Sets the collection where to write data in the database.
- */
public Write withCollection(String collection) {
- checkArgument(collection != null, "MongoDbIO.write().withCollection(collection) called "
- + "with null collection");
- return builder().setCollection(collection).build();
+ return toBuilder().setCollection(collection).build();
}
- /**
- * Define the size of the batch to group write operations.
- */
public Write withBatchSize(long batchSize) {
- checkArgument(batchSize >= 0, "MongoDbIO.write().withBatchSize(batchSize) called with "
- + "invalid batch size. Batch size has to be >= 0 (currently %d)", batchSize);
- return builder().setBatchSize(batchSize).build();
+ return toBuilder().setBatchSize(batchSize).build();
}
@Override
@@ -629,21 +466,10 @@ public class MongoDbIO {
@Override
public void validate(PipelineOptions options) {
- checkState(uri() != null, "MongoDbIO.write() requires an URI to be set via withUri(uri)");
- checkState(database() != null, "MongoDbIO.write() requires a database to be set via "
- + "withDatabase(database)");
- checkState(collection() != null, "MongoDbIO.write() requires a collection to be set via "
- + "withCollection(collection)");
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.add(DisplayData.item("uri", uri()));
- builder.add(DisplayData.item("keepAlive", keepAlive()));
- builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
- builder.add(DisplayData.item("database", database()));
- builder.add(DisplayData.item("collection", collection()));
- builder.add(DisplayData.item("batchSize", batchSize()));
+ checkNotNull(uri(), "uri");
+ checkNotNull(database(), "database");
+ checkNotNull(collection(), "collection");
+ checkNotNull(batchSize(), "batchSize");
}
private static class WriteFn extends DoFn<Document, Void> {
@@ -657,10 +483,7 @@ public class MongoDbIO {
@Setup
public void createMongoClient() throws Exception {
- MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
- builder.socketKeepAlive(spec.keepAlive());
- builder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
- client = new MongoClient(new MongoClientURI(spec.uri(), builder));
+ client = new MongoClient(new MongoClientURI(spec.uri()));
}
@StartBundle
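
For reference, the connection-option plumbing this revert strips out of MongoDbIO can be read directly off the '-' lines above; restated as a standalone helper (the class and method names are illustrative, everything else mirrors the removed code, whose builder defaults were keepAlive=true and maxConnectionIdleTime=60000):

    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientOptions;
    import com.mongodb.MongoClientURI;

    // Sketch only: the client construction removed by this revert.
    final class MongoClientsSketch {
      static MongoClient create(String uri, boolean keepAlive, int maxConnectionIdleTimeMs) {
        MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
        optionsBuilder.socketKeepAlive(keepAlive);                     // reverted default: true
        optionsBuilder.maxConnectionIdleTime(maxConnectionIdleTimeMs); // reverted default: 60000
        return new MongoClient(new MongoClientURI(uri, optionsBuilder));
      }
    }
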
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 67dbca4..cd26b48 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.mongodb;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
@@ -190,42 +189,6 @@ public class MongoDbIOTest implements Serializable {
}
@Test
- public void testReadWithCustomConnectionOptions() throws Exception {
- MongoDbIO.Read read = MongoDbIO.read()
- .withUri("mongodb://localhost:" + port)
- .withKeepAlive(false)
- .withMaxConnectionIdleTime(10)
- .withDatabase(DATABASE)
- .withCollection(COLLECTION);
- assertFalse(read.keepAlive());
- assertEquals(10, read.maxConnectionIdleTime());
-
- PCollection<Document> documents = pipeline.apply(read);
-
- PAssert.thatSingleton(documents.apply("Count All", Count.<Document>globally()))
- .isEqualTo(1000L);
-
- PAssert.that(documents
- .apply("Map Scientist", MapElements.via(new SimpleFunction<Document, KV<String, Void>>() {
- public KV<String, Void> apply(Document input) {
- return KV.of(input.getString("scientist"), null);
- }
- }))
- .apply("Count Scientist", Count.<String, Void>perKey())
- ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
- @Override
- public Void apply(Iterable<KV<String, Long>> input) {
- for (KV<String, Long> element : input) {
- assertEquals(100L, element.getValue().longValue());
- }
- return null;
- }
- });
-
- pipeline.run();
- }
-
- @Test
public void testReadWithFilter() throws Exception {
PCollection<Document> output = pipeline.apply(
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 9fa1dc0..baaf771 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index add5cb5..228a85d 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -97,7 +97,7 @@ import org.slf4j.LoggerFactory;
*
* }</pre>
*/
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
public class MqttIO {
private static final Logger LOG = LoggerFactory.getLogger(MqttIO.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index b7909fa..44f3baa 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -32,8 +32,38 @@
<description>Beam SDK Java IO provides different connectivity components
(sources and sinks) to consume and produce data from systems.</description>
+ <properties>
+ <!--
+ This is the version of Hadoop used to compile the hadoop-common module.
+ This dependency is defined with a provided scope.
+ Users must supply their own Hadoop version at runtime.
+ -->
+ <hadoop.version>2.7.3</hadoop.version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<modules>
- <module>amqp</module>
<module>cassandra</module>
<module>common</module>
<module>elasticsearch</module>
@@ -42,7 +72,6 @@
<module>hadoop-file-system</module>
<module>hadoop</module>
<module>hbase</module>
- <module>hcatalog</module>
<module>jdbc</module>
<module>jms</module>
<module>kafka</module>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
index 7b5804e..cf7dd33 100644
--- a/sdks/java/io/xml/pom.xml
+++ b/sdks/java/io/xml/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 442fba5..7255a94 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -522,8 +521,7 @@ public class XmlIO {
@Override
public PDone expand(PCollection<T> input) {
- return input.apply(
- org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity()));
+ return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 74e0bda..6ae83f2 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -25,7 +25,6 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.ShardNameTemplate;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -35,18 +34,18 @@ import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.MimeTypes;
/** Implementation of {@link XmlIO#write}. */
-class XmlSink<T> extends FileBasedSink<T, Void> {
+class XmlSink<T> extends FileBasedSink<T> {
private static final String XML_EXTENSION = ".xml";
private final XmlIO.Write<T> spec;
- private static <T> DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<T> spec) {
- return DefaultFilenamePolicy.fromStandardParameters(
+ private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
+ return DefaultFilenamePolicy.constructUsingStandardParameters(
spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false);
}
XmlSink(XmlIO.Write<T> spec) {
- super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec)));
+ super(spec.getFilenamePrefix(), makeFilenamePolicy(spec));
this.spec = spec;
}
@@ -76,8 +75,10 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
super.populateDisplayData(builder);
}
- /** {@link WriteOperation} for XML {@link FileBasedSink}s. */
- protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> {
+ /**
+ * {@link WriteOperation} for XML {@link FileBasedSink}s.
+ */
+ protected static final class XmlWriteOperation<T> extends WriteOperation<T> {
public XmlWriteOperation(XmlSink<T> sink) {
super(sink);
}
@@ -111,8 +112,10 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
}
}
- /** A {@link Writer} that can write objects as XML elements. */
- protected static final class XmlWriter<T> extends Writer<T, Void> {
+ /**
+ * A {@link Writer} that can write objects as XML elements.
+ */
+ protected static final class XmlWriter<T> extends Writer<T> {
final Marshaller marshaller;
private OutputStream os = null;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index d1584dc..aa0c1c3 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -197,8 +197,8 @@ public class XmlSinkTest {
.withRecordClass(Integer.class);
DisplayData displayData = DisplayData.from(write);
- assertThat(
- displayData, hasDisplayItem("filenamePattern", "/path/to/file-SSSSS-of-NNNNN" + ".xml"));
+
+ assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml"));
assertThat(displayData, hasDisplayItem("rootElement", "bird"));
assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index 2378014..b90a757 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 51109fb..54dae3a 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
@@ -99,16 +99,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-cassandra</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-elasticsearch</artifactId>
</dependency>
@@ -134,11 +124,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-hcatalog</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
</dependency>
@@ -211,11 +196,13 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples-java8/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/pom.xml b/sdks/java/maven-archetypes/examples-java8/pom.xml
index b60a695..b57644d 100644
--- a/sdks/java/maven-archetypes/examples-java8/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
index 4517861..af4fbd3 100644
--- a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
@@ -242,6 +242,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index 2a02039..c1378cb 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index d039ddb..b8b9c9f 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -241,6 +241,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/pom.xml b/sdks/java/maven-archetypes/pom.xml
index d676b31..b7fe274 100644
--- a/sdks/java/maven-archetypes/pom.xml
+++ b/sdks/java/maven-archetypes/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 8024b52..06b41c8 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-maven-archetypes-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
index 6056fb0..60405e6 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
@@ -28,7 +28,7 @@
<beam.version>@project.version@</beam.version>
<maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
- <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
+ <maven-exec-plugin.version>1.4.0</maven-exec-plugin.version>
<slf4j.version>1.7.14</slf4j.version>
</properties>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 3144193..250c85a 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/pom.xml b/sdks/pom.xml
index aec8762..27b9610 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 2670250..10298bf 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -710,10 +710,6 @@ class WindowedValueCoderImpl(StreamCoderImpl):
timestamp = MAX_TIMESTAMP.micros
else:
timestamp *= 1000
- if timestamp > MAX_TIMESTAMP.micros:
- timestamp = MAX_TIMESTAMP.micros
- if timestamp < MIN_TIMESTAMP.micros:
- timestamp = MIN_TIMESTAMP.micros
windows = self._windows_coder.decode_from_stream(in_stream, True)
# Read PaneInfo encoded byte.
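
The four lines removed above clamped the converted timestamp into the SDK's
representable range before decoding continued. A minimal standalone sketch of
that clamping, with placeholder bounds standing in for the real
MIN_TIMESTAMP/MAX_TIMESTAMP constants from apache_beam.utils.timestamp:

MIN_TIMESTAMP_MICROS = -(10 ** 18)  # placeholder bound, not Beam's real value
MAX_TIMESTAMP_MICROS = 10 ** 18     # placeholder bound, not Beam's real value

def clamp_micros(timestamp):
    """Clamp a decoded timestamp, in microseconds, into the valid range."""
    if timestamp > MAX_TIMESTAMP_MICROS:
        timestamp = MAX_TIMESTAMP_MICROS
    if timestamp < MIN_TIMESTAMP_MICROS:
        timestamp = MIN_TIMESTAMP_MICROS
    return timestamp

With the revert applied, only the MAX_TIMESTAMP sentinel handled in the
surrounding context is special-cased; other out-of-range values flow through
unclamped.
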
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index c56ef52..f40045d 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -25,7 +25,6 @@ import cPickle as pickle
import google.protobuf
from apache_beam.coders import coder_impl
-from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.utils import urns
from apache_beam.utils import proto_utils
@@ -206,6 +205,7 @@ class Coder(object):
"""For internal use only; no backwards-compatibility guarantees.
"""
# TODO(BEAM-115): Use specialized URNs and components.
+ from apache_beam.runners.api import beam_runner_api_pb2
return beam_runner_api_pb2.Coder(
spec=beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
@@ -286,11 +286,6 @@ class BytesCoder(FastCoder):
def is_deterministic(self):
return True
- def as_cloud_object(self):
- return {
- '@type': 'kind:bytes',
- }
-
def __eq__(self, other):
return type(self) == type(other)
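
The import that moves into the method body above is the deferred-import
pattern: the dependency is resolved at call time rather than at module load,
which keeps an import cycle between this module and the generated protobuf
code from triggering. A runnable sketch with a standard-library module
standing in for beam_runner_api_pb2 (the payload is illustrative):

def to_runner_api_sketch(payload):
    # Deferred: resolved on first call, not when this module is imported,
    # so a circular or heavyweight dependency stays lazy.
    import json  # stand-in for the generated protobuf module
    return json.dumps(payload)

print(to_runner_api_sketch({'component': 'coder'}))
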
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 577c53a..c9b67b3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -23,8 +23,6 @@ import unittest
import dill
-from apache_beam.transforms.window import GlobalWindow
-from apache_beam.utils.timestamp import MIN_TIMESTAMP
import observable
from apache_beam.transforms import window
from apache_beam.utils import timestamp
@@ -289,12 +287,6 @@ class CodersTest(unittest.TestCase):
# Test binary representation
self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
coder.encode(window.GlobalWindows.windowed_value(1)))
-
- # Test decoding large timestamp
- self.assertEqual(
- coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'),
- windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),)))
-
# Test unnested
self.check_coder(
coders.WindowedValueCoder(coders.VarIntCoder()),
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 31f71b3..9183d0d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -589,22 +589,6 @@ class SnippetsTest(unittest.TestCase):
snippets.model_textio_compressed(
{'read': gzip_file_name}, ['aa', 'bb', 'cc'])
- def test_model_textio_gzip_concatenated(self):
- temp_path_1 = self.create_temp_file('a\nb\nc\n')
- temp_path_2 = self.create_temp_file('p\nq\nr\n')
- temp_path_3 = self.create_temp_file('x\ny\nz')
- gzip_file_name = temp_path_1 + '.gz'
- with open(temp_path_1) as src, gzip.open(gzip_file_name, 'wb') as dst:
- dst.writelines(src)
- with open(temp_path_2) as src, gzip.open(gzip_file_name, 'ab') as dst:
- dst.writelines(src)
- with open(temp_path_3) as src, gzip.open(gzip_file_name, 'ab') as dst:
- dst.writelines(src)
- # Add the temporary gzip file to be cleaned up as well.
- self.temp_files.append(gzip_file_name)
- snippets.model_textio_compressed(
- {'read': gzip_file_name}, ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'])
-
@unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
def test_model_datastoreio(self):
# We cannot test datastoreio functionality in unit tests therefore we limit
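
The deleted test above covered reading a .gz file built from several
concatenated gzip members. The stdlib reproduces the fixture directly:
append mode starts a new member, and a multi-member-aware reader returns one
continuous stream. A self-contained sketch (the temp-path handling is
simplified for brevity):

import gzip
import os
import tempfile

path = os.path.join(tempfile.gettempdir(), 'concat_example.gz')  # illustrative
with gzip.open(path, 'wb') as dst:
    dst.write(b'a\nb\nc\n')
with gzip.open(path, 'ab') as dst:  # append mode begins a new gzip member
    dst.write(b'x\ny\nz\n')

with gzip.open(path, 'rb') as src:
    assert src.read() == b'a\nb\nc\nx\ny\nz\n'

The test leaves with this revert because the matching reader support in
filesystem.py (further below) is removed as well.
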
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 7696d77..ed8b5d0 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -25,44 +25,35 @@ from __future__ import absolute_import
import argparse
import logging
+import re
import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import StandardOptions
import apache_beam.transforms.window as window
-def split_fn(lines):
- import re
- return re.findall(r'[A-Za-z\']+', lines)
-
-
def run(argv=None):
"""Build and run the pipeline."""
+
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_topic', required=True,
- help=('Input PubSub topic of the form '
- '"projects/<PROJECT>/topics/<TOPIC>".'))
+ help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
parser.add_argument(
'--output_topic', required=True,
- help=('Output PubSub topic of the form '
- '"projects/<PROJECT>/topic/<TOPIC>".'))
+ help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
known_args, pipeline_args = parser.parse_known_args(argv)
- options = PipelineOptions(pipeline_args)
- options.view_as(StandardOptions).streaming = True
- with beam.Pipeline(options=options) as p:
+ with beam.Pipeline(argv=pipeline_args) as p:
- # Read from PubSub into a PCollection.
+ # Read the text file[pattern] into a PCollection.
lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
# Capitalize the characters in each line.
transformed = (lines
- # Use a pre-defined function that imports the re package.
| 'Split' >> (
- beam.FlatMap(split_fn).with_output_types(unicode))
+ beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ .with_output_types(unicode))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
| 'Group' >> beam.GroupByKey()
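
The split_fn deleted above kept its re import inside the function body, the
usual arrangement for functions that are pickled and shipped to remote
workers: the unpickled function then works without the worker having to
recreate the main module's globals. The revert returns to an inline lambda
plus a module-level import. A minimal sketch of the removed pattern:

def split_fn(line):
    import re  # inside the body, so the pickled function is self-contained
    return re.findall(r"[A-Za-z']+", line)

print(split_fn("an illustrative line of words"))
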
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
deleted file mode 100644
index bd57847..0000000
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A streaming word-counting workflow.
-
-Important: streaming pipeline support in Python Dataflow is in development
-and is not yet available for use.
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-
-
-import apache_beam as beam
-import apache_beam.transforms.window as window
-
-TABLE_SCHEMA = ('word:STRING, count:INTEGER, '
- 'window_start:TIMESTAMP, window_end:TIMESTAMP')
-
-
-def find_words(element):
- import re
- return re.findall(r'[A-Za-z\']+', element)
-
-
-class FormatDoFn(beam.DoFn):
- def process(self, element, window=beam.DoFn.WindowParam):
- ts_format = '%Y-%m-%d %H:%M:%S.%f UTC'
- window_start = window.start.to_utc_datetime().strftime(ts_format)
- window_end = window.end.to_utc_datetime().strftime(ts_format)
- return [{'word': element[0],
- 'count': element[1],
- 'window_start':window_start,
- 'window_end':window_end}]
-
-
-def run(argv=None):
- """Build and run the pipeline."""
-
- parser = argparse.ArgumentParser()
- parser.add_argument(
- '--input_topic', required=True,
- help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
- parser.add_argument(
- '--output_table', required=True,
- help=
- ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
- 'or DATASET.TABLE.'))
- known_args, pipeline_args = parser.parse_known_args(argv)
-
- with beam.Pipeline(argv=pipeline_args) as p:
-
- # Read the text from PubSub messages
- lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
-
- # Capitalize the characters in each line.
- transformed = (lines
- | 'Split' >> (beam.FlatMap(find_words)
- .with_output_types(unicode))
- | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
- | beam.WindowInto(window.FixedWindows(2*60, 0))
- | 'Group' >> beam.GroupByKey()
- | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
- | 'Format' >> beam.ParDo(FormatDoFn()))
-
- # Write to BigQuery.
- # pylint: disable=expression-not-assigned
- transformed | 'Write' >> beam.io.WriteToBigQuery(
- known_args.output_table,
- schema=TABLE_SCHEMA,
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- run()
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 1f65d0a..db6a1d0 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -26,8 +26,6 @@ import zlib
import logging
import time
-from apache_beam.utils.plugin import BeamPlugin
-
logger = logging.getLogger(__name__)
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -201,14 +199,6 @@ class CompressedFile(object):
assert False, 'Possible file corruption.'
except EOFError:
pass # All is as expected!
- elif self._compression_type == CompressionTypes.GZIP:
- # If Gzip file check if there is unused data generated by gzip concat
- if self._decompressor.unused_data != '':
- buf = self._decompressor.unused_data
- self._decompressor = zlib.decompressobj(self._gzip_mask)
- decompressed = self._decompressor.decompress(buf)
- self._read_buffer.write(decompressed)
- continue
else:
self._read_buffer.write(self._decompressor.flush())
@@ -419,7 +409,7 @@ class BeamIOError(IOError):
self.exception_details = exception_details
-class FileSystem(BeamPlugin):
+class FileSystem(object):
"""A class that defines the functions that can be performed on a filesystem.
All methods are abstract and they are for file system providers to
@@ -439,6 +429,16 @@ class FileSystem(BeamPlugin):
return compression_type
@classmethod
+ def get_all_subclasses(cls):
+ """Get all the subclasses of the FileSystem class
+ """
+ all_subclasses = []
+ for subclass in cls.__subclasses__():
+ all_subclasses.append(subclass)
+ all_subclasses.extend(subclass.get_all_subclasses())
+ return all_subclasses
+
+ @classmethod
def scheme(cls):
"""URI scheme for the FileSystem
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 643fbc7..d43c8ba 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -31,7 +31,6 @@ import re
import threading
import time
import traceback
-import httplib2
from apache_beam.utils import retry
@@ -69,10 +68,6 @@ except ImportError:
# +---------------+------------+-------------+-------------+-------------+
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
-# This is the number of seconds the library will wait for GCS operations to
-# complete.
-DEFAULT_HTTP_TIMEOUT_SECONDS = 60
-
# This is the number of seconds the library will wait for a partial-file read
# operation from GCS to complete before retrying.
DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
@@ -104,7 +99,6 @@ class GcsIO(object):
def __new__(cls, storage_client=None):
if storage_client:
- # This path is only used for testing.
return super(GcsIO, cls).__new__(cls, storage_client)
else:
# Create a single storage client for each thread. We would like to avoid
@@ -114,9 +108,7 @@ class GcsIO(object):
local_state = threading.local()
if getattr(local_state, 'gcsio_instance', None) is None:
credentials = auth.get_service_credentials()
- storage_client = storage.StorageV1(
- credentials=credentials,
- http=httplib2.Http(timeout=DEFAULT_HTTP_TIMEOUT_SECONDS))
+ storage_client = storage.StorageV1(credentials=credentials)
local_state.gcsio_instance = (
super(GcsIO, cls).__new__(cls, storage_client))
local_state.gcsio_instance.client = storage_client
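
The restored __new__ above drops the httplib2 timeout and goes back to
caching one storage client per thread. The caching pattern it aims for,
sketched standalone with a stand-in client class and the threading.local
created once at module scope so the cache persists across calls:

import threading

_local = threading.local()

class StorageClientStub(object):
    """Stand-in for the real GCS storage client."""

def gcs_client():
    if getattr(_local, 'client', None) is None:
        _local.client = StorageClientStub()  # built lazily, once per thread
    return _local.client

assert gcs_client() is gcs_client()  # the same instance within one thread
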