You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/31 00:06:08 UTC
[1/2] beam git commit: [BEAM-1542] SpannerIO: mutation encoding and
size estimation improvements
Repository: beam
Updated Branches:
refs/heads/master aa26f4bf7 -> 09f68159d
[BEAM-1542] SpannerIO: mutation encoding and size estimation improvements
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ba96003
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ba96003
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ba96003
Branch: refs/heads/master
Commit: 3ba96003d31ce98a54c0c51c1c0a9cf7c06e2fa2
Parents: aa26f4b
Author: Mairbek Khadikov <ma...@google.com>
Authored: Wed Oct 18 15:26:11 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Oct 30 17:02:36 2017 -0700
----------------------------------------------------------------------
.../io/gcp/spanner/MutationGroupEncoder.java | 660 +++++++++++++++++++
.../io/gcp/spanner/MutationSizeEstimator.java | 48 ++
.../gcp/spanner/MutationGroupEncoderTest.java | 636 ++++++++++++++++++
3 files changed, 1344 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
new file mode 100644
index 0000000..ba0b4eb
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.util.VarInt;
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.joda.time.MutableDateTime;
+
+/**
+ * Given the Spanner schema, encodes mutation groups into a compact binary form, decodes them
+ * back, and builds sortable binary keys for mutations and keys.
+ */
+class MutationGroupEncoder {
+ // Day zero of the DATE encoding: 0001-01-01 00:00, built by the DateTime constructor that takes
+ // no explicit time zone.
+ private static final DateTime MIN_DATE = new DateTime(1, 1, 1, 0, 0);
+
+ // Schema used to look up table columns and key parts.
+ private final SpannerSchema schema;
+ // Table names as returned by the schema; a table's position in this list is its encoded index.
+ private final List<String> tables;
+ // Reverse lookup from table name to its position in the tables list.
+ private final Map<String, Integer> tablesIndexes = new HashMap<>();
+
+  /**
+   * Creates an encoder bound to the given schema and records the position of every table.
+   *
+   * @param schema the Spanner schema; the order of its tables defines the encoded table indexes.
+   */
+  public MutationGroupEncoder(SpannerSchema schema) {
+    this.schema = schema;
+    this.tables = schema.getTables();
+
+    int position = 0;
+    for (String table : tables) {
+      tablesIndexes.put(table, position);
+      position++;
+    }
+  }
+
+ public byte[] encode(MutationGroup g) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ try {
+ VarInt.encode(g.attached().size(), bos);
+ for (Mutation m : g) {
+ encodeMutation(bos, m);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return bos.toByteArray();
+ }
+
+  /** Turns on bit {@code i} of the bitmap; bit 0 is the most significant bit of byte 0. */
+  private static void setBit(byte[] bytes, int i) {
+    int mask = 1 << (7 - i % 8);
+    bytes[i / 8] = (byte) (bytes[i / 8] | mask);
+  }
+
+  /** Tells whether bit {@code i} of the bitmap is on; bit 0 is the most significant bit of byte 0. */
+  private static boolean getBit(byte[] bytes, int i) {
+    int mask = 1 << (7 - i % 8);
+    return (bytes[i / 8] & mask) != 0;
+  }
+
+  /**
+   * Writes one mutation: a single byte with the ordinal of its operation, then the body produced
+   * by the delete or the modification encoder.
+   */
+  private void encodeMutation(ByteArrayOutputStream bos, Mutation m) throws IOException {
+    Mutation.Op operation = m.getOperation();
+    bos.write(operation.ordinal());
+    if (operation != Mutation.Op.DELETE) {
+      encodeModification(bos, m);
+      return;
+    }
+    encodeDelete(bos, m);
+  }
+
+  /**
+   * Writes a delete mutation: the varint index of the (lower-cased) table name followed by the
+   * Java-serialized key set.
+   */
+  private void encodeDelete(ByteArrayOutputStream bos, Mutation m) throws IOException {
+    int index = getTableIndex(m.getTable().toLowerCase());
+    VarInt.encode(index, bos);
+    ObjectOutputStream keySetWriter = new ObjectOutputStream(bos);
+    keySetWriter.writeObject(m.getKeySet());
+  }
+
+  /**
+   * Looks up the encoded index of a table.
+   *
+   * @throws IllegalArgumentException if the table was not present in the schema.
+   */
+  private Integer getTableIndex(String table) {
+    Integer index = tablesIndexes.get(table);
+    boolean known = index != null;
+    checkArgument(known, "Unknown table '%s'", table);
+    return index;
+  }
+
+  /**
+   * Reads a delete mutation: the varint table index followed by a Java-serialized key set.
+   *
+   * @throws RuntimeException wrapping a {@link ClassNotFoundException} from deserialization.
+   */
+  private Mutation decodeDelete(ByteArrayInputStream bis) throws IOException {
+    String table = tables.get(VarInt.decodeInt(bis));
+
+    ObjectInputStream keySetReader = new ObjectInputStream(bis);
+    try {
+      return Mutation.delete(table, (KeySet) keySetReader.readObject());
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Encodes a mutation that is not a delete, using the following layout:
+  // [varint table index][bitset of present columns][bitset of NULL columns][non-null values...]
+  // Both bitsets hold one bit per schema column; values follow in schema column order.
+  private void encodeModification(ByteArrayOutputStream bos, Mutation m) throws IOException {
+    String tableName = m.getTable().toLowerCase();
+    int tableIndex = getTableIndex(tableName);
+    VarInt.encode(tableIndex, bos);
+    List<SpannerSchema.Column> columns = schema.getColumns(tableName);
+    checkArgument(columns != null, "Schema for table " + tableName + " not " + "found");
+    Map<String, Value> pending = mutationAsMap(m);
+    // java.util.BitSet#toByteArray returns array of unpredictable length. Using byte arrays
+    // instead.
+    int bitsetSize = (columns.size() + 7) / 8;
+    byte[] present = new byte[bitsetSize];
+    byte[] nulls = new byte[bitsetSize];
+    for (int i = 0; i < columns.size(); i++) {
+      String name = columns.get(i).getName();
+      if (!pending.containsKey(name)) {
+        continue;
+      }
+      setBit(present, i);
+      if (pending.get(name).isNull()) {
+        // NULL columns carry no payload, so they are consumed right away.
+        setBit(nulls, i);
+        pending.remove(name);
+      }
+    }
+    bos.write(present);
+    bos.write(nulls);
+    for (int i = 0; i < columns.size(); i++) {
+      if (getBit(present, i) && !getBit(nulls, i)) {
+        encodeValue(bos, pending.remove(columns.get(i).getName()));
+      }
+    }
+    // Whatever is still pending did not match any schema column.
+    checkArgument(pending.isEmpty(), "Columns %s were not defined in table %s", pending.keySet(),
+        m.getTable());
+  }
+
+  /** Writes a non-null value, dispatching array values and primitive values to their encoders. */
+  private void encodeValue(ByteArrayOutputStream bos, Value value) throws IOException {
+    if (value.getType().getCode() == Type.Code.ARRAY) {
+      encodeArray(bos, value);
+    } else {
+      encodePrimitive(bos, value);
+    }
+  }
+
+  /**
+   * Writes an array value by Java-serializing an {@link ArrayList} copy of its elements.
+   *
+   * @throws IllegalArgumentException if the array element type is not supported.
+   */
+  private void encodeArray(ByteArrayOutputStream bos, Value value) throws IOException {
+    // TODO: avoid using Java serialization here.
+    ObjectOutputStream out = new ObjectOutputStream(bos);
+    Object elements;
+    switch (value.getType().getArrayElementType().getCode()) {
+      case BOOL:
+        elements = new ArrayList<>(value.getBoolArray());
+        break;
+      case INT64:
+        elements = new ArrayList<>(value.getInt64Array());
+        break;
+      case FLOAT64:
+        elements = new ArrayList<>(value.getFloat64Array());
+        break;
+      case STRING:
+        elements = new ArrayList<>(value.getStringArray());
+        break;
+      case BYTES:
+        elements = new ArrayList<>(value.getBytesArray());
+        break;
+      case TIMESTAMP:
+        elements = new ArrayList<>(value.getTimestampArray());
+        break;
+      case DATE:
+        elements = new ArrayList<>(value.getDateArray());
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + value.getType());
+    }
+    out.writeObject(elements);
+  }
+
+  /**
+   * Writes a non-null, non-array value.
+   *
+   * <p>BOOL is one byte, INT64 a varint, FLOAT64 eight bytes written by {@link DataOutputStream},
+   * STRING and BYTES a varint byte count followed by the bytes, TIMESTAMP two varints (seconds,
+   * nanos) and DATE a varint day count produced by {@code encodeDate}.
+   *
+   * @throws IllegalArgumentException if the value type is not supported.
+   */
+  private void encodePrimitive(ByteArrayOutputStream bos, Value value) throws IOException {
+    switch (value.getType().getCode()) {
+      case BOOL:
+        bos.write(value.getBool() ? 1 : 0);
+        break;
+      case INT64:
+        VarInt.encode(value.getInt64(), bos);
+        break;
+      case FLOAT64:
+        new DataOutputStream(bos).writeDouble(value.getFloat64());
+        break;
+      case STRING: {
+        // The prefix must count encoded UTF-8 bytes, not chars: the decoder reads exactly that
+        // many bytes, and the two differ for any non-ASCII string.
+        byte[] utf8 = value.getString().getBytes(StandardCharsets.UTF_8);
+        VarInt.encode(utf8.length, bos);
+        bos.write(utf8);
+        break;
+      }
+      case BYTES: {
+        ByteArray bytes = value.getBytes();
+        VarInt.encode(bytes.length(), bos);
+        bos.write(bytes.toByteArray());
+        break;
+      }
+      case TIMESTAMP: {
+        Timestamp timestamp = value.getTimestamp();
+        VarInt.encode(timestamp.getSeconds(), bos);
+        VarInt.encode(timestamp.getNanos(), bos);
+        break;
+      }
+      case DATE: {
+        Date date = value.getDate();
+        VarInt.encode(encodeDate(date), bos);
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown type " + value.getType());
+    }
+  }
+
+  /**
+   * Rebuilds a mutation group from bytes: reads the varint count of attached mutations, then the
+   * primary mutation, then that many attached mutations.
+   *
+   * @param bytes the encoded mutation group.
+   * @return the decoded mutation group.
+   * @throws RuntimeException wrapping any {@link IOException} raised while reading.
+   */
+  public MutationGroup decode(byte[] bytes) {
+    ByteArrayInputStream input = new ByteArrayInputStream(bytes);
+
+    try {
+      int attachedCount = VarInt.decodeInt(input);
+      Mutation primary = decodeMutation(input);
+      List<Mutation> attached = new ArrayList<>(attachedCount);
+      for (int remaining = attachedCount; remaining > 0; remaining--) {
+        attached.add(decodeMutation(input));
+      }
+      return MutationGroup.create(primary, attached);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads one mutation: a byte holding the operation ordinal followed by the operation body.
+   *
+   * @throws IOException if the stream ends before the operation byte or the byte does not map to
+   *     a known operation.
+   */
+  private Mutation decodeMutation(ByteArrayInputStream bis) throws IOException {
+    int code = bis.read();
+    Mutation.Op[] operations = Mutation.Op.values();
+    // read() yields -1 at end of input; using it as an index would fail with an opaque
+    // ArrayIndexOutOfBoundsException.
+    if (code < 0) {
+      throw new IOException("Unexpected end of stream while reading a mutation operation");
+    }
+    if (code >= operations.length) {
+      throw new IOException("Unknown mutation operation code " + code);
+    }
+    Mutation.Op op = operations[code];
+    if (op == Mutation.Op.DELETE) {
+      return decodeDelete(bis);
+    }
+    return decodeModification(bis, op);
+  }
+
+  /**
+   * Reads a non-delete mutation: the varint table index, the bitset of present columns, the
+   * bitset of NULL columns, and then a value for every present column in schema order.
+   *
+   * @throws IllegalArgumentException if {@code op} is not a write operation.
+   */
+  private Mutation decodeModification(ByteArrayInputStream bis, Mutation.Op op) throws IOException {
+    String tableName = tables.get(VarInt.decodeInt(bis));
+
+    Mutation.WriteBuilder builder;
+    switch (op) {
+      case INSERT:
+        builder = Mutation.newInsertBuilder(tableName);
+        break;
+      case INSERT_OR_UPDATE:
+        builder = Mutation.newInsertOrUpdateBuilder(tableName);
+        break;
+      case REPLACE:
+        builder = Mutation.newReplaceBuilder(tableName);
+        break;
+      case UPDATE:
+        builder = Mutation.newUpdateBuilder(tableName);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown operation " + op);
+    }
+    List<SpannerSchema.Column> columns = schema.getColumns(tableName);
+    int bitsetSize = (columns.size() + 7) / 8;
+    byte[] present = readBytes(bis, bitsetSize);
+    byte[] nulls = readBytes(bis, bitsetSize);
+
+    for (int i = 0; i < columns.size(); i++) {
+      if (getBit(present, i)) {
+        SpannerSchema.Column column = columns.get(i);
+        boolean isNull = getBit(nulls, i);
+        Type type = column.getType();
+        String fieldName = column.getName();
+        if (type.getCode() == Type.Code.ARRAY) {
+          try {
+            decodeArray(bis, fieldName, type, isNull, builder);
+          } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+          }
+        } else {
+          decodePrimitive(bis, fieldName, type, isNull, builder);
+        }
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Sets an array column on the builder. A NULL array is set without reading anything from the
+   * stream; otherwise the elements are read as a Java-serialized list.
+   *
+   * @throws IllegalArgumentException if the array element type is not supported.
+   */
+  private void decodeArray(ByteArrayInputStream bis, String fieldName, Type type, boolean isNull,
+      Mutation.WriteBuilder m) throws IOException, ClassNotFoundException {
+    // TODO: avoid using Java serialization here.
+    Type.Code elementCode = type.getArrayElementType().getCode();
+    switch (elementCode) {
+      case BOOL: {
+        List<Boolean> values = null;
+        if (!isNull) {
+          values = (List<Boolean>) new ObjectInputStream(bis).readObject();
+        }
+        m.set(fieldName).toBoolArray(values);
+        break;
+      }
+      case INT64: {
+        List<Long> values = null;
+        if (!isNull) {
+          values = (List<Long>) new ObjectInputStream(bis).readObject();
+        }
+        m.set(fieldName).toInt64Array(values);
+        break;
+      }
+      case FLOAT64: {
+        List<Double> values = null;
+        if (!isNull) {
+          values = (List<Double>) new ObjectInputStream(bis).readObject();
+        }
+        m.set(fieldName).toFloat64Array(values);
+        break;
+      }
+      case STRING: {
+        List<String> values = null;
+        if (!isNull) {
+          values = (List<String>) new ObjectInputStream(bis).readObject();
+        }
+        m.set(fieldName).toStringArray(values);
+        break;
+      }
+      case BYTES: {
+        List<ByteArray> values = null;
+        if (!isNull) {
+          values = (List<ByteArray>) new ObjectInputStream(bis).readObject();
+        }
+        m.set(fieldName).toBytesArray(values);
+        break;
+      }
+      case TIMESTAMP: {
+        List<Timestamp> values = null;
+        if (!isNull) {
+          values = (List<Timestamp>) new ObjectInputStream(bis).readObject();
+        }
+        m.set(fieldName).toTimestampArray(values);
+        break;
+      }
+      case DATE: {
+        List<Date> values = null;
+        if (!isNull) {
+          values = (List<Date>) new ObjectInputStream(bis).readObject();
+        }
+        m.set(fieldName).toDateArray(values);
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown type " + type);
+    }
+  }
+
+  /**
+   * Sets a non-array column on the builder. A NULL column is set to a typed null without reading
+   * from the stream; otherwise the value is read using the layout written by
+   * {@code encodePrimitive}.
+   *
+   * @throws IllegalArgumentException if the column type is not supported.
+   */
+  private void decodePrimitive(ByteArrayInputStream bis, String fieldName, Type type,
+      boolean isNull, Mutation.WriteBuilder m) throws IOException {
+    switch (type.getCode()) {
+      case BOOL:
+        if (isNull) {
+          m.set(fieldName).to((Boolean) null);
+        } else {
+          m.set(fieldName).to(bis.read() != 0);
+        }
+        break;
+      case INT64:
+        if (isNull) {
+          m.set(fieldName).to((Long) null);
+        } else {
+          m.set(fieldName).to(VarInt.decodeLong(bis));
+        }
+        break;
+      case FLOAT64:
+        if (isNull) {
+          m.set(fieldName).to((Double) null);
+        } else {
+          m.set(fieldName).to(new DataInputStream(bis).readDouble());
+        }
+        break;
+      case STRING: {
+        if (isNull) {
+          m.set(fieldName).to((String) null);
+        } else {
+          int len = VarInt.decodeInt(bis);
+          byte[] bytes = readBytes(bis, len);
+          m.set(fieldName).to(new String(bytes, StandardCharsets.UTF_8));
+        }
+        break;
+      }
+      case BYTES: {
+        if (isNull) {
+          m.set(fieldName).to((ByteArray) null);
+        } else {
+          int len = VarInt.decodeInt(bis);
+          byte[] bytes = readBytes(bis, len);
+          m.set(fieldName).to(ByteArray.copyFrom(bytes));
+        }
+        break;
+      }
+      case TIMESTAMP: {
+        if (isNull) {
+          m.set(fieldName).to((Timestamp) null);
+        } else {
+          // Seconds are written as a long varint by the encoder, so they must be read back as
+          // a long; reading them as an int cannot represent every encoded value.
+          long seconds = VarInt.decodeLong(bis);
+          int nanoseconds = VarInt.decodeInt(bis);
+          m.set(fieldName).to(Timestamp.ofTimeSecondsAndNanos(seconds, nanoseconds));
+        }
+        break;
+      }
+      case DATE: {
+        if (isNull) {
+          m.set(fieldName).to((Date) null);
+        } else {
+          int days = VarInt.decodeInt(bis);
+          m.set(fieldName).to(decodeDate(days));
+        }
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown type " + type);
+    }
+  }
+
+  /** Reads exactly {@code len} bytes from the stream, failing if fewer are available. */
+  private byte[] readBytes(ByteArrayInputStream bis, int len) throws IOException {
+    byte[] buffer = new byte[len];
+    DataInputStream reader = new DataInputStream(bis);
+    reader.readFully(buffer);
+    return buffer;
+  }
+
+  /**
+   * Builds a lexicographically sortable binary key based on a primary key descriptor.
+   *
+   * <p>Key parts are written in the order the schema reports them for the mutation's table; parts
+   * declared descending use the decreasing variants of the ordered code. Strings are always
+   * encoded as UTF-8 so the result does not depend on the platform default charset.
+   *
+   * @param m a spanner mutation.
+   * @return a binary string that preserves the ordering of the primary key.
+   * @throws IllegalArgumentException if a key column is absent from the mutation or has an
+   *     unsupported type.
+   */
+  public byte[] encodeKey(Mutation m) {
+    Map<String, Value> mutationMap = mutationAsMap(m);
+    OrderedCode orderedCode = new OrderedCode();
+    for (SpannerSchema.KeyPart part : schema.getKeyParts(m.getTable())) {
+      Value val = mutationMap.get(part.getField());
+      // Without this check a missing key column surfaces as a bare NullPointerException.
+      checkArgument(val != null, "Key column %s is not set in the mutation for table %s",
+          part.getField(), m.getTable());
+      boolean desc = part.isDesc();
+      if (val.isNull()) {
+        writeKeyNull(orderedCode, desc);
+        continue;
+      }
+      switch (val.getType().getCode()) {
+        case BOOL:
+          // true is written as 0 and false as 1.
+          writeKeySigned(orderedCode, val.getBool() ? 0 : 1, desc);
+          break;
+        case INT64:
+          writeKeySigned(orderedCode, val.getInt64(), desc);
+          break;
+        case FLOAT64:
+          writeKeySigned(orderedCode, Double.doubleToLongBits(val.getFloat64()), desc);
+          break;
+        case STRING:
+          writeKeyBytes(orderedCode, val.getString().getBytes(StandardCharsets.UTF_8), desc);
+          break;
+        case BYTES:
+          writeKeyBytes(orderedCode, val.getBytes().toByteArray(), desc);
+          break;
+        case TIMESTAMP:
+          writeKeyTimestamp(orderedCode, val.getTimestamp(), desc);
+          break;
+        case DATE:
+          writeKeySigned(orderedCode, encodeDate(val.getDate()), desc);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown type " + val.getType());
+      }
+    }
+    return orderedCode.getEncodedBytes();
+  }
+
+  /**
+   * Builds a lexicographically sortable binary key from an explicit {@link Key}.
+   *
+   * <p>The parts of {@code key} are consumed in step with the key parts the schema reports for
+   * {@code table}. Strings are always encoded as UTF-8.
+   *
+   * @param table the table whose key descriptor is used.
+   * @param key the key to encode.
+   * @return a binary string that preserves the ordering of the primary key.
+   * @throws IllegalArgumentException if a key part has an unsupported Java type.
+   */
+  public byte[] encodeKey(String table, Key key) {
+    OrderedCode orderedCode = new OrderedCode();
+    List<SpannerSchema.KeyPart> parts = schema.getKeyParts(table);
+    Iterator<Object> it = key.getParts().iterator();
+    for (SpannerSchema.KeyPart part : parts) {
+      Object value = it.next();
+      boolean desc = part.isDesc();
+      if (value == null) {
+        writeKeyNull(orderedCode, desc);
+      } else if (value instanceof Boolean) {
+        // true is written as 0 and false as 1.
+        writeKeySigned(orderedCode, (Boolean) value ? 0 : 1, desc);
+      } else if (value instanceof Long) {
+        writeKeySigned(orderedCode, (long) value, desc);
+      } else if (value instanceof Double) {
+        writeKeySigned(orderedCode, Double.doubleToLongBits((double) value), desc);
+      } else if (value instanceof String) {
+        writeKeyBytes(orderedCode, ((String) value).getBytes(StandardCharsets.UTF_8), desc);
+      } else if (value instanceof ByteArray) {
+        writeKeyBytes(orderedCode, ((ByteArray) value).toByteArray(), desc);
+      } else if (value instanceof Timestamp) {
+        writeKeyTimestamp(orderedCode, (Timestamp) value, desc);
+      } else if (value instanceof Date) {
+        writeKeySigned(orderedCode, encodeDate((Date) value), desc);
+      } else {
+        throw new IllegalArgumentException("Unknown key part " + value);
+      }
+    }
+    return orderedCode.getEncodedBytes();
+  }
+
+  /** Writes the marker used for a NULL key part, honoring the part's sort direction. */
+  private static void writeKeyNull(OrderedCode orderedCode, boolean desc) {
+    if (desc) {
+      orderedCode.writeInfinityDecreasing();
+    } else {
+      orderedCode.writeInfinity();
+    }
+  }
+
+  /** Writes a signed number key part, honoring the part's sort direction. */
+  private static void writeKeySigned(OrderedCode orderedCode, long v, boolean desc) {
+    if (desc) {
+      orderedCode.writeSignedNumDecreasing(v);
+    } else {
+      orderedCode.writeSignedNumIncreasing(v);
+    }
+  }
+
+  /** Writes a byte-string key part, honoring the part's sort direction. */
+  private static void writeKeyBytes(OrderedCode orderedCode, byte[] v, boolean desc) {
+    if (desc) {
+      orderedCode.writeBytesDecreasing(v);
+    } else {
+      orderedCode.writeBytes(v);
+    }
+  }
+
+  /** Writes a timestamp key part as seconds followed by nanos, honoring the sort direction. */
+  private static void writeKeyTimestamp(OrderedCode orderedCode, Timestamp v, boolean desc) {
+    if (desc) {
+      orderedCode.writeNumDecreasing(v.getSeconds());
+      orderedCode.writeNumDecreasing(v.getNanos());
+    } else {
+      orderedCode.writeNumIncreasing(v.getSeconds());
+      orderedCode.writeNumIncreasing(v.getNanos());
+    }
+  }
+
+  /**
+   * Pairs the columns of a mutation with their values, keyed by lower-cased column name.
+   */
+  private static Map<String, Value> mutationAsMap(Mutation m) {
+    Map<String, Value> byColumn = new HashMap<>();
+    Iterable<String> columns = m.getColumns();
+    Iterator<Value> values = m.getValues().iterator();
+    for (String column : columns) {
+      byColumn.put(column.toLowerCase(), values.next());
+    }
+    return byColumn;
+  }
+
+  /**
+   * Converts a date to the number of days between {@code MIN_DATE} and that date.
+   */
+  private static int encodeDate(Date date) {
+    // NOTE(review): the no-arg MutableDateTime starts from the current instant, so only the date
+    // fields are overridden here - confirm the time-of-day part is intended to be ignored.
+    MutableDateTime target = new MutableDateTime();
+    target.setDate(date.getYear(), date.getMonth(), date.getDayOfMonth());
+
+    Days elapsed = Days.daysBetween(MIN_DATE, target);
+    return elapsed.getDays();
+  }
+
+  /** Converts a day count relative to {@code MIN_DATE} back to a date. */
+  private static Date decodeDate(int daysSinceEpoch) {
+    DateTime decoded = MIN_DATE.plusDays(daysSinceEpoch);
+    int year = decoded.getYear();
+    int month = decoded.getMonthOfYear();
+    int day = decoded.getDayOfMonth();
+    return Date.fromYearMonthDay(year, month, day);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
index 2418816..c483af9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
@@ -18,6 +18,11 @@
package org.apache.beam.sdk.io.gcp.spanner;
import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeyRange;
+import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Value;
@@ -29,6 +34,9 @@ class MutationSizeEstimator {
/** Estimates a size of mutation in bytes. */
static long sizeOf(Mutation m) {
+ if (m.getOperation() == Mutation.Op.DELETE) {
+ return sizeOf(m.getKeySet());
+ }
long result = 0;
for (Value v : m.getValues()) {
switch (v.getType().getCode()) {
@@ -44,6 +52,46 @@ class MutationSizeEstimator {
return result;
}
+  /** Estimates the size of a key set as the sum of the sizes of its keys and key ranges. */
+  private static long sizeOf(KeySet keySet) {
+    long keysSize = 0;
+    for (Key key : keySet.getKeys()) {
+      keysSize += sizeOf(key);
+    }
+    long rangesSize = 0;
+    for (KeyRange range : keySet.getRanges()) {
+      rangesSize += sizeOf(range);
+    }
+    return keysSize + rangesSize;
+  }
+
+  /** Estimates the size of a key range as the combined size of its start and end keys. */
+  private static long sizeOf(KeyRange kr) {
+    long startSize = sizeOf(kr.getStart());
+    long endSize = sizeOf(kr.getEnd());
+    return startSize + endSize;
+  }
+
+  /**
+   * Estimates the size of a key by summing a per-part estimate. Null parts and parts of an
+   * unrecognized type contribute nothing.
+   */
+  private static long sizeOf(Key k) {
+    long total = 0;
+    for (Object part : k.getParts()) {
+      long partSize;
+      if (part instanceof Boolean) {
+        partSize = 1;
+      } else if (part instanceof Long || part instanceof Double) {
+        partSize = 8;
+      } else if (part instanceof String) {
+        partSize = ((String) part).length();
+      } else if (part instanceof ByteArray) {
+        partSize = ((ByteArray) part).length();
+      } else if (part instanceof Timestamp || part instanceof Date) {
+        partSize = 12;
+      } else {
+        // null (instanceof is false for null) or a type without an estimate.
+        partSize = 0;
+      }
+      total += partSize;
+    }
+    return total;
+  }
+
/** Estimates a size of the mutation group in bytes. */
public static long sizeOf(MutationGroup group) {
long result = 0;
http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
new file mode 100644
index 0000000..d40e356
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeyRange;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.primitives.UnsignedBytes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for {@link MutationGroupEncoder}.
+ */
+public class MutationGroupEncoderTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private SpannerSchema allTypesSchema;
+
+ @Before
+ public void setUp() throws Exception {
+ SpannerSchema.Builder builder = SpannerSchema.builder();
+
+ builder.addColumn("test", "intkey", "INT64");
+ builder.addKeyPart("test", "intkey", false);
+
+ builder.addColumn("test", "bool", "BOOL");
+ builder.addColumn("test", "int64", "INT64");
+ builder.addColumn("test", "float64", "FLOAT64");
+ builder.addColumn("test", "string", "STRING");
+ builder.addColumn("test", "bytes", "BYTES");
+ builder.addColumn("test", "timestamp", "TIMESTAMP");
+ builder.addColumn("test", "date", "DATE");
+
+ builder.addColumn("test", "nullbool", "BOOL");
+ builder.addColumn("test", "nullint64", "INT64");
+ builder.addColumn("test", "nullfloat64", "FLOAT64");
+ builder.addColumn("test", "nullstring", "STRING");
+ builder.addColumn("test", "nullbytes", "BYTES");
+ builder.addColumn("test", "nulltimestamp", "TIMESTAMP");
+ builder.addColumn("test", "nulldate", "DATE");
+
+ builder.addColumn("test", "arrbool", "ARRAY<BOOL>");
+ builder.addColumn("test", "arrint64", "ARRAY<INT64>");
+ builder.addColumn("test", "arrfloat64", "ARRAY<FLOAT64>");
+ builder.addColumn("test", "arrstring", "ARRAY<STRING>");
+ builder.addColumn("test", "arrbytes", "ARRAY<BYTES>");
+ builder.addColumn("test", "arrtimestamp", "ARRAY<TIMESTAMP>");
+ builder.addColumn("test", "arrdate", "ARRAY<DATE>");
+
+ builder.addColumn("test", "nullarrbool", "ARRAY<BOOL>");
+ builder.addColumn("test", "nullarrint64", "ARRAY<INT64>");
+ builder.addColumn("test", "nullarrfloat64", "ARRAY<FLOAT64>");
+ builder.addColumn("test", "nullarrstring", "ARRAY<STRING>");
+ builder.addColumn("test", "nullarrbytes", "ARRAY<BYTES>");
+ builder.addColumn("test", "nullarrtimestamp", "ARRAY<TIMESTAMP>");
+ builder.addColumn("test", "nullarrdate", "ARRAY<DATE>");
+
+ allTypesSchema = builder.build();
+ }
+
+ @Test
+ public void testAllTypesSingleMutation() throws Exception {
+ encodeAndVerify(g(appendAllTypes(Mutation.newInsertOrUpdateBuilder("test")).build()));
+ encodeAndVerify(g(appendAllTypes(Mutation.newInsertBuilder("test")).build()));
+ encodeAndVerify(g(appendAllTypes(Mutation.newUpdateBuilder("test")).build()));
+ encodeAndVerify(g(appendAllTypes(Mutation.newReplaceBuilder("test")).build()));
+ }
+
+  /** Round-trips one group holding all four write operations plus a range delete. */
+  @Test
+  public void testAllTypesMultipleMutations() throws Exception {
+    Mutation insertOrUpdate = appendAllTypes(Mutation.newInsertOrUpdateBuilder("test")).build();
+    Mutation insert = appendAllTypes(Mutation.newInsertBuilder("test")).build();
+    Mutation update = appendAllTypes(Mutation.newUpdateBuilder("test")).build();
+    Mutation replace = appendAllTypes(Mutation.newReplaceBuilder("test")).build();
+    KeyRange range = KeyRange.closedClosed(Key.of(1L), Key.of(2L));
+    Mutation delete = Mutation.delete("test", KeySet.range(range));
+
+    encodeAndVerify(g(insertOrUpdate, insert, update, replace, delete));
+  }
+
+  /** Encoding a mutation that sets a column absent from the schema must be rejected. */
+  @Test
+  public void testUnknownColumn() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+    schemaBuilder.addKeyPart("test", "bool_field", false);
+    schemaBuilder.addColumn("test", "bool_field", "BOOL");
+    SpannerSchema schema = schemaBuilder.build();
+
+    Mutation.WriteBuilder insert = Mutation.newInsertBuilder("test");
+    Mutation mutation = insert.set("unknown").to(true).build();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Columns [unknown] were not defined in table test");
+    encodeAndVerify(g(mutation), schema);
+  }
+
+  /** Encoding a mutation that targets a table absent from the schema must be rejected. */
+  @Test
+  public void testUnknownTable() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+    schemaBuilder.addKeyPart("test", "bool_field", false);
+    schemaBuilder.addColumn("test", "bool_field", "BOOL");
+    SpannerSchema schema = schemaBuilder.build();
+
+    Mutation.WriteBuilder insert = Mutation.newInsertBuilder("unknown");
+    Mutation mutation = insert.set("bool_field").to(true).build();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unknown table 'unknown'");
+    encodeAndVerify(g(mutation), schema);
+  }
+
+  /** Table and column names of a write mutation are matched regardless of letter case. */
+  @Test
+  public void testMutationCaseInsensitive() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+    schemaBuilder.addKeyPart("test", "bool_field", false);
+    schemaBuilder.addColumn("test", "bool_field", "BOOL");
+    SpannerSchema schema = schemaBuilder.build();
+
+    Mutation.WriteBuilder insert = Mutation.newInsertBuilder("TEsT");
+    Mutation mutation = insert.set("BoOL_FiELd").to(true).build();
+    encodeAndVerify(g(mutation), schema);
+  }
+
+  /** The table name of a delete mutation is matched regardless of letter case. */
+  @Test
+  public void testDeleteCaseInsensitive() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+    // NOTE(review): the key part names "bool_field" while the only declared column is
+    // "int_field" - looks like a copy/paste mismatch; confirm which name is intended.
+    schemaBuilder.addKeyPart("test", "bool_field", false);
+    schemaBuilder.addColumn("test", "int_field", "INT64");
+    SpannerSchema schema = schemaBuilder.build();
+
+    Key key = Key.of(1L);
+    Mutation mutation = Mutation.delete("TeSt", key);
+    encodeAndVerify(g(mutation), schema);
+  }
+
+  /**
+   * Round-trips delete mutations for a single key, a null key, a key set mixing values and
+   * nulls of several key types, and a closed key range.
+   */
+  @Test
+  public void testDeletes() throws Exception {
+    Mutation singleKey = Mutation.delete("test", Key.of(1L));
+    encodeAndVerify(g(singleKey));
+
+    Mutation nullKey = Mutation.delete("test", Key.of((Long) null));
+    encodeAndVerify(g(nullKey));
+
+    KeySet.Builder keys = KeySet.newBuilder();
+    keys.addKey(Key.of(1L));
+    keys.addKey(Key.of((Long) null));
+    keys.addKey(Key.of(1.2));
+    keys.addKey(Key.of((Double) null));
+    keys.addKey(Key.of("one"));
+    keys.addKey(Key.of((String) null));
+    keys.addKey(Key.of(ByteArray.fromBase64("abcd")));
+    keys.addKey(Key.of((ByteArray) null));
+    keys.addKey(Key.of(Timestamp.now()));
+    keys.addKey(Key.of((Timestamp) null));
+    keys.addKey(Key.of(Date.fromYearMonthDay(2012, 1, 1)));
+    keys.addKey(Key.of((Date) null));
+    KeySet allTypes = keys.build();
+
+    encodeAndVerify(g(Mutation.delete("test", allTypes)));
+
+    KeyRange range = KeyRange.closedClosed(Key.of(1L), Key.of(2L));
+    encodeAndVerify(g(Mutation.delete("test", KeySet.range(range))));
+  }
+
+  /**
+   * Sets every non-key column of the all-types table on the builder: scalar values, array values
+   * (with null elements), NULL scalars and NULL arrays.
+   */
+  private Mutation.WriteBuilder appendAllTypes(Mutation.WriteBuilder builder) {
+    Timestamp ts = Timestamp.now();
+    Date date = Date.fromYearMonthDay(2017, 1, 1);
+
+    // Scalars.
+    Mutation.WriteBuilder result = builder
+        .set("bool").to(true)
+        .set("int64").to(1L)
+        .set("float64").to(1.0)
+        .set("string").to("my string")
+        .set("bytes").to(ByteArray.fromBase64("abcdedf"))
+        .set("timestamp").to(ts)
+        .set("date").to(date);
+
+    // Arrays, including null elements.
+    result = result
+        .set("arrbool").toBoolArray(Arrays.asList(true, false, null, true, null, false))
+        .set("arrint64").toInt64Array(Arrays.asList(10L, -12L, null, null, 100000L))
+        .set("arrfloat64").toFloat64Array(Arrays.asList(10., -12.23, null, null, 100000.33231))
+        .set("arrstring").toStringArray(Arrays.asList("one", "two", null, null, "three"))
+        .set("arrbytes").toBytesArray(Arrays.asList(ByteArray.fromBase64("abcs"), null))
+        .set("arrtimestamp").toTimestampArray(Arrays.asList(Timestamp.MIN_VALUE, null, ts))
+        .set("arrdate").toDateArray(Arrays.asList(null, date));
+
+    // NULL scalars.
+    result = result
+        .set("nullbool").to((Boolean) null)
+        .set("nullint64").to((Long) null)
+        .set("nullfloat64").to((Double) null)
+        .set("nullstring").to((String) null)
+        .set("nullbytes").to((ByteArray) null)
+        .set("nulltimestamp").to((Timestamp) null)
+        .set("nulldate").to((Date) null);
+
+    // NULL arrays.
+    return result
+        .set("nullarrbool").toBoolArray((Iterable<Boolean>) null)
+        .set("nullarrint64").toInt64Array((Iterable<Long>) null)
+        .set("nullarrfloat64").toFloat64Array((Iterable<Double>) null)
+        .set("nullarrstring").toStringArray(null)
+        .set("nullarrbytes").toBytesArray(null)
+        .set("nullarrtimestamp").toTimestampArray(null)
+        .set("nullarrdate").toDateArray(null);
+  }
+
+ /**
+  * Checks that INT64 key encodings sort in the listed order, both for keys taken from
+  * mutations and for explicit {@code Key}s. The {@code keydesc} part is registered with the
+  * {@code addKeyPart} flag set to true (presumably meaning descending order).
+  */
+ @Test
+ public void int64Keys() throws Exception {
+   SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+   schemaBuilder.addColumn("test", "key", "INT64");
+   schemaBuilder.addKeyPart("test", "key", false);
+   schemaBuilder.addColumn("test", "keydesc", "INT64");
+   schemaBuilder.addKeyPart("test", "keydesc", true);
+   SpannerSchema schema = schemaBuilder.build();
+
+   // Each mutation is named after its (key, keydesc) pair; they are listed in expected order.
+   Mutation oneZero = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(1L).set("keydesc").to(0L).build();
+   Mutation twoNull = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(2L).set("keydesc").to((Long) null).build();
+   Mutation twoTen = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(2L).set("keydesc").to(10L).build();
+   Mutation twoNine = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(2L).set("keydesc").to(9L).build();
+   Mutation nullZero = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to((Long) null).set("keydesc").to(0L).build();
+   List<Mutation> orderedMutations =
+       Arrays.asList(oneZero, twoNull, twoTen, twoNine, nullZero);
+
+   List<Key> orderedKeys = Arrays.asList(
+       Key.of(1L, 0L), Key.of(2L, null), Key.of(2L, 10L), Key.of(2L, 9L), Key.of(2L, 0L));
+
+   verifyEncodedOrdering(schema, orderedMutations);
+   verifyEncodedOrdering(schema, "test", orderedKeys);
+ }
+
+ /**
+  * Checks that FLOAT64 key encodings sort in the listed order, both for keys taken from
+  * mutations and for explicit {@code Key}s. The {@code keydesc} part is registered with the
+  * {@code addKeyPart} flag set to true (presumably meaning descending order).
+  */
+ @Test
+ public void float64Keys() throws Exception {
+   SpannerSchema.Builder builder = SpannerSchema.builder();
+
+   builder.addColumn("test", "key", "FLOAT64");
+   builder.addKeyPart("test", "key", false);
+
+   builder.addColumn("test", "keydesc", "FLOAT64");
+   builder.addKeyPart("test", "keydesc", true);
+
+   SpannerSchema schema = builder.build();
+
+   List<Mutation> mutations = Arrays.asList(
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(1.0)
+           .set("keydesc").to(0.)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(2.)
+           // The null must be a Double so the bound value matches the FLOAT64 column type.
+           .set("keydesc").to((Double) null)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(2.)
+           .set("keydesc").to(10.)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(2.)
+           .set("keydesc").to(9.)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(2.)
+           .set("keydesc").to(0.)
+           .build());
+   List<Key> keys = Arrays.asList(
+       Key.of(1., 0.),
+       Key.of(2., null),
+       Key.of(2., 10.),
+       Key.of(2., 9.),
+       Key.of(2., 0.)
+   );
+
+   verifyEncodedOrdering(schema, mutations);
+   verifyEncodedOrdering(schema, "test", keys);
+ }
+
+ /**
+  * Checks that STRING key encodings sort in the listed order, both for keys taken from
+  * mutations and for explicit {@code Key}s. The {@code keydesc} part is registered with the
+  * {@code addKeyPart} flag set to true (presumably meaning descending order).
+  */
+ @Test
+ public void stringKeys() throws Exception {
+   SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+   schemaBuilder.addColumn("test", "key", "STRING");
+   schemaBuilder.addKeyPart("test", "key", false);
+   schemaBuilder.addColumn("test", "keydesc", "STRING");
+   schemaBuilder.addKeyPart("test", "keydesc", true);
+   SpannerSchema schema = schemaBuilder.build();
+
+   // Each mutation is named after its (key, keydesc) pair; they are listed in expected order.
+   Mutation aBc = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to("a").set("keydesc").to("bc").build();
+   Mutation bNull = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to("b").set("keydesc").to((String) null).build();
+   Mutation bZ = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to("b").set("keydesc").to("z").build();
+   Mutation bY = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to("b").set("keydesc").to("y").build();
+   Mutation bA = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to("b").set("keydesc").to("a").build();
+   List<Mutation> orderedMutations = Arrays.asList(aBc, bNull, bZ, bY, bA);
+
+   List<Key> orderedKeys = Arrays.asList(
+       Key.of("a", "bc"), Key.of("b", null), Key.of("b", "z"), Key.of("b", "y"), Key.of("b", "a"));
+
+   verifyEncodedOrdering(schema, orderedMutations);
+   verifyEncodedOrdering(schema, "test", orderedKeys);
+ }
+
+ /**
+  * Checks that BYTES key encodings sort in the listed order, both for keys taken from
+  * mutations and for explicit {@code Key}s. The {@code keydesc} part is registered with the
+  * {@code addKeyPart} flag set to true (presumably meaning descending order).
+  */
+ @Test
+ public void bytesKeys() throws Exception {
+   SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+   schemaBuilder.addColumn("test", "key", "BYTES");
+   schemaBuilder.addKeyPart("test", "key", false);
+   schemaBuilder.addColumn("test", "keydesc", "BYTES");
+   schemaBuilder.addKeyPart("test", "keydesc", true);
+   SpannerSchema schema = schemaBuilder.build();
+
+   // Mutations are listed in the order their encoded keys are expected to sort.
+   Mutation first = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(ByteArray.fromBase64("abc"))
+       .set("keydesc").to(ByteArray.fromBase64("zzz"))
+       .build();
+   Mutation second = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(ByteArray.fromBase64("xxx"))
+       .set("keydesc").to((ByteArray) null)
+       .build();
+   Mutation third = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(ByteArray.fromBase64("xxx"))
+       .set("keydesc").to(ByteArray.fromBase64("zzzz"))
+       .build();
+   Mutation fourth = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(ByteArray.fromBase64("xxx"))
+       .set("keydesc").to(ByteArray.fromBase64("ssss"))
+       .build();
+   Mutation fifth = Mutation.newInsertOrUpdateBuilder("test")
+       .set("key").to(ByteArray.fromBase64("xxx"))
+       .set("keydesc").to(ByteArray.fromBase64("aaa"))
+       .build();
+   List<Mutation> orderedMutations = Arrays.asList(first, second, third, fourth, fifth);
+
+   Key firstKey = Key.of(ByteArray.fromBase64("abc"), ByteArray.fromBase64("zzz"));
+   Key secondKey = Key.of(ByteArray.fromBase64("xxx"), null);
+   Key thirdKey = Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("zzz"));
+   Key fourthKey = Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("sss"));
+   Key fifthKey = Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("aaa"));
+   List<Key> orderedKeys = Arrays.asList(firstKey, secondKey, thirdKey, fourthKey, fifthKey);
+
+   verifyEncodedOrdering(schema, orderedMutations);
+   verifyEncodedOrdering(schema, "test", orderedKeys);
+ }
+
+ /**
+  * Checks that DATE key encodings sort in the listed order, both for keys taken from
+  * mutations and for explicit {@code Key}s. The {@code keydesc} part is registered with the
+  * {@code addKeyPart} flag set to true (presumably meaning descending order).
+  */
+ @Test
+ public void dateKeys() throws Exception {
+   SpannerSchema.Builder builder = SpannerSchema.builder();
+
+   builder.addColumn("test", "key", "DATE");
+   builder.addKeyPart("test", "key", false);
+
+   builder.addColumn("test", "keydesc", "DATE");
+   builder.addKeyPart("test", "keydesc", true);
+
+   SpannerSchema schema = builder.build();
+
+   List<Mutation> mutations = Arrays.asList(
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Date.fromYearMonthDay(2012, 10, 10))
+           .set("keydesc").to(Date.fromYearMonthDay(2000, 10, 10))
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Date.fromYearMonthDay(2020, 10, 10))
+           .set("keydesc").to((Date) null)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Date.fromYearMonthDay(2020, 10, 10))
+           .set("keydesc").to(Date.fromYearMonthDay(2050, 10, 10))
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Date.fromYearMonthDay(2020, 10, 10))
+           .set("keydesc").to(Date.fromYearMonthDay(2000, 10, 10))
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Date.fromYearMonthDay(2020, 10, 10))
+           .set("keydesc").to(Date.fromYearMonthDay(1900, 10, 10))
+           .build());
+
+   List<Key> keys = Arrays.asList(
+       // Both parts are DATE values so the key matches the declared schema; the second part
+       // mirrors the first mutation above.
+       Key.of(Date.fromYearMonthDay(2012, 10, 10), Date.fromYearMonthDay(2000, 10, 10)),
+       Key.of(Date.fromYearMonthDay(2015, 10, 10), null),
+       Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(2050, 10, 10)),
+       Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(2000, 10, 10)),
+       Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(1900, 10, 10))
+   );
+
+   verifyEncodedOrdering(schema, mutations);
+   verifyEncodedOrdering(schema, "test", keys);
+ }
+
+ /**
+  * Checks that TIMESTAMP key encodings sort in the listed order, both for keys taken from
+  * mutations and for explicit {@code Key}s. The {@code keydesc} part is registered with the
+  * {@code addKeyPart} flag set to true (presumably meaning descending order).
+  */
+ @Test
+ public void timestampKeys() throws Exception {
+   SpannerSchema.Builder builder = SpannerSchema.builder();
+
+   builder.addColumn("test", "key", "TIMESTAMP");
+   builder.addKeyPart("test", "key", false);
+
+   builder.addColumn("test", "keydesc", "TIMESTAMP");
+   builder.addKeyPart("test", "keydesc", true);
+
+   SpannerSchema schema = builder.build();
+
+   List<Mutation> mutations = Arrays.asList(
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Timestamp.ofTimeMicroseconds(10000))
+           .set("keydesc").to(Timestamp.ofTimeMicroseconds(50000))
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Timestamp.ofTimeMicroseconds(20000))
+           .set("keydesc").to((Timestamp) null)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Timestamp.ofTimeMicroseconds(20000))
+           .set("keydesc").to(Timestamp.ofTimeMicroseconds(90000))
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Timestamp.ofTimeMicroseconds(20000))
+           .set("keydesc").to(Timestamp.ofTimeMicroseconds(50000))
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("key").to(Timestamp.ofTimeMicroseconds(20000))
+           .set("keydesc").to(Timestamp.ofTimeMicroseconds(10000))
+           .build());
+
+   List<Key> keys = Arrays.asList(
+       // Both parts are TIMESTAMP values so the key matches the declared schema; the second
+       // part mirrors the first mutation above.
+       Key.of(Timestamp.ofTimeMicroseconds(10000), Timestamp.ofTimeMicroseconds(50000)),
+       Key.of(Timestamp.ofTimeMicroseconds(20000), null),
+       Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(90000)),
+       Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(50000)),
+       Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(10000))
+   );
+
+   verifyEncodedOrdering(schema, mutations);
+   verifyEncodedOrdering(schema, "test", keys);
+ }
+
+ /**
+  * Checks that BOOL key encodings sort in the listed order, both for keys taken from
+  * mutations and for explicit {@code Key}s. The {@code boolkeydesc} part is registered with the
+  * {@code addKeyPart} flag set to true (presumably meaning descending order).
+  */
+ @Test
+ public void boolKeys() throws Exception {
+   SpannerSchema.Builder builder = SpannerSchema.builder();
+
+   builder.addColumn("test", "boolkey", "BOOL");
+   builder.addKeyPart("test", "boolkey", false);
+
+   builder.addColumn("test", "boolkeydesc", "BOOL");
+   builder.addKeyPart("test", "boolkeydesc", true);
+
+   SpannerSchema schema = builder.build();
+
+   List<Mutation> mutations = Arrays.asList(
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("boolkey").to(true)
+           .set("boolkeydesc").to(false)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("boolkey").to(false)
+           .set("boolkeydesc").to(false)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("boolkey").to(false)
+           .set("boolkeydesc").to(true)
+           .build(),
+       Mutation.newInsertOrUpdateBuilder("test")
+           .set("boolkey").to((Boolean) null)
+           .set("boolkeydesc").to(false)
+           .build()
+   );
+
+   List<Key> keys = Arrays.asList(
+       // Both parts are BOOL values so the key matches the declared schema; the second part
+       // mirrors the first mutation above.
+       Key.of(true, false),
+       Key.of(false, null),
+       Key.of(false, false),
+       Key.of(false, true),
+       Key.of(null, false)
+   );
+
+   verifyEncodedOrdering(schema, mutations);
+   verifyEncodedOrdering(schema, "test", keys);
+ }
+
+ /**
+  * Encodes the key of every mutation and asserts that the encodings are already in the order
+  * produced by sorting them with an unsigned lexicographic byte comparator.
+  *
+  * @param schema schema handed to the encoder
+  * @param mutations mutations listed in the order their encoded keys are expected to sort
+  */
+ private void verifyEncodedOrdering(SpannerSchema schema, List<Mutation> mutations) {
+   MutationGroupEncoder keyEncoder = new MutationGroupEncoder(schema);
+
+   List<byte[]> inGivenOrder = new ArrayList<>(mutations.size());
+   for (Mutation mutation : mutations) {
+     byte[] encodedKey = keyEncoder.encodeKey(mutation);
+     inGivenOrder.add(encodedKey);
+   }
+
+   // Sort a copy holding the same array instances, then compare it with the given order.
+   List<byte[]> sorted = new ArrayList<>(inGivenOrder);
+   Collections.sort(sorted, UnsignedBytes.lexicographicalComparator());
+   Assert.assertEquals(inGivenOrder, sorted);
+ }
+
+ /**
+  * Encodes every key for the given table and asserts that the encodings are already in the
+  * order produced by sorting them with an unsigned lexicographic byte comparator.
+  *
+  * @param schema schema handed to the encoder
+  * @param table table name passed to {@code encodeKey} along with each key
+  * @param keys keys listed in the order their encodings are expected to sort
+  */
+ private void verifyEncodedOrdering(SpannerSchema schema, String table, List<Key> keys) {
+   MutationGroupEncoder keyEncoder = new MutationGroupEncoder(schema);
+
+   List<byte[]> inGivenOrder = new ArrayList<>(keys.size());
+   for (Key key : keys) {
+     byte[] encodedKey = keyEncoder.encodeKey(table, key);
+     inGivenOrder.add(encodedKey);
+   }
+
+   // Sort a copy holding the same array instances, then compare it with the given order.
+   List<byte[]> sorted = new ArrayList<>(inGivenOrder);
+   Collections.sort(sorted, UnsignedBytes.lexicographicalComparator());
+   Assert.assertEquals(inGivenOrder, sorted);
+ }
+
+ /** Shorthand for {@code MutationGroup.create(mutation, other)}. */
+ private MutationGroup g(Mutation mutation, Mutation... other) {
+   MutationGroup group = MutationGroup.create(mutation, other);
+   return group;
+ }
+
+ /** Round-trips the group through an encoder built on this test's {@code allTypesSchema}. */
+ private void encodeAndVerify(MutationGroup expected) {
+   encodeAndVerify(expected, this.allTypesSchema);
+ }
+
+ /**
+  * Encodes the group with an encoder for the given schema, decodes the resulting bytes, and
+  * asserts that the decoded group equals the input according to {@code mutationGroupsEqual}.
+  */
+ private static void encodeAndVerify(MutationGroup expected, SpannerSchema schema) {
+   MutationGroupEncoder encoder = new MutationGroupEncoder(schema);
+   byte[] encodedBytes = encoder.encode(expected);
+   MutationGroup roundTripped = encoder.decode(encodedBytes);
+
+   boolean unchanged = mutationGroupsEqual(expected, roundTripped);
+   Assert.assertTrue(unchanged);
+ }
+
+ /**
+  * Returns true when both groups hold the same number of mutations and every pair of
+  * mutations at the same position is equal according to {@code mutationsEqual}.
+  */
+ private static boolean mutationGroupsEqual(MutationGroup a, MutationGroup b) {
+   ImmutableList<Mutation> left = ImmutableList.copyOf(a);
+   ImmutableList<Mutation> right = ImmutableList.copyOf(b);
+
+   int count = left.size();
+   if (count != right.size()) {
+     return false;
+   }
+
+   // Advance while the pairs match; reaching the end means nothing differed.
+   int index = 0;
+   while (index < count && mutationsEqual(left.get(index), right.get(index))) {
+     index++;
+   }
+   return index == count;
+ }
+
+ /**
+  * Compares two mutations more leniently than {@code Mutation#equals}: table and column names
+  * are matched case-insensitively and the order in which columns were set does not matter.
+  *
+  * <p>Delete mutations are compared by key set. For every other operation each column is
+  * compared together with its own value, so two mutations that hold the same values under
+  * different columns are reported as different.
+  */
+ private static boolean mutationsEqual(Mutation a, Mutation b) {
+   if (a == b) {
+     return true;
+   }
+   if (a == null || b == null) {
+     return false;
+   }
+   if (a.getOperation() != b.getOperation()) {
+     return false;
+   }
+   if (!a.getTable().equalsIgnoreCase(b.getTable())) {
+     return false;
+   }
+   if (a.getOperation() == Mutation.Op.DELETE) {
+     return a.getKeySet().equals(b.getKeySet());
+   }
+
+   return valuesByNormalizedColumn(a).equals(valuesByNormalizedColumn(b));
+ }
+
+ // Maps each normalized column name of a write mutation to the value set for that column.
+ // Relies on getColumns() and getValues() being parallel (the i-th value belongs to the i-th
+ // column). java.util types are fully qualified so that no additional import is required.
+ private static java.util.Map<String, Object> valuesByNormalizedColumn(Mutation mutation) {
+   java.util.Map<String, Object> byColumn = new java.util.HashMap<>();
+   java.util.Iterator<?> values = mutation.getValues().iterator();
+   for (String column : getNormalizedColumns(mutation)) {
+     byColumn.put(column, values.next());
+   }
+   return byColumn;
+ }
+
+ /**
+  * Returns the mutation's column names converted to lower case, in their original order.
+  *
+  * <p>Lower-casing uses {@code Locale.ROOT} so the result does not depend on the JVM's default
+  * locale. Written as an anonymous class rather than a lambda; the original note here read
+  * "Pray for Java 8 support."
+  */
+ private static Iterable<String> getNormalizedColumns(Mutation a) {
+   return Iterables.transform(a.getColumns(), new Function<String, String>() {
+
+     @Override
+     public String apply(String input) {
+       // Fully qualified so that no additional import is required.
+       return input.toLowerCase(java.util.Locale.ROOT);
+     }
+   });
+ }
+}
[2/2] beam git commit: This closes #4014: [BEAM-1542] SpannerIO:
mutation encoding and size estimation improvements
Posted by jk...@apache.org.
This closes #4014: [BEAM-1542] SpannerIO: mutation encoding and size estimation improvements
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09f68159
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09f68159
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09f68159
Branch: refs/heads/master
Commit: 09f68159df28b2c83a6be1643984c0e28507989b
Parents: aa26f4b 3ba9600
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Oct 30 17:02:42 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Oct 30 17:02:42 2017 -0700
----------------------------------------------------------------------
.../io/gcp/spanner/MutationGroupEncoder.java | 660 +++++++++++++++++++
.../io/gcp/spanner/MutationSizeEstimator.java | 48 ++
.../gcp/spanner/MutationGroupEncoderTest.java | 636 ++++++++++++++++++
3 files changed, 1344 insertions(+)
----------------------------------------------------------------------