You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/13 20:42:05 UTC
[1/2] beam git commit: SpannerIO: Introduced a MutationGroup.
Repository: beam
Updated Branches:
refs/heads/master 996e35c1d -> 646caf255
SpannerIO: Introduced a MutationGroup.
Allows grouping mutations into a logical bundle that is submitted in a single transaction.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9115af48
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9115af48
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9115af48
Branch: refs/heads/master
Commit: 9115af488ceb907de121313ffa096d58a0ccc1e1
Parents: 996e35c
Author: Mairbek Khadikov <ma...@google.com>
Authored: Wed Jun 7 16:27:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 13 13:28:25 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/spanner/MutationGroup.java | 67 +++++++++++++++++
.../io/gcp/spanner/MutationSizeEstimator.java | 9 +++
.../beam/sdk/io/gcp/spanner/SpannerIO.java | 53 +++++++++++---
.../gcp/spanner/MutationSizeEstimatorTest.java | 12 ++++
.../beam/sdk/io/gcp/spanner/SpannerIOTest.java | 76 ++++++++++++++++----
5 files changed, 197 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
new file mode 100644
index 0000000..5b08da2
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.Mutation;
+import com.google.common.collect.ImmutableList;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A bundle of mutations that must be submitted atomically.
+ *
+ * <p>The first mutation of the group is its "primary" mutation and can be used to determine
+ * partitions; the remaining mutations are "attached" to it (see {@link #attached()}).
+ *
+ * <p>Instances are immutable. Two groups are equal when they contain equal mutations in the same
+ * order, so groups can be compared element-wise (for example in tests, or by coders that rely on
+ * a well-defined {@code equals}).
+ */
+public final class MutationGroup implements Serializable, Iterable<Mutation> {
+  private final ImmutableList<Mutation> mutations;
+
+  /**
+   * Creates a new group.
+   *
+   * @param primary a primary mutation.
+   * @param other other mutations, usually interleaved in parent.
+   * @return new mutation group.
+   * @throws NullPointerException if {@code primary}, {@code other} or any element of
+   *     {@code other} is null.
+   */
+  public static MutationGroup create(Mutation primary, Mutation... other) {
+    return create(primary, Arrays.asList(other));
+  }
+
+  /**
+   * Creates a new group from a primary mutation and an iterable of attached mutations.
+   *
+   * @param primary a primary mutation.
+   * @param other other mutations, usually interleaved in parent; copied in iteration order.
+   * @return new mutation group.
+   * @throws NullPointerException if {@code primary}, {@code other} or any element of
+   *     {@code other} is null (rejected by {@link ImmutableList.Builder}).
+   */
+  public static MutationGroup create(Mutation primary, Iterable<Mutation> other) {
+    return new MutationGroup(ImmutableList.<Mutation>builder().add(primary).addAll(other).build());
+  }
+
+  private MutationGroup(ImmutableList<Mutation> mutations) {
+    this.mutations = mutations;
+  }
+
+  /** Iterates over all mutations of the group, starting with the primary one. */
+  @Override
+  public Iterator<Mutation> iterator() {
+    return mutations.iterator();
+  }
+
+  /** Returns the primary mutation, i.e. the first mutation passed to {@code create}. */
+  public Mutation primary() {
+    return mutations.get(0);
+  }
+
+  /** Returns the non-primary mutations in insertion order; empty if there are none. */
+  public List<Mutation> attached() {
+    return mutations.subList(1, mutations.size());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof MutationGroup)) {
+      return false;
+    }
+    // Order matters: the first element is the primary mutation.
+    return mutations.equals(((MutationGroup) o).mutations);
+  }
+
+  @Override
+  public int hashCode() {
+    return mutations.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "MutationGroup{mutations=" + mutations + "}";
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
index 61652e7..2418816 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
@@ -44,6 +44,15 @@ class MutationSizeEstimator {
return result;
}
+  /**
+   * Estimates the size of a mutation group in bytes.
+   *
+   * <p>The estimate is the sum of the per-mutation estimates over every mutation the group
+   * iterates, the primary mutation included.
+   */
+  public static long sizeOf(MutationGroup group) {
+    long totalBytes = 0;
+    for (Mutation mutation : group) {
+      totalBytes = totalBytes + sizeOf(mutation);
+    }
+    return totalBytes;
+  }
+
private static long estimatePrimitiveValue(Value v) {
switch (v.getType().getCode()) {
case BOOL:
http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 5058d13..af5253b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -29,10 +29,12 @@ import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -88,6 +90,11 @@ import org.slf4j.LoggerFactory;
* <li>If the pipeline was unexpectedly stopped, mutations that were already applied will not get
* rolled back.
* </ul>
+ *
+ * <p>Use {@link MutationGroup} to ensure that a small set of mutations is bundled together. It is
+ * guaranteed that mutations in a group are submitted in the same transaction. To write groups,
+ * build a {@link SpannerIO.Write} transform and call its {@link Write#grouped()} method, which
+ * returns a transform that can be applied to a {@code PCollection} of {@link MutationGroup}.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class SpannerIO {
@@ -187,6 +194,13 @@ public class SpannerIO {
return toBuilder().setDatabaseId(databaseId).build();
}
+ /**
+ * Same transform but can be applied to {@link PCollection} of {@link MutationGroup}.
+ */
+ public WriteGrouped grouped() {
+ return new WriteGrouped(this);
+ }
+
@VisibleForTesting
Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
return toBuilder().setServiceFactory(serviceFactory).build();
@@ -204,7 +218,9 @@ public class SpannerIO {
@Override
public PDone expand(PCollection<Mutation> input) {
- input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteFn(this)));
+ // Wrap each mutation in a single-element group so that ungrouped and grouped writes share
+ // the same writer DoFn (SpannerWriteGroupFn).
+ input
+ .apply("To mutation group", ParDo.of(new ToMutationGroupFn()))
+ .apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(this)));
return PDone.in(input.getPipeline());
}
@@ -227,15 +243,37 @@ public class SpannerIO {
}
}
+  /**
+   * Same as {@link Write} but consumes a {@link PCollection} of {@link MutationGroup}.
+   *
+   * <p>All configuration (project, instance, database, batch size, service factory) comes from
+   * the wrapped {@link Write} spec, and so does validation.
+   */
+  public static class WriteGrouped extends PTransform<PCollection<MutationGroup>, PDone> {
+    private final Write spec;
+
+    public WriteGrouped(Write spec) {
+      this.spec = spec;
+    }
+
+    /**
+     * Delegates to the wrapped {@link Write} spec so that a grouped write is validated exactly
+     * like an ungrouped one before the pipeline runs. Without this, the default no-op
+     * {@link PTransform#validate} would silently skip the spec's checks.
+     */
+    @Override
+    public void validate(PipelineOptions options) {
+      spec.validate(options);
+    }
+
+    @Override
+    public PDone expand(PCollection<MutationGroup> input) {
+      input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(spec)));
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  /** Wraps every incoming {@link Mutation} in a single-element {@link MutationGroup}. */
+  private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(MutationGroup.create(c.element()));
+    }
+  }
+
/** Batches together and writes mutations to Google Cloud Spanner. */
@VisibleForTesting
- static class SpannerWriteFn extends DoFn<Mutation, Void> {
- private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteFn.class);
+ static class SpannerWriteGroupFn extends DoFn<MutationGroup, Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class);
private final Write spec;
private transient Spanner spanner;
private transient DatabaseClient dbClient;
// Current batch of mutations to be written.
- private List<Mutation> mutations;
+ private List<MutationGroup> mutations;
private long batchSizeBytes = 0;
private static final int MAX_RETRIES = 5;
@@ -244,8 +282,7 @@ public class SpannerIO {
.withMaxRetries(MAX_RETRIES)
.withInitialBackoff(Duration.standardSeconds(5));
- @VisibleForTesting
- SpannerWriteFn(Write spec) {
+ @VisibleForTesting SpannerWriteGroupFn(Write spec) {
this.spec = spec;
}
@@ -261,7 +298,7 @@ public class SpannerIO {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- Mutation m = c.element();
+ MutationGroup m = c.element();
mutations.add(m);
batchSizeBytes += MutationSizeEstimator.sizeOf(m);
if (batchSizeBytes >= spec.getBatchSizeBytes()) {
@@ -319,7 +356,7 @@ public class SpannerIO {
while (true) {
// Batch upsert rows.
try {
- dbClient.writeAtLeastOnce(mutations);
+ dbClient.writeAtLeastOnce(Iterables.concat(mutations));
// Break if the commit threw no exception.
break;
http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
index 03eb28e..013b83d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
@@ -135,4 +135,16 @@ public class MutationSizeEstimatorTest {
assertThat(MutationSizeEstimator.sizeOf(timestampArray), is(24L));
assertThat(MutationSizeEstimator.sizeOf(dateArray), is(48L));
}
+
+  @Test
+  public void group() throws Exception {
+    Mutation intMutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+    Mutation doubleMutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2.9).build();
+    Mutation boolMutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(false).build();
+
+    // The group estimate is the sum of the three per-mutation estimates
+    // (presumably 8 + 8 + 1 bytes for the INT64, FLOAT64 and BOOL values).
+    MutationGroup mixed = MutationGroup.create(intMutation, doubleMutation, boolMutation);
+
+    assertThat(MutationSizeEstimator.sizeOf(mixed), is(17L));
+  }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
index 5bdfea5..4a759fb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
@@ -114,9 +114,31 @@ public class SpannerIOTest implements Serializable {
}
@Test
- public void batching() throws Exception {
+ @Category(NeedsRunner.class)
+ public void singleMutationGroupPipeline() throws Exception {
+ // Runs a real pipeline over one group of three mutations and checks that the grouped write
+ // submits all three of them in a single writeAtLeastOnce call.
Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+ Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
+ PCollection<MutationGroup> mutations = pipeline
+ .apply(Create.<MutationGroup>of(g(one, two, three)));
+ mutations.apply(
+ SpannerIO.write()
+ .withProjectId("test-project")
+ .withInstanceId("test-instance")
+ .withDatabaseId("test-database")
+ .withServiceFactory(serviceFactory)
+ .grouped());
+ pipeline.run();
+ verify(serviceFactory.mockSpanner())
+ .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+ verify(serviceFactory.mockDatabaseClient(), times(1))
+ .writeAtLeastOnce(argThat(new IterableOfSize(3)));
+ }
+
+ @Test
+ public void batching() throws Exception {
+ MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
+ MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
SpannerIO.Write write =
SpannerIO.write()
.withProjectId("test-project")
@@ -124,8 +146,8 @@ public class SpannerIOTest implements Serializable {
.withDatabaseId("test-database")
.withBatchSizeBytes(1000000000)
.withServiceFactory(serviceFactory);
- SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
- DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+ SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write);
+ DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
fnTester.processBundle(Arrays.asList(one, two));
verify(serviceFactory.mockSpanner())
@@ -136,9 +158,9 @@ public class SpannerIOTest implements Serializable {
@Test
public void batchingGroups() throws Exception {
- Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
- Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
- Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
+ MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
+ MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
+ MutationGroup three = g(Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build());
// Have a room to accumulate one more item.
long batchSize = MutationSizeEstimator.sizeOf(one) + 1;
@@ -150,8 +172,8 @@ public class SpannerIOTest implements Serializable {
.withDatabaseId("test-database")
.withBatchSizeBytes(batchSize)
.withServiceFactory(serviceFactory);
- SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
- DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+ SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write);
+ DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
fnTester.processBundle(Arrays.asList(one, two, three));
verify(serviceFactory.mockSpanner())
@@ -164,8 +186,8 @@ public class SpannerIOTest implements Serializable {
@Test
public void noBatching() throws Exception {
- Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
- Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+ MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
+ MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
SpannerIO.Write write =
SpannerIO.write()
.withProjectId("test-project")
@@ -173,8 +195,8 @@ public class SpannerIOTest implements Serializable {
.withDatabaseId("test-database")
.withBatchSizeBytes(0) // turn off batching.
.withServiceFactory(serviceFactory);
- SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
- DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+ SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write);
+ DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
fnTester.processBundle(Arrays.asList(one, two));
verify(serviceFactory.mockSpanner())
@@ -183,6 +205,32 @@ public class SpannerIOTest implements Serializable {
.writeAtLeastOnce(argThat(new IterableOfSize(1)));
}
+  @Test
+  public void groups() throws Exception {
+    Mutation first = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+    Mutation second = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+    Mutation third = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
+
+    // With the smallest possible (1-byte) batch limit the threshold is reached as soon as the
+    // group is added, yet its three mutations must still go out in one writeAtLeastOnce call.
+    SpannerIO.Write write =
+        SpannerIO.write()
+            .withProjectId("test-project")
+            .withInstanceId("test-instance")
+            .withDatabaseId("test-database")
+            .withBatchSizeBytes(1L)
+            .withServiceFactory(serviceFactory);
+    DoFnTester<MutationGroup, Void> fnTester =
+        DoFnTester.of(new SpannerIO.SpannerWriteGroupFn(write));
+    fnTester.processBundle(Arrays.asList(g(first, second, third)));
+
+    verify(serviceFactory.mockSpanner())
+        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(argThat(new IterableOfSize(3)));
+  }
+
private static class FakeServiceFactory
implements ServiceFactory<Spanner, SpannerOptions>, Serializable {
// Marked as static so they could be returned by serviceFactory, which is serializable.
@@ -241,4 +289,8 @@ public class SpannerIOTest implements Serializable {
return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size;
}
}
+
+  /** Test shorthand for {@link MutationGroup#create(Mutation, Mutation...)}. */
+  private static MutationGroup g(Mutation m, Mutation... other) {
+    MutationGroup group = MutationGroup.create(m, other);
+    return group;
+  }
}
[2/2] beam git commit: This closes #3319: [BEAM-1542] SpannerIO:
Introduced a MutationGroup
Posted by jk...@apache.org.
This closes #3319: [BEAM-1542] SpannerIO: Introduced a MutationGroup
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/646caf25
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/646caf25
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/646caf25
Branch: refs/heads/master
Commit: 646caf255510c735488b88ecc69edd9b9bde4081
Parents: 996e35c 9115af4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 13 13:28:47 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 13 13:28:47 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/spanner/MutationGroup.java | 67 +++++++++++++++++
.../io/gcp/spanner/MutationSizeEstimator.java | 9 +++
.../beam/sdk/io/gcp/spanner/SpannerIO.java | 53 +++++++++++---
.../gcp/spanner/MutationSizeEstimatorTest.java | 12 ++++
.../beam/sdk/io/gcp/spanner/SpannerIOTest.java | 76 ++++++++++++++++----
5 files changed, 197 insertions(+), 20 deletions(-)
----------------------------------------------------------------------