You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/11/24 06:36:55 UTC
[beam] branch master updated: A schema transform implementation for SpannerIO.Write (#24278)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 09606de8779 A schema transform implementation for SpannerIO.Write (#24278)
09606de8779 is described below
commit 09606de8779ca284666581c7c2da314df4f9ddb1
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Wed Nov 23 22:36:46 2022 -0800
A schema transform implementation for SpannerIO.Write (#24278)
* A schema transform implementation for SpannerIO.Write
* fixup
* fixup
* fixup
* fixup
* fixup and comments
* fixup
* fixup
---
.../beam/sdk/io/gcp/spanner/MutationUtils.java | 2 +-
.../SpannerWriteSchemaTransformProvider.java | 177 +++++++++++++++++++++
.../beam/sdk/io/gcp/spanner/SpannerWriteIT.java | 40 +++++
3 files changed, 218 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
index 085e9929a7b..9b74e21d285 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
@@ -88,7 +88,7 @@ final class MutationUtils {
return builder.build();
}
- private static Mutation createMutationFromBeamRows(
+ public static Mutation createMutationFromBeamRows(
Mutation.WriteBuilder mutationBuilder, Row row) {
Schema schema = row.getSchema();
List<String> columns = schema.getFieldNames();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..3e3462d8df0
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.spanner.Mutation;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class SpannerWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<
+ SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> {
+
+  /**
+   * Returns the configuration class this provider is parameterized with.
+   *
+   * @return the {@link SpannerWriteSchemaTransformConfiguration} class literal
+   */
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<SpannerWriteSchemaTransformConfiguration>
+      configurationClass() {
+    final Class<SpannerWriteSchemaTransformConfiguration> configurationType =
+        SpannerWriteSchemaTransformConfiguration.class;
+    return configurationType;
+  }
+
+  /**
+   * Creates the schema transform for the given configuration.
+   *
+   * @param configuration the instance, database and table identifiers to write to
+   * @return a new {@link SpannerSchemaTransformWrite} wrapping {@code configuration}
+   */
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      SpannerWriteSchemaTransformConfiguration configuration) {
+    final SpannerSchemaTransformWrite writeTransform =
+        new SpannerSchemaTransformWrite(configuration);
+    return writeTransform;
+  }
+
+  /**
+   * A {@link SchemaTransform} that writes the Beam {@link Row}s of its {@code "input"} collection
+   * to a Spanner table and emits the mutations that could not be written on its {@code
+   * "failures"} output.
+   */
+  static class SpannerSchemaTransformWrite implements SchemaTransform, Serializable {
+    private final SpannerWriteSchemaTransformConfiguration configuration;
+
+    /**
+     * Creates the transform.
+     *
+     * @param configuration the instance, database and table identifiers used by the write
+     */
+    SpannerSchemaTransformWrite(SpannerWriteSchemaTransformConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /**
+     * Builds the transform that performs the write.
+     *
+     * <p>The returned transform reads the {@code "input"} collection, converts every {@link Row}
+     * into an insert-or-update {@link Mutation} for the configured table, and applies {@link
+     * SpannerIO#write()} with {@link SpannerIO.FailureMode#REPORT_FAILURES}. The failed mutations
+     * exposed by the write result are turned into rows with the string fields {@code operation},
+     * {@code instanceId}, {@code databaseId}, {@code tableId} and {@code mutationData}.
+     *
+     * @return a transform whose only output is the {@code "failures"} collection
+     */
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized PTransform<
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+        buildTransform() {
+      // TODO: For now we are allowing ourselves to fail at runtime, but we could
+      //  perform validations here at expansion time. This TODO is to add a few
+      //  validations (e.g. table/database/instance existence, schema match, etc).
+      return new PTransform<@NonNull PCollectionRowTuple, @NonNull PCollectionRowTuple>() {
+        @Override
+        public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
+          // Step 1: Row -> Mutation, then hand the mutations to SpannerIO.
+          SpannerWriteResult result =
+              input
+                  .get("input")
+                  .apply(
+                      MapElements.via(
+                          new SimpleFunction<Row, Mutation>(
+                              row ->
+                                  MutationUtils.createMutationFromBeamRows(
+                                      Mutation.newInsertOrUpdateBuilder(configuration.getTableId()),
+                                      Objects.requireNonNull(row))) {}))
+                  .apply(
+                      SpannerIO.write()
+                          .withDatabaseId(configuration.getDatabaseId())
+                          .withInstanceId(configuration.getInstanceId())
+                          .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
+          // Step 2: describe the shape of the rows emitted on the "failures" output.
+          Schema failureSchema =
+              Schema.builder()
+                  .addStringField("operation")
+                  .addStringField("instanceId")
+                  .addStringField("databaseId")
+                  .addStringField("tableId")
+                  .addStringField("mutationData")
+                  .build();
+          // Step 3: one failure row per mutation of every failed mutation group.
+          PCollection<Row> failures =
+              result
+                  .getFailedMutations()
+                  .apply(
+                      FlatMapElements.into(TypeDescriptors.rows())
+                          .via(
+                              mtg ->
+                                  // Iterate over the whole group instead of attached():
+                                  // attached() leaves out the primary mutation, and a group
+                                  // created from a single mutation has nothing but a primary,
+                                  // so its failure would never reach this output.
+                                  StreamSupport.stream(
+                                          Objects.requireNonNull(mtg).spliterator(), false)
+                                      .map(
+                                          mutation ->
+                                              Row.withSchema(failureSchema)
+                                                  .addValue(mutation.getOperation().toString())
+                                                  .addValue(configuration.getInstanceId())
+                                                  .addValue(configuration.getDatabaseId())
+                                                  .addValue(mutation.getTable())
+                                                  // TODO(pabloem): Figure out how to represent
+                                                  //  mutation contents in the DLQ; for now the
+                                                  //  values are rendered as a single string.
+                                                  .addValue(
+                                                      Iterators.toString(
+                                                          mutation.getValues().iterator()))
+                                                  .build())
+                                      .collect(Collectors.toList())))
+                  .setRowSchema(failureSchema);
+          return PCollectionRowTuple.of("failures", failures);
+        }
+      };
+    }
+  }
+
+  /**
+   * Returns the identifier string of this schema transform provider.
+   *
+   * @return the fixed URN {@code beam:schematransform:org.apache.beam:spanner_write:v1}
+   */
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    final String urn = "beam:schematransform:org.apache.beam:spanner_write:v1";
+    return urn;
+  }
+
+  /**
+   * Lists the names of the input collections of the transform.
+   *
+   * @return a single-element list holding {@code "input"}
+   */
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    final String inputTag = "input";
+    return Collections.singletonList(inputTag);
+  }
+
+  /**
+   * Lists the names of the output collections of the transform.
+   *
+   * @return a single-element list holding {@code "failures"}
+   */
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    final String failuresTag = "failures";
+    return Collections.singletonList(failuresTag);
+  }
+
+ /**
+ * Configuration of the Spanner write schema transform: the instance, database and table to
+ * write to.
+ *
+ * <p>Declared as an {@code @AutoValue} class whose default Beam schema comes from {@code
+ * AutoValueSchema}. NOTE(review): the accessor declarations are presumably consumed by code
+ * generation, so keep their shape and order unless the generated schema is meant to change.
+ */
+ @AutoValue
+ @DefaultSchema(AutoValueSchema.class)
+ public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable {
+ /** Returns the ID of the Spanner instance to write to. */
+ public abstract String getInstanceId();
+
+ /** Returns the ID of the Spanner database to write to. */
+ public abstract String getDatabaseId();
+
+ /** Returns the name of the table that the generated mutations target. */
+ public abstract String getTableId();
+
+ /** Returns a new, empty builder backed by the AutoValue-generated implementation. */
+ public static Builder builder() {
+ return new AutoValue_SpannerWriteSchemaTransformProvider_SpannerWriteSchemaTransformConfiguration
+ .Builder();
+ }
+
+ /** Builder for {@link SpannerWriteSchemaTransformConfiguration}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ /** Sets the Spanner instance ID and returns the builder for chaining. */
+ public abstract Builder setInstanceId(String instanceId);
+
+ /** Sets the Spanner database ID and returns the builder for chaining. */
+ public abstract Builder setDatabaseId(String databaseId);
+
+ /** Sets the target table name and returns the builder for chaining. */
+ public abstract Builder setTableId(String tableId);
+
+ /** Builds the configuration from the values set so far. */
+ public abstract SpannerWriteSchemaTransformConfiguration build();
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
index f8558ed2440..9594633d2dc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
@@ -34,20 +34,26 @@ import com.google.cloud.spanner.Statement;
import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
import java.io.Serializable;
import java.util.Collections;
+import java.util.Objects;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
@@ -200,6 +206,40 @@ public class SpannerWriteIT {
assertThat(countNumberOfRecords(pgDatabaseName), equalTo((long) numRecords));
}
+  /**
+   * Writes 100 generated rows through the transform built by {@link
+   * SpannerWriteSchemaTransformProvider}, then checks that the pipeline finished and that the
+   * database holds the same number of records.
+   */
+  @Test
+  public void testWriteViaSchemaTransform() throws Exception {
+    final int numRecords = 100;
+    final Schema tableSchema =
+        Schema.builder().addInt64Field("Key").addStringField("Value").build();
+
+    // Rows of (Key, Value) where Value is the string form of the generated sequence number.
+    final PCollectionRowTuple inputTuple =
+        PCollectionRowTuple.of(
+            "input",
+            p.apply("Init", GenerateSequence.from(0).to(numRecords))
+                .apply(
+                    MapElements.into(TypeDescriptors.rows())
+                        .via(
+                            seed ->
+                                Row.withSchema(tableSchema)
+                                    .addValue(seed)
+                                    .addValue(Objects.requireNonNull(seed).toString())
+                                    .build()))
+                .setRowSchema(tableSchema));
+
+    final SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration
+        writeConfig =
+            SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration.builder()
+                .setDatabaseId(databaseName)
+                .setInstanceId(options.getInstanceId())
+                .setTableId(options.getTable())
+                .build();
+
+    inputTuple.apply(
+        new SpannerWriteSchemaTransformProvider().from(writeConfig).buildTransform());
+
+    final PipelineResult pipelineResult = p.run();
+    pipelineResult.waitUntilFinish();
+    assertThat(pipelineResult.getState(), is(PipelineResult.State.DONE));
+    assertThat(countNumberOfRecords(databaseName), equalTo((long) numRecords));
+  }
+
@Test
public void testSequentialWrite() throws Exception {
int numRecords = 100;