You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:36 UTC
[09/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index 8aac417..91caded 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -52,7 +52,6 @@ public class GcpApiSurfaceTest {
@SuppressWarnings("unchecked")
final Set<Matcher<Class<?>>> allowedClasses =
ImmutableSet.of(
- classesInPackage("com.google.api.core"),
classesInPackage("com.google.api.client.googleapis"),
classesInPackage("com.google.api.client.http"),
classesInPackage("com.google.api.client.json"),
@@ -61,18 +60,9 @@ public class GcpApiSurfaceTest {
classesInPackage("com.google.auth"),
classesInPackage("com.google.bigtable.v2"),
classesInPackage("com.google.cloud.bigtable.config"),
- classesInPackage("com.google.spanner.v1"),
- Matchers.<Class<?>>equalTo(com.google.api.gax.grpc.ApiException.class),
Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableClusterName.class),
Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableInstanceName.class),
Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableTableName.class),
- Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.class),
- Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.Error.class),
- Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.ExceptionData.class),
- Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.ExceptionData.Builder
- .class),
- Matchers.<Class<?>>equalTo(com.google.cloud.RetryHelper.RetryHelperException.class),
- Matchers.<Class<?>>equalTo(com.google.cloud.grpc.BaseGrpcServiceException.class),
Matchers.<Class<?>>equalTo(com.google.cloud.ByteArray.class),
Matchers.<Class<?>>equalTo(com.google.cloud.Date.class),
Matchers.<Class<?>>equalTo(com.google.cloud.Timestamp.class),
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d31f3a0..bfd260a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -82,7 +82,6 @@ import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -132,7 +131,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 91f0bae..a064bd6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigtable;
import com.google.bigtable.v2.Row;
import com.google.cloud.bigtable.config.BigtableOptions;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -42,10 +41,8 @@ public class BigtableReadIT {
BigtableTestOptions options = TestPipeline.testingPipelineOptions()
.as(BigtableTestOptions.class);
- String project = options.as(GcpOptions.class).getProject();
-
BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
- .setProjectId(project)
+ .setProjectId(options.getProjectId())
.setInstanceId(options.getInstanceId());
final String tableId = "BigtableReadTest";
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
index 03cb697..0ab7576 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
@@ -25,6 +25,11 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
* Properties needed when using Bigtable with the Beam SDK.
*/
public interface BigtableTestOptions extends TestPipelineOptions {
+ @Description("Project ID for Bigtable")
+ @Default.String("apache-beam-testing")
+ String getProjectId();
+ void setProjectId(String value);
+
@Description("Instance ID for Bigtable")
@Default.String("beam-test")
String getInstanceId();
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 010bcc4..1d168f1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -73,17 +73,15 @@ public class BigtableWriteIT implements Serializable {
private static BigtableTableAdminClient tableAdminClient;
private final String tableId =
String.format("BigtableWriteIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date());
- private String project;
@Before
public void setup() throws Exception {
PipelineOptionsFactory.register(BigtableTestOptions.class);
options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
- project = options.as(GcpOptions.class).getProject();
bigtableOptions =
new Builder()
- .setProjectId(project)
+ .setProjectId(options.getProjectId())
.setInstanceId(options.getInstanceId())
.setUserAgent("apache-beam-test")
.build();
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
deleted file mode 100644
index c12cf55..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.hamcrest.Matchers.closeTo;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-import java.util.Random;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/**
- * Tests for {@link AdaptiveThrottler}.
- */
-@RunWith(JUnit4.class)
-public class AdaptiveThrottlerTest {
-
- static final long START_TIME_MS = 0;
- static final long SAMPLE_PERIOD_MS = 60000;
- static final long SAMPLE_BUCKET_MS = 1000;
- static final double OVERLOAD_RATIO = 2;
-
- /** Returns a throttler configured with the standard parameters above. */
- AdaptiveThrottler getThrottler() {
- return new AdaptiveThrottler(SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO);
- }
-
- @Test
- public void testNoInitialThrottling() throws Exception {
- AdaptiveThrottler throttler = getThrottler();
- assertThat(throttler.throttlingProbability(START_TIME_MS), equalTo(0.0));
- assertThat("first request is not throttled",
- throttler.throttleRequest(START_TIME_MS), equalTo(false));
- }
-
- @Test
- public void testNoThrottlingIfNoErrors() throws Exception {
- AdaptiveThrottler throttler = getThrottler();
- long t = START_TIME_MS;
- for (; t < START_TIME_MS + 20; t++) {
- assertFalse(throttler.throttleRequest(t));
- throttler.successfulRequest(t);
- }
- assertThat(throttler.throttlingProbability(t), equalTo(0.0));
- }
-
- @Test
- public void testNoThrottlingAfterErrorsExpire() throws Exception {
- AdaptiveThrottler throttler = getThrottler();
- long t = START_TIME_MS;
- for (; t < START_TIME_MS + SAMPLE_PERIOD_MS; t++) {
- throttler.throttleRequest(t);
- // and no successfulRequest.
- }
- assertThat("check that we set up a non-zero probability of throttling",
- throttler.throttlingProbability(t), greaterThan(0.0));
- for (; t < START_TIME_MS + 2 * SAMPLE_PERIOD_MS; t++) {
- throttler.throttleRequest(t);
- throttler.successfulRequest(t);
- }
- assertThat(throttler.throttlingProbability(t), equalTo(0.0));
- }
-
- @Test
- public void testThrottlingAfterErrors() throws Exception {
- Random mockRandom = Mockito.mock(Random.class);
- Mockito.when(mockRandom.nextDouble()).thenReturn(
- 0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9,
- 0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9);
- AdaptiveThrottler throttler = new AdaptiveThrottler(
- SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO, mockRandom);
- for (int i = 0; i < 20; i++) {
- boolean throttled = throttler.throttleRequest(START_TIME_MS + i);
- // 1/3rd of requests succeeding.
- if (i % 3 == 1) {
- throttler.successfulRequest(START_TIME_MS + i);
- }
-
- // Once we have some history in place, check what throttling happens.
- if (i >= 10) {
- // Expect 1/3rd of requests to be throttled. (So 1/3rd throttled, 1/3rd succeeding, 1/3rd
- // tried and failing).
- assertThat(String.format("for i=%d", i),
- throttler.throttlingProbability(START_TIME_MS + i), closeTo(0.33, /*error=*/ 0.1));
- // Requests 10..13 should be throttled, 14..19 not throttled given the mocked random numbers
- // that we fed to throttler.
- assertThat(String.format("for i=%d", i), throttled, equalTo(i < 14));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index a3f5d38..460049e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -27,7 +27,7 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes;
@@ -606,7 +606,7 @@ public class DatastoreV1Test {
/** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */
@Test
public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
- datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
+ datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
}
/**
@@ -615,7 +615,7 @@ public class DatastoreV1Test {
*/
@Test
public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
- datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
+ datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
}
// A helper method to test DatastoreWriterFn for various batch sizes.
@@ -628,14 +628,14 @@ public class DatastoreV1Test {
}
DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
- null, mockDatastoreFactory, new FakeWriteBatcher());
+ null, mockDatastoreFactory);
DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
doFnTester.processBundle(mutations);
int start = 0;
while (start < numMutations) {
- int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_ENTITIES_START);
+ int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
commitRequest.addAllMutations(mutations.subList(start, end));
@@ -651,28 +651,26 @@ public class DatastoreV1Test {
@Test
public void testDatatoreWriterFnWithLargeEntities() throws Exception {
List<Mutation> mutations = new ArrayList<>();
- int entitySize = 0;
+ int propertySize = 900_000;
for (int i = 0; i < 12; ++i) {
- Entity entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1))
- .putProperties("long", makeValue(new String(new char[900_000])
- ).setExcludeFromIndexes(true).build())
- .build();
- entitySize = entity.getSerializedSize(); // Take the size of any one entity.
- mutations.add(makeUpsert(entity).build());
+ Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
+ entity.putProperties("long", makeValue(new String(new char[propertySize])
+ ).setExcludeFromIndexes(true).build());
+ mutations.add(makeUpsert(entity.build()).build());
}
DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
- null, mockDatastoreFactory, new FakeWriteBatcher());
+ null, mockDatastoreFactory);
DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
doFnTester.processBundle(mutations);
// This test is over-specific currently; it requires that we split the 12 entity writes into 3
// requests, but we only need each CommitRequest to be less than 10MB in size.
- int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize;
+ int propertiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / propertySize;
int start = 0;
while (start < mutations.size()) {
- int end = Math.min(mutations.size(), start + entitiesPerRpc);
+ int end = Math.min(mutations.size(), start + propertiesPerRpc);
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
commitRequest.addAllMutations(mutations.subList(start, end));
@@ -783,7 +781,7 @@ public class DatastoreV1Test {
*/
@Test
public void testSplitQueryFnWithQueryLimit() throws Exception {
- Query queryWithLimit = QUERY.toBuilder()
+ Query queryWithLimit = QUERY.toBuilder().clone()
.setLimit(Int32Value.newBuilder().setValue(1))
.build();
@@ -896,50 +894,6 @@ public class DatastoreV1Test {
.apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
}
- @Test
- public void testWriteBatcherWithoutData() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0));
- }
-
- @Test
- public void testWriteBatcherFastQueries() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- writeBatcher.addRequestLatency(0, 1000, 200);
- writeBatcher.addRequestLatency(0, 1000, 200);
- assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0));
- }
-
- @Test
- public void testWriteBatcherSlowQueries() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- writeBatcher.addRequestLatency(0, 10000, 200);
- writeBatcher.addRequestLatency(0, 10000, 200);
- assertEquals(100, writeBatcher.nextBatchSize(0));
- }
-
- @Test
- public void testWriteBatcherSizeNotBelowMinimum() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- writeBatcher.addRequestLatency(0, 30000, 50);
- writeBatcher.addRequestLatency(0, 30000, 50);
- assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0));
- }
-
- @Test
- public void testWriteBatcherSlidingWindow() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- writeBatcher.addRequestLatency(0, 30000, 50);
- writeBatcher.addRequestLatency(50000, 5000, 200);
- writeBatcher.addRequestLatency(100000, 5000, 200);
- assertEquals(200, writeBatcher.nextBatchSize(150000));
- }
-
/** Helper Methods */
/** A helper function that verifies if all the queries have unique keys. */
@@ -1079,24 +1033,8 @@ public class DatastoreV1Test {
private List<Query> splitQuery(Query query, int numSplits) {
List<Query> queries = new LinkedList<>();
for (int i = 0; i < numSplits; i++) {
- queries.add(query.toBuilder().build());
+ queries.add(query.toBuilder().clone().build());
}
return queries;
}
-
- /**
- * A WriteBatcher for unit tests, which does no timing-based adjustments (so unit tests have
- * consistent results).
- */
- static class FakeWriteBatcher implements DatastoreV1.WriteBatcher {
- @Override
- public void start() {}
- @Override
- public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
- }
- @Override
- public int nextBatchSize(long timeSinceEpochMillis) {
- return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index cd61229..5e618df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -374,7 +374,7 @@ class V1TestUtil {
// Read the next batch of query results.
private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
- Query.Builder query = this.query.toBuilder();
+ Query.Builder query = this.query.toBuilder().clone();
query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
query.setStartCursor(currentBatch.getEndCursor());
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
deleted file mode 100644
index 753d807..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-
-import com.google.cloud.ServiceFactory;
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.concurrent.GuardedBy;
-import org.mockito.Matchers;
-
-/**
- * A serialization friendly type service factory that maintains a mock {@link Spanner} and
- * {@link DatabaseClient}.
- * */
-class FakeServiceFactory
- implements ServiceFactory<Spanner, SpannerOptions>, Serializable {
-
- // Marked as static so they could be returned by serviceFactory, which is serializable.
- private static final Object lock = new Object();
-
- @GuardedBy("lock")
- private static final List<Spanner> mockSpanners = new ArrayList<>();
-
- @GuardedBy("lock")
- private static final List<DatabaseClient> mockDatabaseClients = new ArrayList<>();
-
- @GuardedBy("lock")
- private static int count = 0;
-
- private final int index;
-
- public FakeServiceFactory() {
- synchronized (lock) {
- index = count++;
- mockSpanners.add(mock(Spanner.class, withSettings().serializable()));
- mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable()));
- }
- when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class)))
- .thenReturn(mockDatabaseClient());
- }
-
- DatabaseClient mockDatabaseClient() {
- synchronized (lock) {
- return mockDatabaseClients.get(index);
- }
- }
-
- Spanner mockSpanner() {
- synchronized (lock) {
- return mockSpanners.get(index);
- }
- }
-
- @Override
- public Spanner create(SpannerOptions serviceOptions) {
- return mockSpanner();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
index 013b83d..03eb28e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
@@ -135,16 +135,4 @@ public class MutationSizeEstimatorTest {
assertThat(MutationSizeEstimator.sizeOf(timestampArray), is(24L));
assertThat(MutationSizeEstimator.sizeOf(dateArray), is(48L));
}
-
- @Test
- public void group() throws Exception {
- Mutation int64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
- Mutation float64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2.9).build();
- Mutation bool = Mutation.newInsertOrUpdateBuilder("test").set("one").to(false).build();
-
- MutationGroup group = MutationGroup.create(int64, float64, bool);
-
- assertThat(MutationSizeEstimator.sizeOf(group), is(17L));
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java
deleted file mode 100644
index f479b4a..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import java.util.Random;
-
-/**
- * Useful randomness related utilities.
- */
-public class RandomUtils {
-
- private static final char[] ALPHANUMERIC = "1234567890abcdefghijklmnopqrstuvwxyz".toCharArray();
-
- private RandomUtils() {
- }
-
- public static String randomAlphaNumeric(int length) {
- Random random = new Random();
- char[] result = new char[length];
- for (int i = 0; i < length; i++) {
- result[i] = ALPHANUMERIC[random.nextInt(ALPHANUMERIC.length)];
- }
- return new String(result);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
deleted file mode 100644
index 5ba2da0..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.cloud.Timestamp;
-import com.google.cloud.spanner.KeySet;
-import com.google.cloud.spanner.ReadOnlyTransaction;
-import com.google.cloud.spanner.ResultSets;
-import com.google.cloud.spanner.Statement;
-import com.google.cloud.spanner.Struct;
-import com.google.cloud.spanner.TimestampBound;
-import com.google.cloud.spanner.Type;
-import com.google.cloud.spanner.Value;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/** Unit tests for {@link SpannerIO}. */
-@RunWith(JUnit4.class)
-public class SpannerIOReadTest implements Serializable {
- @Rule
- public final transient TestPipeline pipeline = TestPipeline.create();
- @Rule
- public final transient ExpectedException thrown = ExpectedException.none();
-
- private FakeServiceFactory serviceFactory;
- private ReadOnlyTransaction mockTx;
-
- private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()),
- Type.StructField.of("name", Type.string()));
-
- private List<Struct> fakeRows = Arrays.asList(
- Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(),
- Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build());
-
- @Before
- @SuppressWarnings("unchecked")
- public void setUp() throws Exception {
- serviceFactory = new FakeServiceFactory();
- mockTx = Mockito.mock(ReadOnlyTransaction.class);
- }
-
- @Test
- public void emptyTransform() throws Exception {
- SpannerIO.Read read = SpannerIO.read();
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("requires instance id to be set with");
- read.validate(null);
- }
-
- @Test
- public void emptyInstanceId() throws Exception {
- SpannerIO.Read read = SpannerIO.read().withDatabaseId("123");
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("requires instance id to be set with");
- read.validate(null);
- }
-
- @Test
- public void emptyDatabaseId() throws Exception {
- SpannerIO.Read read = SpannerIO.read().withInstanceId("123");
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("requires database id to be set with");
- read.validate(null);
- }
-
- @Test
- public void emptyQuery() throws Exception {
- SpannerIO.Read read =
- SpannerIO.read().withInstanceId("123").withDatabaseId("aaa").withTimestamp(Timestamp.now());
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("requires configuring query or read operation");
- read.validate(null);
- }
-
- @Test
- public void emptyColumns() throws Exception {
- SpannerIO.Read read =
- SpannerIO.read()
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withTable("users");
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("requires a list of columns");
- read.validate(null);
- }
-
- @Test
- public void validRead() throws Exception {
- SpannerIO.Read read =
- SpannerIO.read()
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withTable("users")
- .withColumns("id", "name", "email");
- read.validate(null);
- }
-
- @Test
- public void validQuery() throws Exception {
- SpannerIO.Read read =
- SpannerIO.read()
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withQuery("SELECT * FROM users");
- read.validate(null);
- }
-
- @Test
- public void runQuery() throws Exception {
- SpannerIO.Read read =
- SpannerIO.read()
- .withProjectId("test")
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withQuery("SELECT * FROM users")
- .withServiceFactory(serviceFactory);
-
- NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
- DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
-
- when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
- .thenReturn(mockTx);
- when(mockTx.executeQuery(any(Statement.class)))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
-
- List<Struct> result = fnTester.processBundle(1);
- assertThat(result, Matchers.<Struct>iterableWithSize(2));
-
- verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound
- .strong());
- verify(mockTx).executeQuery(Statement.of("SELECT * FROM users"));
- }
-
- @Test
- public void runRead() throws Exception {
- SpannerIO.Read read =
- SpannerIO.read()
- .withProjectId("test")
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withTable("users")
- .withColumns("id", "name")
- .withServiceFactory(serviceFactory);
-
- NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
- DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
-
- when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
- .thenReturn(mockTx);
- when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
-
- List<Struct> result = fnTester.processBundle(1);
- assertThat(result, Matchers.<Struct>iterableWithSize(2));
-
- verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
- verify(mockTx).read("users", KeySet.all(), Arrays.asList("id", "name"));
- }
-
- @Test
- public void runReadUsingIndex() throws Exception {
- SpannerIO.Read read =
- SpannerIO.read()
- .withProjectId("test")
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withTable("users")
- .withColumns("id", "name")
- .withIndex("theindex")
- .withServiceFactory(serviceFactory);
-
- NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
- DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
-
- when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
- .thenReturn(mockTx);
- when(mockTx.readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name")))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
-
- List<Struct> result = fnTester.processBundle(1);
- assertThat(result, Matchers.<Struct>iterableWithSize(2));
-
- verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
- verify(mockTx).readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name"));
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void readPipeline() throws Exception {
- Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
-
- PCollectionView<Transaction> tx = pipeline
- .apply("tx", SpannerIO.createTransaction()
- .withProjectId("test")
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withServiceFactory(serviceFactory));
-
- PCollection<Struct> one = pipeline.apply("read q", SpannerIO.read()
- .withProjectId("test")
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withQuery("SELECT * FROM users")
- .withServiceFactory(serviceFactory)
- .withTransaction(tx));
- PCollection<Struct> two = pipeline.apply("read r", SpannerIO.read()
- .withProjectId("test")
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withTable("users")
- .withColumns("id", "name")
- .withServiceFactory(serviceFactory)
- .withTransaction(tx));
-
- when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
- .thenReturn(mockTx);
-
- when(mockTx.executeQuery(Statement.of("SELECT 1"))).thenReturn(ResultSets.forRows(Type.struct(),
- Collections.<Struct>emptyList()));
-
- when(mockTx.executeQuery(Statement.of("SELECT * FROM users")))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
- when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
- when(mockTx.getReadTimestamp()).thenReturn(timestamp);
-
- PAssert.that(one).containsInAnyOrder(fakeRows);
- PAssert.that(two).containsInAnyOrder(fakeRows);
-
- pipeline.run();
-
- verify(serviceFactory.mockDatabaseClient(), times(2))
- .readOnlyTransaction(TimestampBound.ofReadTimestamp(timestamp));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
new file mode 100644
index 0000000..5bdfea5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.ServiceFactory;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+
+
+/**
+ * Unit tests for {@link SpannerIO}.
+ */
+@RunWith(JUnit4.class)
+public class SpannerIOTest implements Serializable {
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+ private FakeServiceFactory serviceFactory;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() throws Exception {
+ serviceFactory = new FakeServiceFactory();
+ }
+
+ @Test
+ public void emptyTransform() throws Exception {
+ SpannerIO.Write write = SpannerIO.write();
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("requires instance id to be set with");
+ write.validate(null);
+ }
+
+ @Test
+ public void emptyInstanceId() throws Exception {
+ SpannerIO.Write write = SpannerIO.write().withDatabaseId("123");
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("requires instance id to be set with");
+ write.validate(null);
+ }
+
+ @Test
+ public void emptyDatabaseId() throws Exception {
+ SpannerIO.Write write = SpannerIO.write().withInstanceId("123");
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("requires database id to be set with");
+ write.validate(null);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void singleMutationPipeline() throws Exception {
+ Mutation mutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2).build();
+ PCollection<Mutation> mutations = pipeline.apply(Create.of(mutation));
+
+ mutations.apply(
+ SpannerIO.write()
+ .withProjectId("test-project")
+ .withInstanceId("test-instance")
+ .withDatabaseId("test-database")
+ .withServiceFactory(serviceFactory));
+ pipeline.run();
+ verify(serviceFactory.mockSpanner())
+ .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+ verify(serviceFactory.mockDatabaseClient(), times(1))
+ .writeAtLeastOnce(argThat(new IterableOfSize(1)));
+ }
+
+ @Test
+ public void batching() throws Exception {
+ Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+ Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+ SpannerIO.Write write =
+ SpannerIO.write()
+ .withProjectId("test-project")
+ .withInstanceId("test-instance")
+ .withDatabaseId("test-database")
+ .withBatchSizeBytes(1000000000)
+ .withServiceFactory(serviceFactory);
+ SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
+ DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+ fnTester.processBundle(Arrays.asList(one, two));
+
+ verify(serviceFactory.mockSpanner())
+ .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+ verify(serviceFactory.mockDatabaseClient(), times(1))
+ .writeAtLeastOnce(argThat(new IterableOfSize(2)));
+ }
+
+ @Test
+ public void batchingGroups() throws Exception {
+ Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+ Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+ Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
+
+ // Have a room to accumulate one more item.
+ long batchSize = MutationSizeEstimator.sizeOf(one) + 1;
+
+ SpannerIO.Write write =
+ SpannerIO.write()
+ .withProjectId("test-project")
+ .withInstanceId("test-instance")
+ .withDatabaseId("test-database")
+ .withBatchSizeBytes(batchSize)
+ .withServiceFactory(serviceFactory);
+ SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
+ DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+ fnTester.processBundle(Arrays.asList(one, two, three));
+
+ verify(serviceFactory.mockSpanner())
+ .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+ verify(serviceFactory.mockDatabaseClient(), times(1))
+ .writeAtLeastOnce(argThat(new IterableOfSize(2)));
+ verify(serviceFactory.mockDatabaseClient(), times(1))
+ .writeAtLeastOnce(argThat(new IterableOfSize(1)));
+ }
+
+ @Test
+ public void noBatching() throws Exception {
+ Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+ Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+ SpannerIO.Write write =
+ SpannerIO.write()
+ .withProjectId("test-project")
+ .withInstanceId("test-instance")
+ .withDatabaseId("test-database")
+ .withBatchSizeBytes(0) // turn off batching.
+ .withServiceFactory(serviceFactory);
+ SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
+ DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+ fnTester.processBundle(Arrays.asList(one, two));
+
+ verify(serviceFactory.mockSpanner())
+ .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+ verify(serviceFactory.mockDatabaseClient(), times(2))
+ .writeAtLeastOnce(argThat(new IterableOfSize(1)));
+ }
+
+ private static class FakeServiceFactory
+ implements ServiceFactory<Spanner, SpannerOptions>, Serializable {
+ // Marked as static so they could be returned by serviceFactory, which is serializable.
+ private static final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private static final List<Spanner> mockSpanners = new ArrayList<>();
+
+ @GuardedBy("lock")
+ private static final List<DatabaseClient> mockDatabaseClients = new ArrayList<>();
+
+ @GuardedBy("lock")
+ private static int count = 0;
+
+ private final int index;
+
+ public FakeServiceFactory() {
+ synchronized (lock) {
+ index = count++;
+ mockSpanners.add(mock(Spanner.class, withSettings().serializable()));
+ mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable()));
+ }
+ ApiFuture voidFuture = mock(ApiFuture.class, withSettings().serializable());
+ when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class)))
+ .thenReturn(mockDatabaseClient());
+ when(mockSpanner().closeAsync()).thenReturn(voidFuture);
+ }
+
+ DatabaseClient mockDatabaseClient() {
+ synchronized (lock) {
+ return mockDatabaseClients.get(index);
+ }
+ }
+
+ Spanner mockSpanner() {
+ synchronized (lock) {
+ return mockSpanners.get(index);
+ }
+ }
+
+ @Override
+ public Spanner create(SpannerOptions serviceOptions) {
+ return mockSpanner();
+ }
+ }
+
+ private static class IterableOfSize extends ArgumentMatcher<Iterable<Mutation>> {
+ private final int size;
+
+ private IterableOfSize(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public boolean matches(Object argument) {
+ return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
deleted file mode 100644
index 09cdb8e..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Mutation;
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentMatcher;
-
-/**
- * Unit tests for {@link SpannerIO}.
- */
-@RunWith(JUnit4.class)
-public class SpannerIOWriteTest implements Serializable {
- @Rule public final transient TestPipeline pipeline = TestPipeline.create();
- @Rule public transient ExpectedException thrown = ExpectedException.none();
-
- private FakeServiceFactory serviceFactory;
-
- @Before
- @SuppressWarnings("unchecked")
- public void setUp() throws Exception {
- serviceFactory = new FakeServiceFactory();
- }
-
- @Test
- public void emptyTransform() throws Exception {
- SpannerIO.Write write = SpannerIO.write();
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("requires instance id to be set with");
- write.validate(null);
- }
-
- @Test
- public void emptyInstanceId() throws Exception {
- SpannerIO.Write write = SpannerIO.write().withDatabaseId("123");
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("requires instance id to be set with");
- write.validate(null);
- }
-
- @Test
- public void emptyDatabaseId() throws Exception {
- SpannerIO.Write write = SpannerIO.write().withInstanceId("123");
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("requires database id to be set with");
- write.validate(null);
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void singleMutationPipeline() throws Exception {
- Mutation mutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2).build();
- PCollection<Mutation> mutations = pipeline.apply(Create.of(mutation));
-
- mutations.apply(
- SpannerIO.write()
- .withProjectId("test-project")
- .withInstanceId("test-instance")
- .withDatabaseId("test-database")
- .withServiceFactory(serviceFactory));
- pipeline.run();
- verify(serviceFactory.mockSpanner())
- .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
- verify(serviceFactory.mockDatabaseClient(), times(1))
- .writeAtLeastOnce(argThat(new IterableOfSize(1)));
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void singleMutationGroupPipeline() throws Exception {
- Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
- Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
- Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
- PCollection<MutationGroup> mutations = pipeline
- .apply(Create.<MutationGroup>of(g(one, two, three)));
- mutations.apply(
- SpannerIO.write()
- .withProjectId("test-project")
- .withInstanceId("test-instance")
- .withDatabaseId("test-database")
- .withServiceFactory(serviceFactory)
- .grouped());
- pipeline.run();
- verify(serviceFactory.mockSpanner())
- .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
- verify(serviceFactory.mockDatabaseClient(), times(1))
- .writeAtLeastOnce(argThat(new IterableOfSize(3)));
- }
-
- @Test
- public void batching() throws Exception {
- MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
- MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
- SpannerIO.Write write =
- SpannerIO.write()
- .withProjectId("test-project")
- .withInstanceId("test-instance")
- .withDatabaseId("test-database")
- .withBatchSizeBytes(1000000000)
- .withServiceFactory(serviceFactory);
- SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
- DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
- fnTester.processBundle(Arrays.asList(one, two));
-
- verify(serviceFactory.mockSpanner())
- .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
- verify(serviceFactory.mockDatabaseClient(), times(1))
- .writeAtLeastOnce(argThat(new IterableOfSize(2)));
- }
-
- @Test
- public void batchingGroups() throws Exception {
- MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
- MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
- MutationGroup three = g(Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build());
-
- // Have a room to accumulate one more item.
- long batchSize = MutationSizeEstimator.sizeOf(one) + 1;
-
- SpannerIO.Write write =
- SpannerIO.write()
- .withProjectId("test-project")
- .withInstanceId("test-instance")
- .withDatabaseId("test-database")
- .withBatchSizeBytes(batchSize)
- .withServiceFactory(serviceFactory);
- SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
- DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
- fnTester.processBundle(Arrays.asList(one, two, three));
-
- verify(serviceFactory.mockSpanner())
- .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
- verify(serviceFactory.mockDatabaseClient(), times(1))
- .writeAtLeastOnce(argThat(new IterableOfSize(2)));
- verify(serviceFactory.mockDatabaseClient(), times(1))
- .writeAtLeastOnce(argThat(new IterableOfSize(1)));
- }
-
- @Test
- public void noBatching() throws Exception {
- MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
- MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
- SpannerIO.Write write =
- SpannerIO.write()
- .withProjectId("test-project")
- .withInstanceId("test-instance")
- .withDatabaseId("test-database")
- .withBatchSizeBytes(0) // turn off batching.
- .withServiceFactory(serviceFactory);
- SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
- DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
- fnTester.processBundle(Arrays.asList(one, two));
-
- verify(serviceFactory.mockSpanner())
- .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
- verify(serviceFactory.mockDatabaseClient(), times(2))
- .writeAtLeastOnce(argThat(new IterableOfSize(1)));
- }
-
- @Test
- public void groups() throws Exception {
- Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
- Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
- Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
-
- // Smallest batch size
- long batchSize = 1;
-
- SpannerIO.Write write =
- SpannerIO.write()
- .withProjectId("test-project")
- .withInstanceId("test-instance")
- .withDatabaseId("test-database")
- .withBatchSizeBytes(batchSize)
- .withServiceFactory(serviceFactory);
- SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
- DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
- fnTester.processBundle(Arrays.asList(g(one, two, three)));
-
- verify(serviceFactory.mockSpanner())
- .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
- verify(serviceFactory.mockDatabaseClient(), times(1))
- .writeAtLeastOnce(argThat(new IterableOfSize(3)));
- }
-
- @Test
- public void displayData() throws Exception {
- SpannerIO.Write write =
- SpannerIO.write()
- .withProjectId("test-project")
- .withInstanceId("test-instance")
- .withDatabaseId("test-database")
- .withBatchSizeBytes(123);
-
- DisplayData data = DisplayData.from(write);
- assertThat(data.items(), hasSize(4));
- assertThat(data, hasDisplayItem("projectId", "test-project"));
- assertThat(data, hasDisplayItem("instanceId", "test-instance"));
- assertThat(data, hasDisplayItem("databaseId", "test-database"));
- assertThat(data, hasDisplayItem("batchSizeBytes", 123));
- }
-
- private static class IterableOfSize extends ArgumentMatcher<Iterable<Mutation>> {
- private final int size;
-
- private IterableOfSize(int size) {
- this.size = size;
- }
-
- @Override
- public boolean matches(Object argument) {
- return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size;
- }
- }
-
- private static MutationGroup g(Mutation m, Mutation... other) {
- return MutationGroup.create(m, other);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
deleted file mode 100644
index d866975..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.Database;
-import com.google.cloud.spanner.DatabaseAdminClient;
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Mutation;
-import com.google.cloud.spanner.Operation;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import com.google.cloud.spanner.Struct;
-import com.google.cloud.spanner.TimestampBound;
-import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** End-to-end test of Cloud Spanner Source. */
-@RunWith(JUnit4.class)
-public class SpannerReadIT {
-
- private static final int MAX_DB_NAME_LENGTH = 30;
-
- @Rule public final transient TestPipeline p = TestPipeline.create();
-
- /** Pipeline options for this test. */
- public interface SpannerTestPipelineOptions extends TestPipelineOptions {
- @Description("Instance ID to write to in Spanner")
- @Default.String("beam-test")
- String getInstanceId();
- void setInstanceId(String value);
-
- @Description("Database ID prefix to write to in Spanner")
- @Default.String("beam-testdb")
- String getDatabaseIdPrefix();
- void setDatabaseIdPrefix(String value);
-
- @Description("Table name")
- @Default.String("users")
- String getTable();
- void setTable(String value);
- }
-
- private Spanner spanner;
- private DatabaseAdminClient databaseAdminClient;
- private SpannerTestPipelineOptions options;
- private String databaseName;
- private String project;
-
- @Before
- public void setUp() throws Exception {
- PipelineOptionsFactory.register(SpannerTestPipelineOptions.class);
- options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class);
-
- project = options.as(GcpOptions.class).getProject();
-
- spanner = SpannerOptions.newBuilder().setProjectId(project).build().getService();
-
- databaseName = generateDatabaseName();
-
- databaseAdminClient = spanner.getDatabaseAdminClient();
-
- // Delete database if exists.
- databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName);
-
- Operation<Database, CreateDatabaseMetadata> op =
- databaseAdminClient.createDatabase(
- options.getInstanceId(),
- databaseName,
- Collections.singleton(
- "CREATE TABLE "
- + options.getTable()
- + " ("
- + " Key INT64,"
- + " Value STRING(MAX),"
- + ") PRIMARY KEY (Key)"));
- op.waitFor();
- }
-
- @Test
- public void testRead() throws Exception {
- DatabaseClient databaseClient =
- spanner.getDatabaseClient(
- DatabaseId.of(
- project, options.getInstanceId(), databaseName));
-
- List<Mutation> mutations = new ArrayList<>();
- for (int i = 0; i < 5L; i++) {
- mutations.add(
- Mutation.newInsertOrUpdateBuilder(options.getTable())
- .set("key")
- .to((long) i)
- .set("value")
- .to(RandomUtils.randomAlphaNumeric(100))
- .build());
- }
-
- databaseClient.writeAtLeastOnce(mutations);
-
- SpannerConfig spannerConfig = SpannerConfig.create()
- .withProjectId(project)
- .withInstanceId(options.getInstanceId())
- .withDatabaseId(databaseName);
-
- PCollectionView<Transaction> tx =
- p.apply(
- SpannerIO.createTransaction()
- .withSpannerConfig(spannerConfig)
- .withTimestampBound(TimestampBound.strong()));
-
- PCollection<Struct> output =
- p.apply(
- SpannerIO.read()
- .withSpannerConfig(spannerConfig)
- .withQuery("SELECT * FROM " + options.getTable())
- .withTransaction(tx));
- PAssert.thatSingleton(output.apply("Count rows", Count.<Struct>globally())).isEqualTo(5L);
- p.run();
- }
-
- @After
- public void tearDown() throws Exception {
- databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName);
- spanner.close();
- }
-
- private String generateDatabaseName() {
- String random = RandomUtils
- .randomAlphaNumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length());
- return options.getDatabaseIdPrefix() + "-" + random;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
index d208f5c..8df224b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
@@ -33,7 +33,7 @@ import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
import java.util.Collections;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -59,6 +60,11 @@ public class SpannerWriteIT {
/** Pipeline options for this test. */
public interface SpannerTestPipelineOptions extends TestPipelineOptions {
+ @Description("Project ID for Spanner")
+ @Default.String("apache-beam-testing")
+ String getProjectId();
+ void setProjectId(String value);
+
@Description("Instance ID to write to in Spanner")
@Default.String("beam-test")
String getInstanceId();
@@ -79,16 +85,13 @@ public class SpannerWriteIT {
private DatabaseAdminClient databaseAdminClient;
private SpannerTestPipelineOptions options;
private String databaseName;
- private String project;
@Before
public void setUp() throws Exception {
PipelineOptionsFactory.register(SpannerTestPipelineOptions.class);
options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class);
- project = options.as(GcpOptions.class).getProject();
-
- spanner = SpannerOptions.newBuilder().setProjectId(project).build().getService();
+ spanner = SpannerOptions.newBuilder().setProjectId(options.getProjectId()).build().getService();
databaseName = generateDatabaseName();
@@ -112,8 +115,9 @@ public class SpannerWriteIT {
}
private String generateDatabaseName() {
- String random = RandomUtils
- .randomAlphaNumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length());
+ String random = RandomStringUtils
+ .randomAlphanumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length())
+ .toLowerCase();
return options.getDatabaseIdPrefix() + "-" + random;
}
@@ -123,7 +127,7 @@ public class SpannerWriteIT {
.apply(ParDo.of(new GenerateMutations(options.getTable())))
.apply(
SpannerIO.write()
- .withProjectId(project)
+ .withProjectId(options.getProjectId())
.withInstanceId(options.getInstanceId())
.withDatabaseId(databaseName));
@@ -131,7 +135,7 @@ public class SpannerWriteIT {
DatabaseClient databaseClient =
spanner.getDatabaseClient(
DatabaseId.of(
- project, options.getInstanceId(), databaseName));
+ options.getProjectId(), options.getInstanceId(), databaseName));
ResultSet resultSet =
databaseClient
@@ -145,7 +149,7 @@ public class SpannerWriteIT {
@After
public void tearDown() throws Exception {
databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName);
- spanner.close();
+ spanner.closeAsync().get();
}
private static class GenerateMutations extends DoFn<Long, Mutation> {
@@ -161,7 +165,7 @@ public class SpannerWriteIT {
Mutation.WriteBuilder builder = Mutation.newInsertOrUpdateBuilder(table);
Long key = c.element();
builder.set("Key").to(key);
- builder.set("Value").to(RandomUtils.randomAlphaNumeric(valueSize));
+ builder.set("Value").to(RandomStringUtils.randomAlphabetic(valueSize));
Mutation mutation = builder.build();
c.output(mutation);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index 4bcbcd7..8749243 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index a9c2e57..db5a1db 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -44,6 +44,37 @@
</plugins>
</build>
+ <properties>
+ <!--
+ This is the version of Hadoop used to compile the hadoop-common module.
+ This dependency is defined with a provided scope.
+ Users must supply their own Hadoop version at runtime.
+ -->
+ <hadoop.version>2.7.3</hadoop.version>
+ </properties>
+
+ <dependencyManagement>
+ <!--
+ We define dependencies here instead of sdks/java/io because
+ of a version mismatch between this Hadoop version and other
+ Hadoop versions declared in other io submodules.
+ -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <classifier>tests</classifier>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/input-format/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml
index 0953119..06f9f11 100644
--- a/sdks/java/io/hadoop/input-format/pom.xml
+++ b/sdks/java/io/hadoop/input-format/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 0b4c23f..efd47fd 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -166,7 +166,7 @@ import org.slf4j.LoggerFactory;
* }
* </pre>
*/
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
public class HadoopInputFormatIO {
private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatIO.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
index 12944f4..9f84e88 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
+++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>beam-sdks-java-io-hadoop-jdk1.8-tests</artifactId>
@@ -108,11 +108,13 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
index 3f866a4..8745521 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.hadoop.inputformat;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -77,7 +76,7 @@ public class HIFIOWithElasticTest implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HIFIOWithElasticTest.class);
private static final String ELASTIC_IN_MEM_HOSTNAME = "127.0.0.1";
- private static String elasticInMemPort = "9200";
+ private static final String ELASTIC_IN_MEM_PORT = "9200";
private static final String ELASTIC_INTERNAL_VERSION = "5.x";
private static final String TRUE = "true";
private static final String ELASTIC_INDEX_NAME = "beamdb";
@@ -95,10 +94,6 @@ public class HIFIOWithElasticTest implements Serializable {
@BeforeClass
public static void startServer()
throws NodeValidationException, InterruptedException, IOException {
- ServerSocket serverSocket = new ServerSocket(0);
- int port = serverSocket.getLocalPort();
- serverSocket.close();
- elasticInMemPort = String.valueOf(port);
ElasticEmbeddedServer.startElasticEmbeddedServer();
}
@@ -178,7 +173,7 @@ public class HIFIOWithElasticTest implements Serializable {
public Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set(ConfigurationOptions.ES_NODES, ELASTIC_IN_MEM_HOSTNAME);
- conf.set(ConfigurationOptions.ES_PORT, String.format("%s", elasticInMemPort));
+ conf.set(ConfigurationOptions.ES_PORT, String.format("%s", ELASTIC_IN_MEM_PORT));
conf.set(ConfigurationOptions.ES_RESOURCE, ELASTIC_RESOURCE);
conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION);
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, TRUE);
@@ -214,7 +209,7 @@ public class HIFIOWithElasticTest implements Serializable {
Settings settings = Settings.builder()
.put("node.data", TRUE)
.put("network.host", ELASTIC_IN_MEM_HOSTNAME)
- .put("http.port", elasticInMemPort)
+ .put("http.port", ELASTIC_IN_MEM_PORT)
.put("path.data", elasticTempFolder.getRoot().getPath())
.put("path.home", elasticTempFolder.getRoot().getPath())
.put("transport.type", "local")
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/pom.xml b/sdks/java/io/hadoop/pom.xml
index bc3569d..a1c7a2e 100644
--- a/sdks/java/io/hadoop/pom.xml
+++ b/sdks/java/io/hadoop/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<packaging>pom</packaging>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index 40f516a..746b993 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -31,7 +31,8 @@
<description>Library to read and write from/to HBase</description>
<properties>
- <hbase.version>1.2.6</hbase.version>
+ <hbase.version>1.2.5</hbase.version>
+ <hbase.hadoop.version>2.5.1</hbase.hadoop.version>
</properties>
<build>
@@ -63,12 +64,6 @@
</dependency>
<dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
<version>${hbase.version}</version>
@@ -108,26 +103,15 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
+ <version>${hbase.hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
+ <version>${hbase.hadoop.version}</version>
<scope>test</scope>
- <exclusions>
- <!-- Fix build on JDK-9 -->
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
deleted file mode 100644
index 2973d1b..0000000
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hbase;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviderRegistrar;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.hadoop.hbase.client.Result;
-
-/**
- * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}.
- */
-@AutoService(CoderProviderRegistrar.class)
-public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
- @Override
- public List<CoderProvider> getCoderProviders() {
- return ImmutableList.of(
- HBaseMutationCoder.getCoderProvider(),
- CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
- }
-}