You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/20 21:26:16 UTC

[1/2] beam git commit: Support ValueProviders in SpannerIO.Write

Repository: beam
Updated Branches:
  refs/heads/master 608a9c459 -> 10e47646d


Support ValueProviders in SpannerIO.Write


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/42a2de91
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/42a2de91
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/42a2de91

Branch: refs/heads/master
Commit: 42a2de91adf1387bb8eaf9aa515a24f6f276bf40
Parents: 608a9c4
Author: Mairbek Khadikov <ma...@google.com>
Authored: Wed Jun 14 13:03:36 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 14:25:51 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/spanner/SpannerIO.java      | 31 ++++++++++++++------
 .../beam/sdk/io/gcp/spanner/SpannerIOTest.java  | 21 +++++++++++++
 2 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/42a2de91/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index af5253b..8bfc247 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -37,6 +37,7 @@ import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -123,13 +124,13 @@ public class SpannerIO {
   public abstract static class Write extends PTransform<PCollection<Mutation>, PDone> {
 
     @Nullable
-    abstract String getProjectId();
+    abstract ValueProvider<String> getProjectId();
 
     @Nullable
-    abstract String getInstanceId();
+    abstract ValueProvider<String> getInstanceId();
 
     @Nullable
-    abstract String getDatabaseId();
+    abstract ValueProvider<String> getDatabaseId();
 
     abstract long getBatchSizeBytes();
 
@@ -142,11 +143,11 @@ public class SpannerIO {
     @AutoValue.Builder
     abstract static class Builder {
 
-      abstract Builder setProjectId(String projectId);
+      abstract Builder setProjectId(ValueProvider<String> projectId);
 
-      abstract Builder setInstanceId(String instanceId);
+      abstract Builder setInstanceId(ValueProvider<String> instanceId);
 
-      abstract Builder setDatabaseId(String databaseId);
+      abstract Builder setDatabaseId(ValueProvider<String> databaseId);
 
       abstract Builder setBatchSizeBytes(long batchSizeBytes);
 
@@ -162,6 +163,10 @@ public class SpannerIO {
      * <p>Does not modify this object.
      */
     public Write withProjectId(String projectId) {
+      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+    }
+
+    public Write withProjectId(ValueProvider<String> projectId) {
       return toBuilder().setProjectId(projectId).build();
     }
 
@@ -172,6 +177,10 @@ public class SpannerIO {
      * <p>Does not modify this object.
      */
     public Write withInstanceId(String instanceId) {
+      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+    }
+
+    public Write withInstanceId(ValueProvider<String> instanceId) {
       return toBuilder().setInstanceId(instanceId).build();
     }
 
@@ -191,6 +200,10 @@ public class SpannerIO {
      * <p>Does not modify this object.
      */
     public Write withDatabaseId(String databaseId) {
+      return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
+    }
+
+    public Write withDatabaseId(ValueProvider<String> databaseId) {
       return toBuilder().setDatabaseId(databaseId).build();
     }
 
@@ -291,7 +304,7 @@ public class SpannerIO {
       SpannerOptions spannerOptions = getSpannerOptions();
       spanner = spannerOptions.getService();
       dbClient = spanner.getDatabaseClient(
-          DatabaseId.of(projectId(), spec.getInstanceId(), spec.getDatabaseId()));
+          DatabaseId.of(projectId(), spec.getInstanceId().get(), spec.getDatabaseId().get()));
       mutations = new ArrayList<>();
       batchSizeBytes = 0;
     }
@@ -309,7 +322,7 @@ public class SpannerIO {
     private String projectId() {
       return spec.getProjectId() == null
           ? ServiceOptions.getDefaultProjectId()
-          : spec.getProjectId();
+          : spec.getProjectId().get();
     }
 
     @FinishBundle
@@ -334,7 +347,7 @@ public class SpannerIO {
         spannerOptionsBuider.setServiceFactory(spec.getServiceFactory());
       }
       if (spec.getProjectId() != null) {
-        spannerOptionsBuider.setProjectId(spec.getProjectId());
+        spannerOptionsBuider.setProjectId(spec.getProjectId().get());
       }
       return spannerOptionsBuider.build();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/42a2de91/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
index 4a759fb..1e19a59 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -42,6 +45,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
 import org.junit.Rule;
@@ -231,6 +235,23 @@ public class SpannerIOTest implements Serializable {
         .writeAtLeastOnce(argThat(new IterableOfSize(3)));
   }
 
+  @Test
+  public void displayData() throws Exception {
+    SpannerIO.Write write =
+        SpannerIO.write()
+            .withProjectId("test-project")
+            .withInstanceId("test-instance")
+            .withDatabaseId("test-database")
+            .withBatchSizeBytes(123);
+
+    DisplayData data = DisplayData.from(write);
+    assertThat(data.items(), hasSize(4));
+    assertThat(data, hasDisplayItem("projectId", "test-project"));
+    assertThat(data, hasDisplayItem("instanceId", "test-instance"));
+    assertThat(data, hasDisplayItem("databaseId", "test-database"));
+    assertThat(data, hasDisplayItem("batchSizeBytes", 123));
+  }
+
   private static class FakeServiceFactory
       implements ServiceFactory<Spanner, SpannerOptions>, Serializable {
     // Marked as static so they could be returned by serviceFactory, which is serializable.


[2/2] beam git commit: This closes #3358: [BEAM-1542] Support ValueProviders in SpannerIO

Posted by jk...@apache.org.
This closes #3358: [BEAM-1542] Support ValueProviders in SpannerIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10e47646
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10e47646
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10e47646

Branch: refs/heads/master
Commit: 10e47646dd5f20d4049d670249cae56c51768ae0
Parents: 608a9c4 42a2de9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 20 14:25:56 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 14:25:56 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/spanner/SpannerIO.java      | 31 ++++++++++++++------
 .../beam/sdk/io/gcp/spanner/SpannerIOTest.java  | 21 +++++++++++++
 2 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------