You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/24 22:52:02 UTC

[1/2] beam git commit: Adds SpannerAccessor - a utility for DoFns that use Spanner

Repository: beam
Updated Branches:
  refs/heads/master f3de7363c -> c33cb0340


Adds SpannerAccessor - a utility for DoFns that use Spanner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9c2e816
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9c2e816
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9c2e816

Branch: refs/heads/master
Commit: c9c2e81672676e3ec705269a94f11fb1a2596c48
Parents: f3de736
Author: Mairbek Khadikov <ma...@google.com>
Authored: Mon Aug 7 12:33:19 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 24 15:51:41 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/spanner/AbstractSpannerFn.java   | 71 --------------------
 .../sdk/io/gcp/spanner/CreateTransactionFn.java | 22 ++++--
 .../sdk/io/gcp/spanner/NaiveSpannerReadFn.java  | 18 +++--
 .../sdk/io/gcp/spanner/SpannerAccessor.java     | 43 ++++++++++++
 .../beam/sdk/io/gcp/spanner/SpannerConfig.java  | 22 ++++++
 .../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 24 ++++---
 6 files changed, 111 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
deleted file mode 100644
index 50efdea..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.ReleaseInfo;
-
-/**
- * Abstract {@link DoFn} that manages {@link Spanner} lifecycle. Use {@link
- * AbstractSpannerFn#databaseClient} to access the Cloud Spanner database client.
- */
-abstract class AbstractSpannerFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
-  // A common user agent token that indicates that this request was originated from Apache Beam.
-  private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";
-
-  private transient Spanner spanner;
-  private transient DatabaseClient databaseClient;
-
-  abstract SpannerConfig getSpannerConfig();
-
-  @Setup
-  public void setup() throws Exception {
-    SpannerConfig spannerConfig = getSpannerConfig();
-    SpannerOptions.Builder builder = SpannerOptions.newBuilder();
-    if (spannerConfig.getProjectId() != null) {
-      builder.setProjectId(spannerConfig.getProjectId().get());
-    }
-    if (spannerConfig.getServiceFactory() != null) {
-      builder.setServiceFactory(spannerConfig.getServiceFactory());
-    }
-    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
-    builder.setUserAgentPrefix(USER_AGENT_PREFIX + "/" + releaseInfo.getVersion());
-    SpannerOptions options = builder.build();
-    spanner = options.getService();
-    databaseClient = spanner.getDatabaseClient(DatabaseId
-        .of(options.getProjectId(), spannerConfig.getInstanceId().get(),
-            spannerConfig.getDatabaseId().get()));
-  }
-
-  @Teardown
-  public void teardown() throws Exception {
-    if (spanner == null) {
-      return;
-    }
-    spanner.close();
-    spanner = null;
-  }
-
-  protected DatabaseClient databaseClient() {
-    return databaseClient;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
index da8e8b1..5574ae1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
@@ -17,12 +17,14 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner;
 
+import com.google.cloud.spanner.DatabaseClient;
 import com.google.cloud.spanner.ReadOnlyTransaction;
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Statement;
+import org.apache.beam.sdk.transforms.DoFn;
 
 /** Creates a batch transaction. */
-class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> {
+class CreateTransactionFn extends DoFn<Object, Transaction> {
 
   private final SpannerIO.CreateTransaction config;
 
@@ -30,10 +32,22 @@ class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> {
     this.config = config;
   }
 
+  private transient SpannerAccessor spannerAccessor;
+
+  @Setup
+  public void setup() throws Exception {
+    spannerAccessor = config.getSpannerConfig().connectToSpanner();
+  }
+  @Teardown
+  public void teardown() throws Exception {
+    spannerAccessor.close();
+  }
+
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
+    DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
     try (ReadOnlyTransaction readOnlyTransaction =
-        databaseClient().readOnlyTransaction(config.getTimestampBound())) {
+        databaseClient.readOnlyTransaction(config.getTimestampBound())) {
       // Run a dummy sql statement to force the RPC and obtain the timestamp from the server.
       ResultSet resultSet = readOnlyTransaction.executeQuery(Statement.of("SELECT 1"));
       while (resultSet.next()) {
@@ -44,8 +58,4 @@ class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> {
     }
   }
 
-  @Override
-  SpannerConfig getSpannerConfig() {
-    return config.getSpannerConfig();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
index 92b3fe3..5dc6ead 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
@@ -17,19 +17,22 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner;
 
+import com.google.cloud.spanner.DatabaseClient;
 import com.google.cloud.spanner.ReadOnlyTransaction;
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
 import com.google.common.annotations.VisibleForTesting;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /** A simplest read function implementation. Parallelism support is coming. */
 @VisibleForTesting
-class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> {
+class NaiveSpannerReadFn extends DoFn<ReadOperation, Struct> {
   private final SpannerConfig config;
   @Nullable private final PCollectionView<Transaction> transaction;
+  private transient SpannerAccessor spannerAccessor;
 
   NaiveSpannerReadFn(SpannerConfig config, @Nullable PCollectionView<Transaction> transaction) {
     this.config = config;
@@ -40,8 +43,14 @@ class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> {
     this(config, null);
   }
 
-  SpannerConfig getSpannerConfig() {
-    return config;
+
+  @Setup
+  public void setup() throws Exception {
+    spannerAccessor = config.connectToSpanner();
+  }
+  @Teardown
+  public void teardown() throws Exception {
+    spannerAccessor.close();
   }
 
   @ProcessElement
@@ -52,8 +61,9 @@ class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> {
       timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp());
     }
     ReadOperation op = c.element();
+    DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
     try (ReadOnlyTransaction readOnlyTransaction =
-        databaseClient().readOnlyTransaction(timestampBound)) {
+        databaseClient.readOnlyTransaction(timestampBound)) {
       ResultSet resultSet = execute(op, readOnlyTransaction);
       while (resultSet.next()) {
         c.output(resultSet.getCurrentRowAsStruct());

http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
new file mode 100644
index 0000000..f32e661
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Spanner;
+
+/**
+ * Manages lifecycle of {@link DatabaseClient} and {@link Spanner} instances.
+ */
+public class SpannerAccessor implements AutoCloseable {
+  private final Spanner spanner;
+  private final DatabaseClient databaseClient;
+
+  SpannerAccessor(Spanner spanner, DatabaseClient databaseClient) {
+    this.spanner = spanner;
+    this.databaseClient = databaseClient;
+  }
+
+  public DatabaseClient getDatabaseClient() {
+    return databaseClient;
+  }
+
+  @Override
+  public void close() {
+    spanner.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
index 034c38a..6646f32 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
@@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.cloud.ServiceFactory;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerOptions;
 import com.google.common.annotations.VisibleForTesting;
@@ -29,10 +31,13 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.ReleaseInfo;
 
 /** Configuration for a Cloud Spanner client. */
 @AutoValue
 public abstract class SpannerConfig implements Serializable {
+  // A common user agent token that indicates that this request was originated from Apache Beam.
+  private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";
 
   @Nullable
   abstract ValueProvider<String> getProjectId();
@@ -123,4 +128,21 @@ public abstract class SpannerConfig implements Serializable {
     return toBuilder().setServiceFactory(serviceFactory).build();
   }
 
+  public SpannerAccessor connectToSpanner() {
+    SpannerOptions.Builder builder = SpannerOptions.newBuilder();
+    if (getProjectId() != null) {
+      builder.setProjectId(getProjectId().get());
+    }
+    if (getServiceFactory() != null) {
+      builder.setServiceFactory(this.getServiceFactory());
+    }
+    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
+    builder.setUserAgentPrefix(USER_AGENT_PREFIX + "/" + releaseInfo.getVersion());
+    SpannerOptions options = builder.build();
+    Spanner spanner = options.getService();
+    DatabaseClient databaseClient = spanner.getDatabaseClient(
+        DatabaseId.of(options.getProjectId(), getInstanceId().get(), getDatabaseId().get()));
+    return new SpannerAccessor(spanner, databaseClient);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
index 34a11da..9343c0c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.spanner;
 
 import com.google.cloud.spanner.AbortedException;
+import com.google.cloud.spanner.DatabaseClient;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
@@ -25,6 +26,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
@@ -35,7 +37,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Batches together and writes mutations to Google Cloud Spanner. */
-@VisibleForTesting class SpannerWriteGroupFn extends AbstractSpannerFn<MutationGroup, Void> {
+@VisibleForTesting
+class SpannerWriteGroupFn extends DoFn<MutationGroup, Void> {
   private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class);
   private final SpannerIO.Write spec;
   // Current batch of mutations to be written.
@@ -48,21 +51,25 @@ import org.slf4j.LoggerFactory;
           .withMaxRetries(MAX_RETRIES)
           .withInitialBackoff(Duration.standardSeconds(5));
 
-  @VisibleForTesting SpannerWriteGroupFn(SpannerIO.Write spec) {
-    this.spec = spec;
-  }
+  private transient SpannerAccessor spannerAccessor;
 
-  @Override SpannerConfig getSpannerConfig() {
-    return spec.getSpannerConfig();
+  @VisibleForTesting
+  SpannerWriteGroupFn(SpannerIO.Write spec) {
+    this.spec = spec;
   }
 
   @Setup
   public void setup() throws Exception {
-    super.setup();
+    spannerAccessor = spec.getSpannerConfig().connectToSpanner();
     mutations = new ArrayList<>();
     batchSizeBytes = 0;
   }
 
+  @Teardown
+  public void teardown() throws Exception {
+    spannerAccessor.close();
+  }
+
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     MutationGroup m = c.element();
@@ -94,10 +101,11 @@ import org.slf4j.LoggerFactory;
     Sleeper sleeper = Sleeper.DEFAULT;
     BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
 
+    DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
     while (true) {
       // Batch upsert rows.
       try {
-        databaseClient().writeAtLeastOnce(Iterables.concat(mutations));
+        databaseClient.writeAtLeastOnce(Iterables.concat(mutations));
 
         // Break if the commit threw no exception.
         break;


[2/2] beam git commit: This closes #3696: [BEAM-1542] Add SpannerAccessor

Posted by jk...@apache.org.
This closes #3696: [BEAM-1542] Add SpannerAccessor


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c33cb034
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c33cb034
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c33cb034

Branch: refs/heads/master
Commit: c33cb03403c4f6fdbb49a525f66f12210c00ea0a
Parents: f3de736 c9c2e81
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 24 15:51:45 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 24 15:51:45 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/spanner/AbstractSpannerFn.java   | 71 --------------------
 .../sdk/io/gcp/spanner/CreateTransactionFn.java | 22 ++++--
 .../sdk/io/gcp/spanner/NaiveSpannerReadFn.java  | 18 +++--
 .../sdk/io/gcp/spanner/SpannerAccessor.java     | 43 ++++++++++++
 .../beam/sdk/io/gcp/spanner/SpannerConfig.java  | 22 ++++++
 .../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 24 ++++---
 6 files changed, 111 insertions(+), 89 deletions(-)
----------------------------------------------------------------------