You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/13 20:42:05 UTC

[1/2] beam git commit: SpannerIO: Introduced a MutationGroup.

Repository: beam
Updated Branches:
  refs/heads/master 996e35c1d -> 646caf255


SpannerIO: Introduced a MutationGroup.

Allows to group together mutation in a logical bundle that is submitted in the same transaction.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9115af48
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9115af48
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9115af48

Branch: refs/heads/master
Commit: 9115af488ceb907de121313ffa096d58a0ccc1e1
Parents: 996e35c
Author: Mairbek Khadikov <ma...@google.com>
Authored: Wed Jun 7 16:27:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 13 13:28:25 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/spanner/MutationGroup.java  | 67 +++++++++++++++++
 .../io/gcp/spanner/MutationSizeEstimator.java   |  9 +++
 .../beam/sdk/io/gcp/spanner/SpannerIO.java      | 53 +++++++++++---
 .../gcp/spanner/MutationSizeEstimatorTest.java  | 12 ++++
 .../beam/sdk/io/gcp/spanner/SpannerIOTest.java  | 76 ++++++++++++++++----
 5 files changed, 197 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
new file mode 100644
index 0000000..5b08da2
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.Mutation;
+import com.google.common.collect.ImmutableList;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A bundle of mutations that must be submitted atomically.
+ *
+ * <p>One of the mutations is chosen to be "primary", and can be used to determine partitions.
+ */
+public final class MutationGroup implements Serializable, Iterable<Mutation> {
+  private final ImmutableList<Mutation> mutations;
+
+  /**
+   * Creates a new group.
+   *
+   * @param primary a primary mutation.
+   * @param other other mutations, usually interleaved in parent.
+   * @return new mutation group.
+   */
+  public static MutationGroup create(Mutation primary, Mutation... other) {
+    return create(primary, Arrays.asList(other));
+  }
+
+  public static MutationGroup create(Mutation primary, Iterable<Mutation> other) {
+    return new MutationGroup(ImmutableList.<Mutation>builder().add(primary).addAll(other).build());
+  }
+
+  @Override
+  public Iterator<Mutation> iterator() {
+    return mutations.iterator();
+  }
+
+  private MutationGroup(ImmutableList<Mutation> mutations) {
+    this.mutations = mutations;
+  }
+
+  public Mutation primary() {
+    return mutations.get(0);
+  }
+
+  public List<Mutation> attached() {
+    return mutations.subList(1, mutations.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
index 61652e7..2418816 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
@@ -44,6 +44,15 @@ class MutationSizeEstimator {
     return result;
   }
 
+  /** Estimates a size of the mutation group in bytes. */
+  public static long sizeOf(MutationGroup group) {
+    long result = 0;
+    for (Mutation m : group) {
+      result += sizeOf(m);
+    }
+    return result;
+  }
+
   private static long estimatePrimitiveValue(Value v) {
     switch (v.getType().getCode()) {
       case BOOL:

http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 5058d13..af5253b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -29,10 +29,12 @@ import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerOptions;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -88,6 +90,11 @@ import org.slf4j.LoggerFactory;
  *   <li>If the pipeline was unexpectedly stopped, mutations that were already applied will not get
  *       rolled back.
  * </ul>
+ *
+ * <p>Use {@link MutationGroup} to ensure that a small set mutations is bundled together. It is
+ * guaranteed that mutations in a group are submitted in the same transaction. Build
+ * {@link SpannerIO.Write} transform, and call {@link Write#grouped()} method. It will return a
+ * transformation that can be applied to a PCollection of MutationGroup.
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class SpannerIO {
@@ -187,6 +194,13 @@ public class SpannerIO {
       return toBuilder().setDatabaseId(databaseId).build();
     }
 
+    /**
+     * Same transform but can be applied to {@link PCollection} of {@link MutationGroup}.
+     */
+    public WriteGrouped grouped() {
+      return new WriteGrouped(this);
+    }
+
     @VisibleForTesting
     Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
       return toBuilder().setServiceFactory(serviceFactory).build();
@@ -204,7 +218,9 @@ public class SpannerIO {
 
     @Override
     public PDone expand(PCollection<Mutation> input) {
-      input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteFn(this)));
+      input
+          .apply("To mutation group", ParDo.of(new ToMutationGroupFn()))
+          .apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(this)));
       return PDone.in(input.getPipeline());
     }
 
@@ -227,15 +243,37 @@ public class SpannerIO {
     }
   }
 
+  /** Same as {@link Write} but supports grouped mutations. */
+  public static class WriteGrouped extends PTransform<PCollection<MutationGroup>, PDone> {
+    private final Write spec;
+
+    public WriteGrouped(Write spec) {
+      this.spec = spec;
+    }
+
+    @Override public PDone expand(PCollection<MutationGroup> input) {
+      input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(spec)));
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Mutation value = c.element();
+      c.output(MutationGroup.create(value));
+    }
+  }
+
   /** Batches together and writes mutations to Google Cloud Spanner. */
   @VisibleForTesting
-  static class SpannerWriteFn extends DoFn<Mutation, Void> {
-    private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteFn.class);
+  static class SpannerWriteGroupFn extends DoFn<MutationGroup, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class);
     private final Write spec;
     private transient Spanner spanner;
     private transient DatabaseClient dbClient;
     // Current batch of mutations to be written.
-    private List<Mutation> mutations;
+    private List<MutationGroup> mutations;
     private long batchSizeBytes = 0;
 
     private static final int MAX_RETRIES = 5;
@@ -244,8 +282,7 @@ public class SpannerIO {
             .withMaxRetries(MAX_RETRIES)
             .withInitialBackoff(Duration.standardSeconds(5));
 
-    @VisibleForTesting
-    SpannerWriteFn(Write spec) {
+    @VisibleForTesting SpannerWriteGroupFn(Write spec) {
       this.spec = spec;
     }
 
@@ -261,7 +298,7 @@ public class SpannerIO {
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      Mutation m = c.element();
+      MutationGroup m = c.element();
       mutations.add(m);
       batchSizeBytes += MutationSizeEstimator.sizeOf(m);
       if (batchSizeBytes >= spec.getBatchSizeBytes()) {
@@ -319,7 +356,7 @@ public class SpannerIO {
       while (true) {
         // Batch upsert rows.
         try {
-          dbClient.writeAtLeastOnce(mutations);
+          dbClient.writeAtLeastOnce(Iterables.concat(mutations));
 
           // Break if the commit threw no exception.
           break;

http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
index 03eb28e..013b83d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
@@ -135,4 +135,16 @@ public class MutationSizeEstimatorTest {
     assertThat(MutationSizeEstimator.sizeOf(timestampArray), is(24L));
     assertThat(MutationSizeEstimator.sizeOf(dateArray), is(48L));
   }
+
+  @Test
+  public void group() throws Exception {
+    Mutation int64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+    Mutation float64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2.9).build();
+    Mutation bool = Mutation.newInsertOrUpdateBuilder("test").set("one").to(false).build();
+
+    MutationGroup group = MutationGroup.create(int64, float64, bool);
+
+    assertThat(MutationSizeEstimator.sizeOf(group), is(17L));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
index 5bdfea5..4a759fb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
@@ -114,9 +114,31 @@ public class SpannerIOTest implements Serializable {
   }
 
   @Test
-  public void batching() throws Exception {
+  @Category(NeedsRunner.class)
+  public void singleMutationGroupPipeline() throws Exception {
     Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
     Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+    Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
+    PCollection<MutationGroup> mutations = pipeline
+        .apply(Create.<MutationGroup>of(g(one, two, three)));
+    mutations.apply(
+        SpannerIO.write()
+            .withProjectId("test-project")
+            .withInstanceId("test-instance")
+            .withDatabaseId("test-database")
+            .withServiceFactory(serviceFactory)
+            .grouped());
+    pipeline.run();
+    verify(serviceFactory.mockSpanner())
+        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(argThat(new IterableOfSize(3)));
+  }
+
+  @Test
+  public void batching() throws Exception {
+    MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
+    MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
     SpannerIO.Write write =
         SpannerIO.write()
             .withProjectId("test-project")
@@ -124,8 +146,8 @@ public class SpannerIOTest implements Serializable {
             .withDatabaseId("test-database")
             .withBatchSizeBytes(1000000000)
             .withServiceFactory(serviceFactory);
-    SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
-    DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+    SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write);
+    DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
     fnTester.processBundle(Arrays.asList(one, two));
 
     verify(serviceFactory.mockSpanner())
@@ -136,9 +158,9 @@ public class SpannerIOTest implements Serializable {
 
   @Test
   public void batchingGroups() throws Exception {
-    Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
-    Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
-    Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
+    MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
+    MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
+    MutationGroup three = g(Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build());
 
     // Have a room to accumulate one more item.
     long batchSize = MutationSizeEstimator.sizeOf(one) + 1;
@@ -150,8 +172,8 @@ public class SpannerIOTest implements Serializable {
             .withDatabaseId("test-database")
             .withBatchSizeBytes(batchSize)
             .withServiceFactory(serviceFactory);
-    SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
-    DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+    SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write);
+    DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
     fnTester.processBundle(Arrays.asList(one, two, three));
 
     verify(serviceFactory.mockSpanner())
@@ -164,8 +186,8 @@ public class SpannerIOTest implements Serializable {
 
   @Test
   public void noBatching() throws Exception {
-    Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
-    Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+    MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
+    MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
     SpannerIO.Write write =
         SpannerIO.write()
             .withProjectId("test-project")
@@ -173,8 +195,8 @@ public class SpannerIOTest implements Serializable {
             .withDatabaseId("test-database")
             .withBatchSizeBytes(0) // turn off batching.
             .withServiceFactory(serviceFactory);
-    SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
-    DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+    SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write);
+    DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
     fnTester.processBundle(Arrays.asList(one, two));
 
     verify(serviceFactory.mockSpanner())
@@ -183,6 +205,32 @@ public class SpannerIOTest implements Serializable {
         .writeAtLeastOnce(argThat(new IterableOfSize(1)));
   }
 
+  @Test
+  public void groups() throws Exception {
+    Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+    Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+    Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
+
+    // Smallest batch size
+    long batchSize = 1;
+
+    SpannerIO.Write write =
+        SpannerIO.write()
+            .withProjectId("test-project")
+            .withInstanceId("test-instance")
+            .withDatabaseId("test-database")
+            .withBatchSizeBytes(batchSize)
+            .withServiceFactory(serviceFactory);
+    SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write);
+    DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
+    fnTester.processBundle(Arrays.asList(g(one, two, three)));
+
+    verify(serviceFactory.mockSpanner())
+        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(argThat(new IterableOfSize(3)));
+  }
+
   private static class FakeServiceFactory
       implements ServiceFactory<Spanner, SpannerOptions>, Serializable {
     // Marked as static so they could be returned by serviceFactory, which is serializable.
@@ -241,4 +289,8 @@ public class SpannerIOTest implements Serializable {
       return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size;
     }
   }
+
+  private static MutationGroup g(Mutation m, Mutation... other) {
+    return MutationGroup.create(m, other);
+  }
 }


[2/2] beam git commit: This closes #3319: [BEAM-1542] SpannerIO: Introduced a MutationGroup

Posted by jk...@apache.org.
This closes #3319: [BEAM-1542] SpannerIO: Introduced a MutationGroup


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/646caf25
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/646caf25
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/646caf25

Branch: refs/heads/master
Commit: 646caf255510c735488b88ecc69edd9b9bde4081
Parents: 996e35c 9115af4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 13 13:28:47 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 13 13:28:47 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/spanner/MutationGroup.java  | 67 +++++++++++++++++
 .../io/gcp/spanner/MutationSizeEstimator.java   |  9 +++
 .../beam/sdk/io/gcp/spanner/SpannerIO.java      | 53 +++++++++++---
 .../gcp/spanner/MutationSizeEstimatorTest.java  | 12 ++++
 .../beam/sdk/io/gcp/spanner/SpannerIOTest.java  | 76 ++++++++++++++++----
 5 files changed, 197 insertions(+), 20 deletions(-)
----------------------------------------------------------------------