You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/24 22:52:02 UTC
[1/2] beam git commit: Adds SpannerAccessor - a utility for DoFn's that use Spanner
Repository: beam
Updated Branches:
refs/heads/master f3de7363c -> c33cb0340
Adds SpannerAccessor - a utility for DoFn's that use Spanner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9c2e816
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9c2e816
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9c2e816
Branch: refs/heads/master
Commit: c9c2e81672676e3ec705269a94f11fb1a2596c48
Parents: f3de736
Author: Mairbek Khadikov <ma...@google.com>
Authored: Mon Aug 7 12:33:19 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 24 15:51:41 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/spanner/AbstractSpannerFn.java | 71 --------------------
.../sdk/io/gcp/spanner/CreateTransactionFn.java | 22 ++++--
.../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 18 +++--
.../sdk/io/gcp/spanner/SpannerAccessor.java | 43 ++++++++++++
.../beam/sdk/io/gcp/spanner/SpannerConfig.java | 22 ++++++
.../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 24 ++++---
6 files changed, 111 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
deleted file mode 100644
index 50efdea..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.ReleaseInfo;
-
-/**
- * Abstract {@link DoFn} that manages {@link Spanner} lifecycle. Use {@link
- * AbstractSpannerFn#databaseClient} to access the Cloud Spanner database client.
- */
-abstract class AbstractSpannerFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
- // A common user agent token that indicates that this request was originated from Apache Beam.
- private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";
-
- private transient Spanner spanner;
- private transient DatabaseClient databaseClient;
-
- abstract SpannerConfig getSpannerConfig();
-
- @Setup
- public void setup() throws Exception {
- SpannerConfig spannerConfig = getSpannerConfig();
- SpannerOptions.Builder builder = SpannerOptions.newBuilder();
- if (spannerConfig.getProjectId() != null) {
- builder.setProjectId(spannerConfig.getProjectId().get());
- }
- if (spannerConfig.getServiceFactory() != null) {
- builder.setServiceFactory(spannerConfig.getServiceFactory());
- }
- ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
- builder.setUserAgentPrefix(USER_AGENT_PREFIX + "/" + releaseInfo.getVersion());
- SpannerOptions options = builder.build();
- spanner = options.getService();
- databaseClient = spanner.getDatabaseClient(DatabaseId
- .of(options.getProjectId(), spannerConfig.getInstanceId().get(),
- spannerConfig.getDatabaseId().get()));
- }
-
- @Teardown
- public void teardown() throws Exception {
- if (spanner == null) {
- return;
- }
- spanner.close();
- spanner = null;
- }
-
- protected DatabaseClient databaseClient() {
- return databaseClient;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
index da8e8b1..5574ae1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
@@ -17,12 +17,14 @@
*/
package org.apache.beam.sdk.io.gcp.spanner;
+import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
+import org.apache.beam.sdk.transforms.DoFn;
/** Creates a batch transaction. */
-class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> {
+class CreateTransactionFn extends DoFn<Object, Transaction> {
private final SpannerIO.CreateTransaction config;
@@ -30,10 +32,22 @@ class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> {
this.config = config;
}
+ private transient SpannerAccessor spannerAccessor;
+
+ @Setup
+ public void setup() throws Exception {
+ spannerAccessor = config.getSpannerConfig().connectToSpanner();
+ }
+ @Teardown
+ public void teardown() throws Exception {
+ spannerAccessor.close();
+ }
+
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
+ DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
try (ReadOnlyTransaction readOnlyTransaction =
- databaseClient().readOnlyTransaction(config.getTimestampBound())) {
+ databaseClient.readOnlyTransaction(config.getTimestampBound())) {
// Run a dummy sql statement to force the RPC and obtain the timestamp from the server.
ResultSet resultSet = readOnlyTransaction.executeQuery(Statement.of("SELECT 1"));
while (resultSet.next()) {
@@ -44,8 +58,4 @@ class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> {
}
}
- @Override
- SpannerConfig getSpannerConfig() {
- return config.getSpannerConfig();
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
index 92b3fe3..5dc6ead 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
@@ -17,19 +17,22 @@
*/
package org.apache.beam.sdk.io.gcp.spanner;
+import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.common.annotations.VisibleForTesting;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
/** A simplest read function implementation. Parallelism support is coming. */
@VisibleForTesting
-class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> {
+class NaiveSpannerReadFn extends DoFn<ReadOperation, Struct> {
private final SpannerConfig config;
@Nullable private final PCollectionView<Transaction> transaction;
+ private transient SpannerAccessor spannerAccessor;
NaiveSpannerReadFn(SpannerConfig config, @Nullable PCollectionView<Transaction> transaction) {
this.config = config;
@@ -40,8 +43,14 @@ class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> {
this(config, null);
}
- SpannerConfig getSpannerConfig() {
- return config;
+
+ @Setup
+ public void setup() throws Exception {
+ spannerAccessor = config.connectToSpanner();
+ }
+ @Teardown
+ public void teardown() throws Exception {
+ spannerAccessor.close();
}
@ProcessElement
@@ -52,8 +61,9 @@ class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> {
timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp());
}
ReadOperation op = c.element();
+ DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
try (ReadOnlyTransaction readOnlyTransaction =
- databaseClient().readOnlyTransaction(timestampBound)) {
+ databaseClient.readOnlyTransaction(timestampBound)) {
ResultSet resultSet = execute(op, readOnlyTransaction);
while (resultSet.next()) {
c.output(resultSet.getCurrentRowAsStruct());
http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
new file mode 100644
index 0000000..f32e661
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Spanner;
+
+/**
+ * Manages lifecycle of {@link DatabaseClient} and {@link Spanner} instances.
+ */
+public class SpannerAccessor implements AutoCloseable {
+ private final Spanner spanner;
+ private final DatabaseClient databaseClient;
+
+ SpannerAccessor(Spanner spanner, DatabaseClient databaseClient) {
+ this.spanner = spanner;
+ this.databaseClient = databaseClient;
+ }
+
+ public DatabaseClient getDatabaseClient() {
+ return databaseClient;
+ }
+
+ @Override
+ public void close() {
+ spanner.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
index 034c38a..6646f32 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
@@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.cloud.ServiceFactory;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.annotations.VisibleForTesting;
@@ -29,10 +31,13 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.ReleaseInfo;
/** Configuration for a Cloud Spanner client. */
@AutoValue
public abstract class SpannerConfig implements Serializable {
+ // A common user agent token that indicates that this request was originated from Apache Beam.
+ private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";
@Nullable
abstract ValueProvider<String> getProjectId();
@@ -123,4 +128,21 @@ public abstract class SpannerConfig implements Serializable {
return toBuilder().setServiceFactory(serviceFactory).build();
}
+ public SpannerAccessor connectToSpanner() {
+ SpannerOptions.Builder builder = SpannerOptions.newBuilder();
+ if (getProjectId() != null) {
+ builder.setProjectId(getProjectId().get());
+ }
+ if (getServiceFactory() != null) {
+ builder.setServiceFactory(this.getServiceFactory());
+ }
+ ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
+ builder.setUserAgentPrefix(USER_AGENT_PREFIX + "/" + releaseInfo.getVersion());
+ SpannerOptions options = builder.build();
+ Spanner spanner = options.getService();
+ DatabaseClient databaseClient = spanner.getDatabaseClient(
+ DatabaseId.of(options.getProjectId(), getInstanceId().get(), getDatabaseId().get()));
+ return new SpannerAccessor(spanner, databaseClient);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
index 34a11da..9343c0c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.spanner;
import com.google.cloud.spanner.AbortedException;
+import com.google.cloud.spanner.DatabaseClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
@@ -25,6 +26,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
@@ -35,7 +37,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Batches together and writes mutations to Google Cloud Spanner. */
-@VisibleForTesting class SpannerWriteGroupFn extends AbstractSpannerFn<MutationGroup, Void> {
+@VisibleForTesting
+class SpannerWriteGroupFn extends DoFn<MutationGroup, Void> {
private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class);
private final SpannerIO.Write spec;
// Current batch of mutations to be written.
@@ -48,21 +51,25 @@ import org.slf4j.LoggerFactory;
.withMaxRetries(MAX_RETRIES)
.withInitialBackoff(Duration.standardSeconds(5));
- @VisibleForTesting SpannerWriteGroupFn(SpannerIO.Write spec) {
- this.spec = spec;
- }
+ private transient SpannerAccessor spannerAccessor;
- @Override SpannerConfig getSpannerConfig() {
- return spec.getSpannerConfig();
+ @VisibleForTesting
+ SpannerWriteGroupFn(SpannerIO.Write spec) {
+ this.spec = spec;
}
@Setup
public void setup() throws Exception {
- super.setup();
+ spannerAccessor = spec.getSpannerConfig().connectToSpanner();
mutations = new ArrayList<>();
batchSizeBytes = 0;
}
+ @Teardown
+ public void teardown() throws Exception {
+ spannerAccessor.close();
+ }
+
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
MutationGroup m = c.element();
@@ -94,10 +101,11 @@ import org.slf4j.LoggerFactory;
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
+ DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
while (true) {
// Batch upsert rows.
try {
- databaseClient().writeAtLeastOnce(Iterables.concat(mutations));
+ databaseClient.writeAtLeastOnce(Iterables.concat(mutations));
// Break if the commit threw no exception.
break;
[2/2] beam git commit: This closes #3696: [BEAM-1542] Add SpannerAccessor
Posted by jk...@apache.org.
This closes #3696: [BEAM-1542] Add SpannerAccessor
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c33cb034
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c33cb034
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c33cb034
Branch: refs/heads/master
Commit: c33cb03403c4f6fdbb49a525f66f12210c00ea0a
Parents: f3de736 c9c2e81
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 24 15:51:45 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 24 15:51:45 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/spanner/AbstractSpannerFn.java | 71 --------------------
.../sdk/io/gcp/spanner/CreateTransactionFn.java | 22 ++++--
.../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 18 +++--
.../sdk/io/gcp/spanner/SpannerAccessor.java | 43 ++++++++++++
.../beam/sdk/io/gcp/spanner/SpannerConfig.java | 22 ++++++
.../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 24 ++++---
6 files changed, 111 insertions(+), 89 deletions(-)
----------------------------------------------------------------------