You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/31 00:06:08 UTC

[1/2] beam git commit: [BEAM-1542] SpannerIO: mutation encoding and size estimation improvements

Repository: beam
Updated Branches:
  refs/heads/master aa26f4bf7 -> 09f68159d


[BEAM-1542] SpannerIO: mutation encoding and size estimation improvements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ba96003
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ba96003
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ba96003

Branch: refs/heads/master
Commit: 3ba96003d31ce98a54c0c51c1c0a9cf7c06e2fa2
Parents: aa26f4b
Author: Mairbek Khadikov <ma...@google.com>
Authored: Wed Oct 18 15:26:11 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Oct 30 17:02:36 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/spanner/MutationGroupEncoder.java    | 660 +++++++++++++++++++
 .../io/gcp/spanner/MutationSizeEstimator.java   |  48 ++
 .../gcp/spanner/MutationGroupEncoderTest.java   | 636 ++++++++++++++++++
 3 files changed, 1344 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
new file mode 100644
index 0000000..ba0b4eb
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.util.VarInt;
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.joda.time.MutableDateTime;
+
+/**
+ * Given the Spanner Schema, efficiently encodes the mutation group.
+ */
+class MutationGroupEncoder {
+  private static final DateTime MIN_DATE = new DateTime(1, 1, 1, 0, 0);
+
+  private final SpannerSchema schema;
+  private final List<String> tables;
+  private final Map<String, Integer> tablesIndexes = new HashMap<>();
+
+  /**
+   * Creates an encoder bound to the given schema and records the position of every table
+   * returned by {@code schema.getTables()} so table names can be encoded as numeric indexes.
+   *
+   * @param schema the schema whose tables and columns drive the encoding.
+   */
+  public MutationGroupEncoder(SpannerSchema schema) {
+    this.schema = schema;
+    this.tables = schema.getTables();
+
+    int position = 0;
+    for (String table : tables) {
+      tablesIndexes.put(table, position);
+      position++;
+    }
+  }
+
+  /**
+   * Serializes a mutation group. The output starts with the number of attached mutations as a
+   * varint, followed by the encoding of every mutation produced by iterating the group.
+   *
+   * @param g the group to serialize.
+   * @return the encoded bytes.
+   * @throws RuntimeException wrapping any {@link IOException} raised while writing.
+   */
+  public byte[] encode(MutationGroup g) {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+    try {
+      VarInt.encode(g.attached().size(), buffer);
+      Iterator<Mutation> mutations = g.iterator();
+      while (mutations.hasNext()) {
+        encodeMutation(buffer, mutations.next());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return buffer.toByteArray();
+  }
+
+  /**
+   * Turns on bit {@code i} of a bitset stored in a byte array. Bits are numbered from the most
+   * significant bit of the first byte.
+   */
+  private static void setBit(byte[] bytes, int i) {
+    int mask = 1 << (7 - i % 8);
+    bytes[i / 8] |= mask;
+  }
+
+  /**
+   * Reports whether bit {@code i} of a bitset stored in a byte array is on. Bits are numbered
+   * from the most significant bit of the first byte.
+   */
+  private static boolean getBit(byte[] bytes, int i) {
+    int mask = 1 << (7 - i % 8);
+    int masked = bytes[i / 8] & mask;
+    return masked != 0;
+  }
+
+  /**
+   * Writes one mutation: a single byte holding the ordinal of its operation, followed by either
+   * the delete encoding or the modification encoding.
+   */
+  private void encodeMutation(ByteArrayOutputStream bos, Mutation m) throws IOException {
+    Mutation.Op operation = m.getOperation();
+    bos.write(operation.ordinal());
+    if (operation != Mutation.Op.DELETE) {
+      encodeModification(bos, m);
+      return;
+    }
+    encodeDelete(bos, m);
+  }
+
+  /**
+   * Writes a delete mutation: the varint index of the (lower-cased) table followed by the Java
+   * serialized form of the mutation's key set.
+   *
+   * @param bos destination stream.
+   * @param m a mutation whose operation is DELETE.
+   * @throws IOException if writing fails.
+   * @throws IllegalArgumentException if the table is not part of the schema.
+   */
+  private void encodeDelete(ByteArrayOutputStream bos, Mutation m) throws IOException {
+    String table = m.getTable().toLowerCase();
+    int tableIndex = getTableIndex(table);
+    VarInt.encode(tableIndex, bos);
+    ObjectOutputStream out = new ObjectOutputStream(bos);
+    out.writeObject(m.getKeySet());
+    // ObjectOutputStream may buffer internally; flush explicitly so the serialized key set is
+    // guaranteed to be in bos before the next mutation is appended.
+    out.flush();
+  }
+
+  /**
+   * Looks up the numeric index assigned to a table name.
+   *
+   * @throws IllegalArgumentException if the table has no index.
+   */
+  private Integer getTableIndex(String table) {
+    Integer index = tablesIndexes.get(table);
+    if (index == null) {
+      throw new IllegalArgumentException(String.format("Unknown table '%s'", table));
+    }
+    return index;
+  }
+
+  /**
+   * Reads a delete mutation: a varint table index followed by a Java serialized key set.
+   *
+   * @throws IOException if reading fails.
+   * @throws RuntimeException wrapping a {@link ClassNotFoundException} raised by deserialization.
+   */
+  private Mutation decodeDelete(ByteArrayInputStream bis)
+      throws IOException {
+    String tableName = tables.get(VarInt.decodeInt(bis));
+
+    ObjectInputStream serialized = new ObjectInputStream(bis);
+    try {
+      KeySet keys = (KeySet) serialized.readObject();
+      return Mutation.delete(tableName, keys);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Encodes a non-delete mutation using the following format:
+  // [table index][bitset of present columns][bitset of null columns][value of each present,
+  // non-null column in schema order]...
+  private void encodeModification(ByteArrayOutputStream bos, Mutation m) throws IOException {
+    String tableName = m.getTable().toLowerCase();
+    VarInt.encode(getTableIndex(tableName), bos);
+
+    List<SpannerSchema.Column> columns = schema.getColumns(tableName);
+    checkArgument(columns != null, "Schema for table " + tableName + " not found");
+    Map<String, Value> remaining = mutationAsMap(m);
+
+    // java.util.BitSet#toByteArray returns array of unpredictable length, so fixed-size byte
+    // arrays are used for both bitsets.
+    int bitsetSize = (columns.size() + 7) / 8;
+    byte[] exists = new byte[bitsetSize];
+    byte[] nulls = new byte[bitsetSize];
+
+    // First pass: fill both bitsets; null values carry no payload and are dropped right away.
+    for (int i = 0; i < columns.size(); i++) {
+      String columnName = columns.get(i).getName();
+      if (!remaining.containsKey(columnName)) {
+        continue;
+      }
+      setBit(exists, i);
+      if (remaining.get(columnName).isNull()) {
+        setBit(nulls, i);
+        remaining.remove(columnName);
+      }
+    }
+    bos.write(exists);
+    bos.write(nulls);
+
+    // Second pass: write the payload of every present, non-null column.
+    for (int i = 0; i < columns.size(); i++) {
+      if (getBit(exists, i) && !getBit(nulls, i)) {
+        encodeValue(bos, remaining.remove(columns.get(i).getName()));
+      }
+    }
+
+    // Whatever is left was set on the mutation but is unknown to the schema.
+    checkArgument(remaining.isEmpty(), "Columns %s were not defined in table %s",
+        remaining.keySet(), m.getTable());
+  }
+
+  /** Writes a non-null value, dispatching on whether its type is an array or a scalar. */
+  private void encodeValue(ByteArrayOutputStream bos, Value value) throws IOException {
+    if (value.getType().getCode() == Type.Code.ARRAY) {
+      encodeArray(bos, value);
+    } else {
+      encodePrimitive(bos, value);
+    }
+  }
+
+  /**
+   * Writes a non-null array value as a Java serialized {@link ArrayList} of its elements.
+   *
+   * @param bos destination stream.
+   * @param value an array value whose element type is BOOL, INT64, FLOAT64, STRING, BYTES,
+   *     TIMESTAMP or DATE.
+   * @throws IOException if writing fails.
+   * @throws IllegalArgumentException if the element type is not supported.
+   */
+  private void encodeArray(ByteArrayOutputStream bos, Value value) throws IOException {
+    // TODO: avoid using Java serialization here.
+    // The elements are copied into an ArrayList before being serialized.
+    List<?> elements;
+    switch (value.getType().getArrayElementType().getCode()) {
+      case BOOL:
+        elements = new ArrayList<>(value.getBoolArray());
+        break;
+      case INT64:
+        elements = new ArrayList<>(value.getInt64Array());
+        break;
+      case FLOAT64:
+        elements = new ArrayList<>(value.getFloat64Array());
+        break;
+      case STRING:
+        elements = new ArrayList<>(value.getStringArray());
+        break;
+      case BYTES:
+        elements = new ArrayList<>(value.getBytesArray());
+        break;
+      case TIMESTAMP:
+        elements = new ArrayList<>(value.getTimestampArray());
+        break;
+      case DATE:
+        elements = new ArrayList<>(value.getDateArray());
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + value.getType());
+    }
+    ObjectOutputStream out = new ObjectOutputStream(bos);
+    out.writeObject(elements);
+    // ObjectOutputStream may buffer internally; flush explicitly so the serialized list is
+    // guaranteed to be in bos before the next value is appended.
+    out.flush();
+  }
+
+  /**
+   * Writes a non-null scalar value in a compact, type-specific binary form.
+   *
+   * <p>Layout per type: BOOL is one byte (1 or 0); INT64 is a varint; FLOAT64 is the 8 bytes
+   * written by {@link DataOutputStream#writeDouble}; STRING and BYTES are a varint byte count
+   * followed by the raw bytes (UTF-8 for strings); TIMESTAMP is a varint of seconds followed by a
+   * varint of nanoseconds; DATE is a varint of the day number computed by {@code encodeDate}.
+   *
+   * @param bos destination stream.
+   * @param value the value to write.
+   * @throws IOException if writing fails.
+   * @throws IllegalArgumentException if the value's type is not one of the types above.
+   */
+  private void encodePrimitive(ByteArrayOutputStream bos, Value value) throws IOException {
+    switch (value.getType().getCode()) {
+      case BOOL:
+        bos.write(value.getBool() ? 1 : 0);
+        break;
+      case INT64:
+        VarInt.encode(value.getInt64(), bos);
+        break;
+      case FLOAT64:
+        new DataOutputStream(bos).writeDouble(value.getFloat64());
+        break;
+      case STRING: {
+        // The length prefix must count encoded bytes, not UTF-16 chars: the decoder reads
+        // exactly this many bytes, and the two differ for any non-ASCII string.
+        byte[] utf8 = value.getString().getBytes(StandardCharsets.UTF_8);
+        VarInt.encode(utf8.length, bos);
+        bos.write(utf8);
+        break;
+      }
+      case BYTES: {
+        ByteArray bytes = value.getBytes();
+        VarInt.encode(bytes.length(), bos);
+        bos.write(bytes.toByteArray());
+        break;
+      }
+      case TIMESTAMP: {
+        Timestamp timestamp = value.getTimestamp();
+        VarInt.encode(timestamp.getSeconds(), bos);
+        VarInt.encode(timestamp.getNanos(), bos);
+        break;
+      }
+      case DATE: {
+        Date date = value.getDate();
+        VarInt.encode(encodeDate(date), bos);
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown type " + value.getType());
+    }
+  }
+
+  /**
+   * Rebuilds a mutation group from bytes produced by {@code encode}: a varint count of attached
+   * mutations, then the primary mutation, then that many attached mutations.
+   *
+   * @param bytes the encoded group.
+   * @return the decoded group.
+   * @throws RuntimeException wrapping any {@link IOException} raised while reading.
+   */
+  public MutationGroup decode(byte[] bytes) {
+    ByteArrayInputStream input = new ByteArrayInputStream(bytes);
+
+    try {
+      int attachedCount = VarInt.decodeInt(input);
+      Mutation primary = decodeMutation(input);
+      List<Mutation> attached = new ArrayList<>(attachedCount);
+      int remaining = attachedCount;
+      while (remaining > 0) {
+        attached.add(decodeMutation(input));
+        remaining--;
+      }
+      return MutationGroup.create(primary, attached);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads one mutation: a single byte holding the ordinal of its operation, followed by either
+   * the delete encoding or the modification encoding.
+   *
+   * @param bis source stream positioned at the operation byte.
+   * @return the decoded mutation.
+   * @throws IOException if the stream is exhausted, the operation byte does not name a known
+   *     operation, or reading the rest of the mutation fails.
+   */
+  private Mutation decodeMutation(ByteArrayInputStream bis) throws IOException {
+    // read() returns -1 at end of input; reject that (and any corrupt code) with a descriptive
+    // error instead of an ArrayIndexOutOfBoundsException.
+    int opCode = bis.read();
+    Mutation.Op[] ops = Mutation.Op.values();
+    if (opCode < 0 || opCode >= ops.length) {
+      throw new IOException("Invalid mutation operation code " + opCode);
+    }
+    Mutation.Op op = ops[opCode];
+    if (op == Mutation.Op.DELETE) {
+      return decodeDelete(bis);
+    }
+    return decodeModification(bis, op);
+  }
+
+  /**
+   * Reads a non-delete mutation: a varint table index, the bitset of present columns, the bitset
+   * of null columns, and then the payload of each present column in schema order.
+   *
+   * @param bis source stream positioned right after the operation byte.
+   * @param op the already decoded operation; selects which write builder is used.
+   * @throws IOException if reading fails.
+   * @throws IllegalArgumentException if {@code op} is not a write operation.
+   */
+  private Mutation decodeModification(ByteArrayInputStream bis, Mutation.Op op) throws IOException {
+    String tableName = tables.get(VarInt.decodeInt(bis));
+
+    Mutation.WriteBuilder builder;
+    switch (op) {
+      case INSERT:
+        builder = Mutation.newInsertBuilder(tableName);
+        break;
+      case INSERT_OR_UPDATE:
+        builder = Mutation.newInsertOrUpdateBuilder(tableName);
+        break;
+      case REPLACE:
+        builder = Mutation.newReplaceBuilder(tableName);
+        break;
+      case UPDATE:
+        builder = Mutation.newUpdateBuilder(tableName);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown operation " + op);
+    }
+
+    List<SpannerSchema.Column> columns = schema.getColumns(tableName);
+    int bitsetSize = (columns.size() + 7) / 8;
+    byte[] exists = readBytes(bis, bitsetSize);
+    byte[] nulls = readBytes(bis, bitsetSize);
+
+    for (int i = 0; i < columns.size(); i++) {
+      if (getBit(exists, i)) {
+        SpannerSchema.Column column = columns.get(i);
+        boolean isNull = getBit(nulls, i);
+        Type type = column.getType();
+        String fieldName = column.getName();
+        if (type.getCode() == Type.Code.ARRAY) {
+          try {
+            decodeArray(bis, fieldName, type, isNull, builder);
+          } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+          }
+        } else {
+          decodePrimitive(bis, fieldName, type, isNull, builder);
+        }
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Sets an array column on the builder. A null column is set to a null array without touching
+   * the stream; otherwise the elements are read as a Java serialized list.
+   *
+   * @param bis source stream positioned at the serialized list (unused when {@code isNull}).
+   * @param fieldName the column to set.
+   * @param type the array type of the column; its element type selects the setter.
+   * @param isNull whether the column was recorded as null.
+   * @param m the builder receiving the value.
+   * @throws IllegalArgumentException if the element type is not supported.
+   */
+  private void decodeArray(ByteArrayInputStream bis, String fieldName, Type type, boolean isNull,
+      Mutation.WriteBuilder m) throws IOException, ClassNotFoundException {
+    // TODO: avoid using Java serialization here.
+    switch (type.getArrayElementType().getCode()) {
+      case BOOL: {
+        List<Boolean> values = readSerializedList(bis, isNull);
+        m.set(fieldName).toBoolArray(values);
+        break;
+      }
+      case INT64: {
+        List<Long> values = readSerializedList(bis, isNull);
+        m.set(fieldName).toInt64Array(values);
+        break;
+      }
+      case FLOAT64: {
+        List<Double> values = readSerializedList(bis, isNull);
+        m.set(fieldName).toFloat64Array(values);
+        break;
+      }
+      case STRING: {
+        List<String> values = readSerializedList(bis, isNull);
+        m.set(fieldName).toStringArray(values);
+        break;
+      }
+      case BYTES: {
+        List<ByteArray> values = readSerializedList(bis, isNull);
+        m.set(fieldName).toBytesArray(values);
+        break;
+      }
+      case TIMESTAMP: {
+        List<Timestamp> values = readSerializedList(bis, isNull);
+        m.set(fieldName).toTimestampArray(values);
+        break;
+      }
+      case DATE: {
+        List<Date> values = readSerializedList(bis, isNull);
+        m.set(fieldName).toDateArray(values);
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown type " + type);
+    }
+  }
+
+  /**
+   * Returns {@code null} when {@code isNull} is set; otherwise reads one Java serialized object
+   * from the stream and returns it as a list.
+   */
+  @SuppressWarnings("unchecked") // The element type is fixed by the column type in the schema.
+  private static <T> List<T> readSerializedList(ByteArrayInputStream bis, boolean isNull)
+      throws IOException, ClassNotFoundException {
+    if (isNull) {
+      return null;
+    }
+    ObjectInputStream serialized = new ObjectInputStream(bis);
+    return (List<T>) serialized.readObject();
+  }
+
+  /**
+   * Sets a scalar column on the builder. A null column is set to a typed null without touching
+   * the stream; otherwise the payload is read in the layout written by {@code encodePrimitive}.
+   *
+   * @param bis source stream positioned at the payload (unused when {@code isNull}).
+   * @param fieldName the column to set.
+   * @param type the scalar type of the column.
+   * @param isNull whether the column was recorded as null.
+   * @param m the builder receiving the value.
+   * @throws IOException if reading fails.
+   * @throws IllegalArgumentException if the type is not supported.
+   */
+  private void decodePrimitive(ByteArrayInputStream bis, String fieldName, Type type,
+      boolean isNull, Mutation.WriteBuilder m) throws IOException {
+    switch (type.getCode()) {
+      case BOOL:
+        if (isNull) {
+          m.set(fieldName).to((Boolean) null);
+        } else {
+          m.set(fieldName).to(bis.read() != 0);
+        }
+        break;
+      case INT64:
+        if (isNull) {
+          m.set(fieldName).to((Long) null);
+        } else {
+          m.set(fieldName).to(VarInt.decodeLong(bis));
+        }
+        break;
+      case FLOAT64:
+        if (isNull) {
+          m.set(fieldName).to((Double) null);
+        } else {
+          m.set(fieldName).to(new DataInputStream(bis).readDouble());
+        }
+        break;
+      case STRING: {
+        if (isNull) {
+          m.set(fieldName).to((String) null);
+        } else {
+          int len = VarInt.decodeInt(bis);
+          byte[] bytes = readBytes(bis, len);
+          m.set(fieldName).to(new String(bytes, StandardCharsets.UTF_8));
+        }
+        break;
+      }
+      case BYTES: {
+        if (isNull) {
+          m.set(fieldName).to((ByteArray) null);
+        } else {
+          int len = VarInt.decodeInt(bis);
+          byte[] bytes = readBytes(bis, len);
+          m.set(fieldName).to(ByteArray.copyFrom(bytes));
+        }
+        break;
+      }
+      case TIMESTAMP: {
+        if (isNull) {
+          m.set(fieldName).to((Timestamp) null);
+        } else {
+          // Seconds are written as a long varint, so they must be read back as a long; reading
+          // an int breaks timestamps whose seconds are negative or do not fit in 32 bits.
+          long seconds = VarInt.decodeLong(bis);
+          int nanoseconds = VarInt.decodeInt(bis);
+          m.set(fieldName).to(Timestamp.ofTimeSecondsAndNanos(seconds, nanoseconds));
+        }
+        break;
+      }
+      case DATE: {
+        if (isNull) {
+          m.set(fieldName).to((Date) null);
+        } else {
+          int days = VarInt.decodeInt(bis);
+          m.set(fieldName).to(decodeDate(days));
+        }
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown type " + type);
+    }
+  }
+
+  /**
+   * Reads exactly {@code len} bytes from the stream.
+   *
+   * @throws IOException if the stream ends before {@code len} bytes are available.
+   */
+  private byte[] readBytes(ByteArrayInputStream bis, int len) throws IOException {
+    byte[] buffer = new byte[len];
+    DataInputStream data = new DataInputStream(bis);
+    data.readFully(buffer);
+    return buffer;
+  }
+
+  /**
+   * Builds a lexicographically sortable binary key based on a primary key descriptor.
+   * @param m a spanner mutation.
+   * @return a binary string that preserves the ordering of the primary key.
+   */
+  public byte[] encodeKey(Mutation m) {
+    Map<String, Value> mutationMap = mutationAsMap(m);
+    OrderedCode orderedCode = new OrderedCode();
+    for (SpannerSchema.KeyPart part : schema.getKeyParts(m.getTable())) {
+      Value val = mutationMap.get(part.getField());
+      if (val.isNull()) {
+        if (part.isDesc()) {
+          orderedCode.writeInfinityDecreasing();
+        } else {
+          orderedCode.writeInfinity();
+        }
+      } else {
+        Type.Code code = val.getType().getCode();
+        switch (code) {
+          case BOOL:
+            long v = val.getBool() ? 0 : 1;
+            if (part.isDesc()) {
+              orderedCode.writeSignedNumDecreasing(v);
+            } else {
+              orderedCode.writeSignedNumIncreasing(v);
+            }
+            break;
+          case INT64:
+            if (part.isDesc()) {
+              orderedCode.writeSignedNumDecreasing(val.getInt64());
+            } else {
+              orderedCode.writeSignedNumIncreasing(val.getInt64());
+            }
+            break;
+          case FLOAT64:
+            if (part.isDesc()) {
+              orderedCode.writeSignedNumDecreasing(Double.doubleToLongBits(val.getFloat64()));
+            } else {
+              orderedCode.writeSignedNumIncreasing(Double.doubleToLongBits(val.getFloat64()));
+            }
+            break;
+          case STRING:
+            if (part.isDesc()) {
+              orderedCode.writeBytesDecreasing(val.getString().getBytes());
+            } else {
+              orderedCode.writeBytes(val.getString().getBytes());
+            }
+            break;
+          case BYTES:
+            if (part.isDesc()) {
+              orderedCode.writeBytesDecreasing(val.getBytes().toByteArray());
+            } else {
+              orderedCode.writeBytes(val.getBytes().toByteArray());
+            }
+            break;
+          case TIMESTAMP: {
+            Timestamp value = val.getTimestamp();
+            if (part.isDesc()) {
+              orderedCode.writeNumDecreasing(value.getSeconds());
+              orderedCode.writeNumDecreasing(value.getNanos());
+            } else {
+              orderedCode.writeNumIncreasing(value.getSeconds());
+              orderedCode.writeNumIncreasing(value.getNanos());
+            }
+            break;
+          }
+          case DATE:
+            Date value = val.getDate();
+            if (part.isDesc()) {
+              orderedCode.writeSignedNumDecreasing(encodeDate(value));
+            } else {
+              orderedCode.writeSignedNumIncreasing(encodeDate(value));
+            }
+            break;
+          default:
+            throw new IllegalArgumentException("Unknown type " + val.getType());
+        }
+      }
+    }
+    return orderedCode.getEncodedBytes();
+  }
+
+  public byte[] encodeKey(String table, Key key) {
+    OrderedCode orderedCode = new OrderedCode();
+    List<SpannerSchema.KeyPart> parts = schema.getKeyParts(table);
+    Iterator<Object> it = key.getParts().iterator();
+    for (SpannerSchema.KeyPart part : parts) {
+      Object value = it.next();
+      if (value == null) {
+        if (part.isDesc()) {
+          orderedCode.writeInfinityDecreasing();
+        } else {
+          orderedCode.writeInfinity();
+        }
+      } else {
+        if (value instanceof Boolean) {
+          long v = (Boolean) value ? 0 : 1;
+          if (part.isDesc()) {
+            orderedCode.writeSignedNumDecreasing(v);
+          } else {
+            orderedCode.writeSignedNumIncreasing(v);
+          }
+        } else if (value instanceof Long) {
+          long v = (long) value;
+          if (part.isDesc()) {
+            orderedCode.writeSignedNumDecreasing(v);
+          } else {
+            orderedCode.writeSignedNumIncreasing(v);
+          }
+        } else if (value instanceof Double) {
+          long v = Double.doubleToLongBits((double) value);
+          if (part.isDesc()) {
+            orderedCode.writeSignedNumDecreasing(v);
+          } else {
+            orderedCode.writeSignedNumIncreasing(v);
+          }
+        } else if (value instanceof String) {
+          String v = (String) value;
+          if (part.isDesc()) {
+            orderedCode.writeBytesDecreasing(v.getBytes());
+          } else {
+            orderedCode.writeBytes(v.getBytes());
+          }
+        } else if (value instanceof ByteArray) {
+          ByteArray v = (ByteArray) value;
+          if (part.isDesc()) {
+            orderedCode.writeBytesDecreasing(v.toByteArray());
+          } else {
+            orderedCode.writeBytes(v.toByteArray());
+          }
+        } else if (value instanceof Timestamp) {
+          Timestamp v = (Timestamp) value;
+          if (part.isDesc()) {
+            orderedCode.writeNumDecreasing(v.getSeconds());
+            orderedCode.writeNumDecreasing(v.getNanos());
+          } else {
+            orderedCode.writeNumIncreasing(v.getSeconds());
+            orderedCode.writeNumIncreasing(v.getNanos());
+          }
+        } else if (value instanceof Date) {
+          Date v = (Date) value;
+          if (part.isDesc()) {
+            orderedCode.writeSignedNumDecreasing(encodeDate(v));
+          } else {
+            orderedCode.writeSignedNumIncreasing(encodeDate(v));
+          }
+        } else {
+          throw new IllegalArgumentException("Unknown key part " + value);
+        }
+      }
+    }
+    return orderedCode.getEncodedBytes();
+  }
+
+  /**
+   * Pairs the mutation's columns with its values, in iteration order, into a map keyed by the
+   * lower-cased column name.
+   */
+  private static Map<String, Value> mutationAsMap(Mutation m) {
+    Map<String, Value> byColumn = new HashMap<>();
+    Iterator<String> names = m.getColumns().iterator();
+    Iterator<Value> values = m.getValues().iterator();
+    for (; names.hasNext(); ) {
+      String name = names.next();
+      Value value = values.next();
+      byColumn.put(name.toLowerCase(), value);
+    }
+    return byColumn;
+  }
+
+  /**
+   * Converts a date into the number of whole days elapsed since the date of {@code MIN_DATE}.
+   *
+   * <p>The difference is computed on calendar dates only. The previous implementation started
+   * from the current instant in the default time zone, so the result depended on the wall-clock
+   * time of day and could fail around daylight-saving transitions.
+   *
+   * @param date the date to convert.
+   * @return the day offset from the date of {@code MIN_DATE}.
+   */
+  private static int encodeDate(Date date) {
+
+    org.joda.time.LocalDate jodaDate =
+        new org.joda.time.LocalDate(date.getYear(), date.getMonth(), date.getDayOfMonth());
+
+    return Days.daysBetween(MIN_DATE.toLocalDate(), jodaDate).getDays();
+  }
+
+  /** Converts a day offset from {@code MIN_DATE} back into a date. */
+  private static Date decodeDate(int daysSinceEpoch) {
+    DateTime shifted = MIN_DATE.plusDays(daysSinceEpoch);
+    int year = shifted.getYear();
+    int month = shifted.getMonthOfYear();
+    int day = shifted.getDayOfMonth();
+    return Date.fromYearMonthDay(year, month, day);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
index 2418816..c483af9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
@@ -18,6 +18,11 @@
 package org.apache.beam.sdk.io.gcp.spanner;
 
 import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeyRange;
+import com.google.cloud.spanner.KeySet;
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Value;
 
@@ -29,6 +34,9 @@ class MutationSizeEstimator {
 
   /** Estimates a size of mutation in bytes. */
   static long sizeOf(Mutation m) {
+    if (m.getOperation() == Mutation.Op.DELETE) {
+      return sizeOf(m.getKeySet());
+    }
     long result = 0;
     for (Value v : m.getValues()) {
       switch (v.getType().getCode()) {
@@ -44,6 +52,46 @@ class MutationSizeEstimator {
     return result;
   }
 
+  /** Estimates the size of a key set as the sum of the sizes of its keys and key ranges. */
+  private static long sizeOf(KeySet keySet) {
+    long keysTotal = 0;
+    for (Key key : keySet.getKeys()) {
+      keysTotal += sizeOf(key);
+    }
+    long rangesTotal = 0;
+    for (KeyRange range : keySet.getRanges()) {
+      rangesTotal += sizeOf(range);
+    }
+    return keysTotal + rangesTotal;
+  }
+
+  /** Estimates the size of a key range as the combined size of its start and end keys. */
+  private static long sizeOf(KeyRange kr) {
+    long startSize = sizeOf(kr.getStart());
+    long endSize = sizeOf(kr.getEnd());
+    return startSize + endSize;
+  }
+
+  /**
+   * Estimates the size of a key in bytes by summing a per-part estimate: 1 for booleans, 8 for
+   * longs and doubles, the length for strings and byte arrays, and 12 for timestamps and dates.
+   * Null parts and parts of any other type contribute nothing.
+   */
+  private static long sizeOf(Key k) {
+    long total = 0;
+    for (Object part : k.getParts()) {
+      long partSize = 0;
+      if (part instanceof Boolean) {
+        partSize = 1;
+      } else if (part instanceof Long || part instanceof Double) {
+        partSize = 8;
+      } else if (part instanceof String) {
+        partSize = ((String) part).length();
+      } else if (part instanceof ByteArray) {
+        partSize = ((ByteArray) part).length();
+      } else if (part instanceof Timestamp || part instanceof Date) {
+        partSize = 12;
+      }
+      total += partSize;
+    }
+    return total;
+  }
+
   /** Estimates a size of the mutation group in bytes. */
   public static long sizeOf(MutationGroup group) {
     long result = 0;

http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
new file mode 100644
index 0000000..d40e356
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Date;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeyRange;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.primitives.UnsignedBytes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for {@link MutationGroupEncoder}.
+ */
+public class MutationGroupEncoderTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private SpannerSchema allTypesSchema;
+
+  @Before
+  public void setUp() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+
+    builder.addColumn("test", "intkey", "INT64");
+    builder.addKeyPart("test", "intkey", false);
+
+    builder.addColumn("test", "bool", "BOOL");
+    builder.addColumn("test", "int64", "INT64");
+    builder.addColumn("test", "float64", "FLOAT64");
+    builder.addColumn("test", "string", "STRING");
+    builder.addColumn("test", "bytes", "BYTES");
+    builder.addColumn("test", "timestamp", "TIMESTAMP");
+    builder.addColumn("test", "date", "DATE");
+
+    builder.addColumn("test", "nullbool", "BOOL");
+    builder.addColumn("test", "nullint64", "INT64");
+    builder.addColumn("test", "nullfloat64", "FLOAT64");
+    builder.addColumn("test", "nullstring", "STRING");
+    builder.addColumn("test", "nullbytes", "BYTES");
+    builder.addColumn("test", "nulltimestamp", "TIMESTAMP");
+    builder.addColumn("test", "nulldate", "DATE");
+
+    builder.addColumn("test", "arrbool", "ARRAY<BOOL>");
+    builder.addColumn("test", "arrint64", "ARRAY<INT64>");
+    builder.addColumn("test", "arrfloat64", "ARRAY<FLOAT64>");
+    builder.addColumn("test", "arrstring", "ARRAY<STRING>");
+    builder.addColumn("test", "arrbytes", "ARRAY<BYTES>");
+    builder.addColumn("test", "arrtimestamp", "ARRAY<TIMESTAMP>");
+    builder.addColumn("test", "arrdate", "ARRAY<DATE>");
+
+    builder.addColumn("test", "nullarrbool", "ARRAY<BOOL>");
+    builder.addColumn("test", "nullarrint64", "ARRAY<INT64>");
+    builder.addColumn("test", "nullarrfloat64", "ARRAY<FLOAT64>");
+    builder.addColumn("test", "nullarrstring", "ARRAY<STRING>");
+    builder.addColumn("test", "nullarrbytes", "ARRAY<BYTES>");
+    builder.addColumn("test", "nullarrtimestamp", "ARRAY<TIMESTAMP>");
+    builder.addColumn("test", "nullarrdate", "ARRAY<DATE>");
+
+    allTypesSchema = builder.build();
+  }
+
+  /** Round-trips a single all-types mutation once for each kind of write operation. */
+  @Test
+  public void testAllTypesSingleMutation() throws Exception {
+    List<Mutation.WriteBuilder> writeBuilders = Arrays.asList(
+        Mutation.newInsertOrUpdateBuilder("test"),
+        Mutation.newInsertBuilder("test"),
+        Mutation.newUpdateBuilder("test"),
+        Mutation.newReplaceBuilder("test"));
+
+    for (Mutation.WriteBuilder writeBuilder : writeBuilders) {
+      encodeAndVerify(g(appendAllTypes(writeBuilder).build()));
+    }
+  }
+
+  /** Round-trips one group holding every kind of write operation plus a range delete. */
+  @Test
+  public void testAllTypesMultipleMutations() throws Exception {
+    Mutation insertOrUpdate = appendAllTypes(Mutation.newInsertOrUpdateBuilder("test")).build();
+    Mutation insert = appendAllTypes(Mutation.newInsertBuilder("test")).build();
+    Mutation update = appendAllTypes(Mutation.newUpdateBuilder("test")).build();
+    Mutation replace = appendAllTypes(Mutation.newReplaceBuilder("test")).build();
+
+    KeyRange oneToTwo = KeyRange.closedClosed(Key.of(1L), Key.of(2L));
+    Mutation deleteRange = Mutation.delete("test", KeySet.range(oneToTwo));
+
+    encodeAndVerify(g(insertOrUpdate, insert, update, replace, deleteRange));
+  }
+
+  /** Encoding a mutation that sets a column missing from the schema must be rejected. */
+  @Test
+  public void testUnknownColumn() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+    schemaBuilder.addKeyPart("test", "bool_field", false);
+    schemaBuilder.addColumn("test", "bool_field", "BOOL");
+    SpannerSchema boolOnlySchema = schemaBuilder.build();
+
+    // "unknown" is not one of the columns declared on table "test" above.
+    Mutation insertIntoUnknownColumn = Mutation.newInsertBuilder("test")
+        .set("unknown").to(true)
+        .build();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Columns [unknown] were not defined in table test");
+    encodeAndVerify(g(insertIntoUnknownColumn), boolOnlySchema);
+  }
+
+  /** Encoding a mutation that targets a table missing from the schema must be rejected. */
+  @Test
+  public void testUnknownTable() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+    schemaBuilder.addKeyPart("test", "bool_field", false);
+    schemaBuilder.addColumn("test", "bool_field", "BOOL");
+    SpannerSchema boolOnlySchema = schemaBuilder.build();
+
+    // The schema above only declares table "test".
+    Mutation insertIntoUnknownTable = Mutation.newInsertBuilder("unknown")
+        .set("bool_field").to(true)
+        .build();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unknown table 'unknown'");
+    encodeAndVerify(g(insertIntoUnknownTable), boolOnlySchema);
+  }
+
+  /** Table and column names in a write mutation may differ from the schema in letter case. */
+  @Test
+  public void testMutationCaseInsensitive() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+    schemaBuilder.addKeyPart("test", "bool_field", false);
+    schemaBuilder.addColumn("test", "bool_field", "BOOL");
+    SpannerSchema lowerCaseSchema = schemaBuilder.build();
+
+    Mutation mixedCaseInsert = Mutation.newInsertBuilder("TEsT")
+        .set("BoOL_FiELd").to(true)
+        .build();
+
+    encodeAndVerify(g(mixedCaseInsert), lowerCaseSchema);
+  }
+
+  /** The table name in a delete mutation may differ from the schema in letter case. */
+  @Test
+  public void testDeleteCaseInsensitive() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+    // The key part must name the column that is actually declared: the only column of the
+    // table is the INT64 "int_field", which also matches the Long key deleted below.
+    builder.addKeyPart("test", "int_field", false);
+    builder.addColumn("test", "int_field", "INT64");
+    SpannerSchema schema = builder.build();
+
+    Mutation mutation = Mutation.delete("TeSt", Key.of(1L));
+    encodeAndVerify(g(mutation), schema);
+  }
+
+  /**
+   * Round-trips delete mutations: a single key, a single null key, a key set mixing null and
+   * non-null keys of several types, and a closed key range.
+   */
+  @Test
+  public void testDeletes() throws Exception {
+    Mutation deleteOne = Mutation.delete("test", Key.of(1L));
+    encodeAndVerify(g(deleteOne));
+
+    Mutation deleteNull = Mutation.delete("test", Key.of((Long) null));
+    encodeAndVerify(g(deleteNull));
+
+    // One non-null and one typed-null key per key type exercised here.
+    KeySet.Builder mixedKeys = KeySet.newBuilder();
+    mixedKeys = mixedKeys
+        .addKey(Key.of(1L))
+        .addKey(Key.of((Long) null))
+        .addKey(Key.of(1.2))
+        .addKey(Key.of((Double) null))
+        .addKey(Key.of("one"))
+        .addKey(Key.of((String) null))
+        .addKey(Key.of(ByteArray.fromBase64("abcd")))
+        .addKey(Key.of((ByteArray) null))
+        .addKey(Key.of(Timestamp.now()))
+        .addKey(Key.of((Timestamp) null))
+        .addKey(Key.of(Date.fromYearMonthDay(2012, 1, 1)))
+        .addKey(Key.of((Date) null));
+    encodeAndVerify(g(Mutation.delete("test", mixedKeys.build())));
+
+    KeyRange oneToTwo = KeyRange.closedClosed(Key.of(1L), Key.of(2L));
+    encodeAndVerify(g(Mutation.delete("test", KeySet.range(oneToTwo))));
+  }
+
+  /**
+   * Sets a value on the given builder for every non-key column of the all-types schema:
+   * non-null scalars, arrays (most with null elements), typed null scalars and null arrays.
+   *
+   * @return the builder produced by the last binding in the chain
+   */
+  private Mutation.WriteBuilder appendAllTypes(Mutation.WriteBuilder builder) {
+    Timestamp now = Timestamp.now();
+    Date firstDayOf2017 = Date.fromYearMonthDay(2017, 1, 1);
+
+    // Non-null scalar values.
+    Mutation.WriteBuilder withScalars = builder
+        .set("bool").to(true)
+        .set("int64").to(1L)
+        .set("float64").to(1.0)
+        .set("string").to("my string")
+        .set("bytes").to(ByteArray.fromBase64("abcdedf"))
+        .set("timestamp").to(now)
+        .set("date").to(firstDayOf2017);
+
+    // Non-null arrays.
+    Mutation.WriteBuilder withArrays = withScalars
+        .set("arrbool").toBoolArray(Arrays.asList(true, false, null, true, null, false))
+        .set("arrint64").toInt64Array(Arrays.asList(10L, -12L, null, null, 100000L))
+        .set("arrfloat64").toFloat64Array(Arrays.asList(10., -12.23, null, null, 100000.33231))
+        .set("arrstring").toStringArray(Arrays.asList("one", "two", null, null, "three"))
+        .set("arrbytes").toBytesArray(Arrays.asList(ByteArray.fromBase64("abcs"), null))
+        .set("arrtimestamp").toTimestampArray(Arrays.asList(Timestamp.MIN_VALUE, null, now))
+        .set("arrdate").toDateArray(Arrays.asList(null, firstDayOf2017));
+
+    // Typed null scalars.
+    Mutation.WriteBuilder withNullScalars = withArrays
+        .set("nullbool").to((Boolean) null)
+        .set("nullint64").to((Long) null)
+        .set("nullfloat64").to((Double) null)
+        .set("nullstring").to((String) null)
+        .set("nullbytes").to((ByteArray) null)
+        .set("nulltimestamp").to((Timestamp) null)
+        .set("nulldate").to((Date) null);
+
+    // Null arrays.
+    return withNullScalars
+        .set("nullarrbool").toBoolArray((Iterable<Boolean>) null)
+        .set("nullarrint64").toInt64Array((Iterable<Long>) null)
+        .set("nullarrfloat64").toFloat64Array((Iterable<Double>) null)
+        .set("nullarrstring").toStringArray(null)
+        .set("nullarrbytes").toBytesArray(null)
+        .set("nullarrtimestamp").toTimestampArray(null)
+        .set("nullarrdate").toDateArray(null);
+  }
+
+  /**
+   * Checks that encoded INT64 keys are already in unsigned lexicographic byte order when the
+   * mutations and keys are listed in the order below.
+   */
+  @Test
+  public void int64Keys() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+
+    schemaBuilder.addColumn("test", "key", "INT64");
+    schemaBuilder.addKeyPart("test", "key", false);
+
+    // Second key part is registered with the flag set to true
+    // (presumably "descending", going by the column name - TODO confirm).
+    schemaBuilder.addColumn("test", "keydesc", "INT64");
+    schemaBuilder.addKeyPart("test", "keydesc", true);
+
+    SpannerSchema twoPartKeySchema = schemaBuilder.build();
+
+    List<Mutation> orderedMutations = Arrays.asList(
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(1L)
+            .set("keydesc").to(0L)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(2L)
+            .set("keydesc").to((Long) null)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(2L)
+            .set("keydesc").to(10L)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(2L)
+            .set("keydesc").to(9L)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to((Long) null)
+            .set("keydesc").to(0L)
+            .build());
+
+    List<Key> orderedKeys = Arrays.asList(
+        Key.of(1L, 0L),
+        Key.of(2L, null),
+        Key.of(2L, 10L),
+        Key.of(2L, 9L),
+        Key.of(2L, 0L)
+    );
+
+    verifyEncodedOrdering(twoPartKeySchema, orderedMutations);
+    verifyEncodedOrdering(twoPartKeySchema, "test", orderedKeys);
+  }
+
+  /**
+   * Checks that encoded FLOAT64 keys are already in unsigned lexicographic byte order when the
+   * mutations and keys are listed in the order below.
+   */
+  @Test
+  public void float64Keys() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+
+    builder.addColumn("test", "key", "FLOAT64");
+    builder.addKeyPart("test", "key", false);
+
+    builder.addColumn("test", "keydesc", "FLOAT64");
+    builder.addKeyPart("test", "keydesc", true);
+
+    SpannerSchema schema = builder.build();
+
+    List<Mutation> mutations = Arrays.asList(
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(1.0)
+            .set("keydesc").to(0.)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(2.)
+            // The null must carry the column's own type (FLOAT64), not INT64.
+            .set("keydesc").to((Double) null)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(2.)
+            .set("keydesc").to(10.)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(2.)
+            .set("keydesc").to(9.)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(2.)
+            .set("keydesc").to(0.)
+            .build());
+    List<Key> keys = Arrays.asList(
+        Key.of(1., 0.),
+        Key.of(2., null),
+        Key.of(2., 10.),
+        Key.of(2., 9.),
+        Key.of(2., 0.)
+    );
+
+    verifyEncodedOrdering(schema, mutations);
+    verifyEncodedOrdering(schema, "test", keys);
+  }
+
+  /**
+   * Checks that encoded STRING keys are already in unsigned lexicographic byte order when the
+   * mutations and keys are listed in the order below.
+   */
+  @Test
+  public void stringKeys() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+
+    schemaBuilder.addColumn("test", "key", "STRING");
+    schemaBuilder.addKeyPart("test", "key", false);
+
+    // Second key part is registered with the flag set to true
+    // (presumably "descending", going by the column name - TODO confirm).
+    schemaBuilder.addColumn("test", "keydesc", "STRING");
+    schemaBuilder.addKeyPart("test", "keydesc", true);
+
+    SpannerSchema twoPartKeySchema = schemaBuilder.build();
+
+    List<Mutation> orderedMutations = Arrays.asList(
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to("a")
+            .set("keydesc").to("bc")
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to("b")
+            .set("keydesc").to((String) null)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to("b")
+            .set("keydesc").to("z")
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to("b")
+            .set("keydesc").to("y")
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to("b")
+            .set("keydesc").to("a")
+            .build());
+
+    List<Key> orderedKeys = Arrays.asList(
+        Key.of("a", "bc"),
+        Key.of("b", null),
+        Key.of("b", "z"),
+        Key.of("b", "y"),
+        Key.of("b", "a")
+    );
+
+    verifyEncodedOrdering(twoPartKeySchema, orderedMutations);
+    verifyEncodedOrdering(twoPartKeySchema, "test", orderedKeys);
+  }
+
+  /**
+   * Checks that encoded BYTES keys are already in unsigned lexicographic byte order when the
+   * mutations and keys are listed in the order below.
+   */
+  @Test
+  public void bytesKeys() throws Exception {
+    SpannerSchema.Builder schemaBuilder = SpannerSchema.builder();
+
+    schemaBuilder.addColumn("test", "key", "BYTES");
+    schemaBuilder.addKeyPart("test", "key", false);
+
+    // Second key part is registered with the flag set to true
+    // (presumably "descending", going by the column name - TODO confirm).
+    schemaBuilder.addColumn("test", "keydesc", "BYTES");
+    schemaBuilder.addKeyPart("test", "keydesc", true);
+
+    SpannerSchema twoPartKeySchema = schemaBuilder.build();
+
+    List<Mutation> orderedMutations = Arrays.asList(
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(ByteArray.fromBase64("abc"))
+            .set("keydesc").to(ByteArray.fromBase64("zzz"))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(ByteArray.fromBase64("xxx"))
+            .set("keydesc").to((ByteArray) null)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(ByteArray.fromBase64("xxx"))
+            .set("keydesc").to(ByteArray.fromBase64("zzzz"))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(ByteArray.fromBase64("xxx"))
+            .set("keydesc").to(ByteArray.fromBase64("ssss"))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(ByteArray.fromBase64("xxx"))
+            .set("keydesc").to(ByteArray.fromBase64("aaa"))
+            .build());
+
+    List<Key> orderedKeys = Arrays.asList(
+        Key.of(ByteArray.fromBase64("abc"), ByteArray.fromBase64("zzz")),
+        Key.of(ByteArray.fromBase64("xxx"), null),
+        Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("zzz")),
+        Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("sss")),
+        Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("aaa"))
+    );
+
+    verifyEncodedOrdering(twoPartKeySchema, orderedMutations);
+    verifyEncodedOrdering(twoPartKeySchema, "test", orderedKeys);
+  }
+
+  /**
+   * Checks that encoded DATE keys are already in unsigned lexicographic byte order when the
+   * mutations and keys are listed in the order below.
+   */
+  @Test
+  public void dateKeys() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+
+    builder.addColumn("test", "key", "DATE");
+    builder.addKeyPart("test", "key", false);
+
+    builder.addColumn("test", "keydesc", "DATE");
+    builder.addKeyPart("test", "keydesc", true);
+
+    SpannerSchema schema = builder.build();
+
+    List<Mutation> mutations = Arrays.asList(
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Date.fromYearMonthDay(2012, 10, 10))
+            .set("keydesc").to(Date.fromYearMonthDay(2000, 10, 10))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Date.fromYearMonthDay(2020, 10, 10))
+            .set("keydesc").to((Date) null)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Date.fromYearMonthDay(2020, 10, 10))
+            .set("keydesc").to(Date.fromYearMonthDay(2050, 10, 10))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Date.fromYearMonthDay(2020, 10, 10))
+            .set("keydesc").to(Date.fromYearMonthDay(2000, 10, 10))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Date.fromYearMonthDay(2020, 10, 10))
+            .set("keydesc").to(Date.fromYearMonthDay(1900, 10, 10))
+            .build());
+
+    List<Key> keys = Arrays.asList(
+        // Both key parts are DATE columns, so the second part must be a Date as well
+        // (same value as in the first mutation above).
+        Key.of(Date.fromYearMonthDay(2012, 10, 10), Date.fromYearMonthDay(2000, 10, 10)),
+        Key.of(Date.fromYearMonthDay(2015, 10, 10), null),
+        Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(2050, 10, 10)),
+        Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(2000, 10, 10)),
+        Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(1900, 10, 10))
+    );
+
+    verifyEncodedOrdering(schema, mutations);
+    verifyEncodedOrdering(schema, "test", keys);
+  }
+
+  /**
+   * Checks that encoded TIMESTAMP keys are already in unsigned lexicographic byte order when
+   * the mutations and keys are listed in the order below.
+   */
+  @Test
+  public void timestampKeys() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+
+    builder.addColumn("test", "key", "TIMESTAMP");
+    builder.addKeyPart("test", "key", false);
+
+    builder.addColumn("test", "keydesc", "TIMESTAMP");
+    builder.addKeyPart("test", "keydesc", true);
+
+    SpannerSchema schema = builder.build();
+
+    List<Mutation> mutations = Arrays.asList(
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Timestamp.ofTimeMicroseconds(10000))
+            .set("keydesc").to(Timestamp.ofTimeMicroseconds(50000))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Timestamp.ofTimeMicroseconds(20000))
+            .set("keydesc").to((Timestamp) null)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Timestamp.ofTimeMicroseconds(20000))
+            .set("keydesc").to(Timestamp.ofTimeMicroseconds(90000))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Timestamp.ofTimeMicroseconds(20000))
+            .set("keydesc").to(Timestamp.ofTimeMicroseconds(50000))
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("key").to(Timestamp.ofTimeMicroseconds(20000))
+            .set("keydesc").to(Timestamp.ofTimeMicroseconds(10000))
+            .build());
+
+
+    List<Key> keys = Arrays.asList(
+        // Both key parts are TIMESTAMP columns, so the second part must be a Timestamp as well
+        // (same value as in the first mutation above).
+        Key.of(Timestamp.ofTimeMicroseconds(10000), Timestamp.ofTimeMicroseconds(50000)),
+        Key.of(Timestamp.ofTimeMicroseconds(20000), null),
+        Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(90000)),
+        Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(50000)),
+        Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(10000))
+    );
+
+    verifyEncodedOrdering(schema, mutations);
+    verifyEncodedOrdering(schema, "test", keys);
+  }
+
+  /**
+   * Checks that encoded BOOL keys are already in unsigned lexicographic byte order when the
+   * mutations and keys are listed in the order below.
+   */
+  @Test
+  public void boolKeys() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+
+    builder.addColumn("test", "boolkey", "BOOL");
+    builder.addKeyPart("test", "boolkey", false);
+
+    builder.addColumn("test", "boolkeydesc", "BOOL");
+    builder.addKeyPart("test", "boolkeydesc", true);
+
+    SpannerSchema schema = builder.build();
+
+    List<Mutation> mutations = Arrays.asList(
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("boolkey").to(true)
+            .set("boolkeydesc").to(false)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("boolkey").to(false)
+            .set("boolkeydesc").to(false)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("boolkey").to(false)
+            .set("boolkeydesc").to(true)
+            .build(),
+        Mutation.newInsertOrUpdateBuilder("test")
+            .set("boolkey").to((Boolean) null)
+            .set("boolkeydesc").to(false)
+            .build()
+    );
+
+    List<Key> keys = Arrays.asList(
+        // Both key parts are BOOL columns, so the second part must be a boolean as well
+        // (same value as in the first mutation above).
+        Key.of(true, false),
+        Key.of(false, null),
+        Key.of(false, false),
+        Key.of(false, true),
+        Key.of(null, false)
+    );
+
+    verifyEncodedOrdering(schema, mutations);
+    verifyEncodedOrdering(schema, "test", keys);
+  }
+
+  /**
+   * Encodes the key of each mutation and asserts that the encodings, in the given order, are
+   * already sorted by unsigned lexicographic byte comparison.
+   */
+  private void verifyEncodedOrdering(SpannerSchema schema, List<Mutation> mutations) {
+    MutationGroupEncoder keyEncoder = new MutationGroupEncoder(schema);
+
+    List<byte[]> encodedInGivenOrder = new ArrayList<>(mutations.size());
+    for (Mutation mutation : mutations) {
+      encodedInGivenOrder.add(keyEncoder.encodeKey(mutation));
+    }
+
+    // The sorted list holds the very same array instances, so the list comparison below
+    // (which uses byte[]'s identity-based equals) succeeds only if sorting moved nothing.
+    List<byte[]> encodedSorted = new ArrayList<>(encodedInGivenOrder);
+    Collections.sort(encodedSorted, UnsignedBytes.lexicographicalComparator());
+
+    Assert.assertEquals(encodedInGivenOrder, encodedSorted);
+  }
+
+  /**
+   * Encodes each key for the given table and asserts that the encodings, in the given order,
+   * are already sorted by unsigned lexicographic byte comparison.
+   */
+  private void verifyEncodedOrdering(SpannerSchema schema, String table, List<Key> keys) {
+    MutationGroupEncoder keyEncoder = new MutationGroupEncoder(schema);
+
+    List<byte[]> encodedInGivenOrder = new ArrayList<>(keys.size());
+    for (Key key : keys) {
+      encodedInGivenOrder.add(keyEncoder.encodeKey(table, key));
+    }
+
+    // The sorted list holds the very same array instances, so the list comparison below
+    // (which uses byte[]'s identity-based equals) succeeds only if sorting moved nothing.
+    List<byte[]> encodedSorted = new ArrayList<>(encodedInGivenOrder);
+    Collections.sort(encodedSorted, UnsignedBytes.lexicographicalComparator());
+
+    Assert.assertEquals(encodedInGivenOrder, encodedSorted);
+  }
+
+  /** Shorthand for creating a {@link MutationGroup} from one mutation plus any others. */
+  private MutationGroup g(Mutation mutation, Mutation... other) {
+    MutationGroup group = MutationGroup.create(mutation, other);
+    return group;
+  }
+
+  /** Round-trips the group through an encoder built on the all-types schema from setUp. */
+  private void encodeAndVerify(MutationGroup expected) {
+    encodeAndVerify(expected, this.allTypesSchema);
+  }
+
+  /**
+   * Encodes the group with an encoder for the given schema, decodes the resulting bytes with
+   * the same encoder and asserts that the decoded group matches the original according to
+   * {@code mutationGroupsEqual}.
+   */
+  private static void encodeAndVerify(MutationGroup expected, SpannerSchema schema) {
+    MutationGroupEncoder encoder = new MutationGroupEncoder(schema);
+    byte[] bytes = encoder.encode(expected);
+    MutationGroup roundTripped = encoder.decode(bytes);
+
+    boolean same = mutationGroupsEqual(expected, roundTripped);
+    Assert.assertTrue(same);
+  }
+
+  /**
+   * Returns true when both groups contain the same number of mutations and the mutations at
+   * each position are equal according to {@code mutationsEqual}.
+   */
+  private static boolean mutationGroupsEqual(MutationGroup a, MutationGroup b) {
+    List<Mutation> left = ImmutableList.copyOf(a);
+    List<Mutation> right = ImmutableList.copyOf(b);
+
+    // Start from the size check, then stop at the first position that differs.
+    boolean equal = left.size() == right.size();
+    for (int i = 0; equal && i < left.size(); i++) {
+      equal = mutationsEqual(left.get(i), right.get(i));
+    }
+    return equal;
+  }
+
+  // Is different from Mutation#equals. Case insensitive for table/column names, the order of
+  // the columns doesn't matter. Each column is compared together with the value bound to it, so
+  // two mutations that carry the same values on different columns are not considered equal.
+  private static boolean mutationsEqual(Mutation a, Mutation b) {
+    if (a == b) {
+      return true;
+    }
+    if (a == null || b == null) {
+      return false;
+    }
+    if (a.getOperation() != b.getOperation()) {
+      return false;
+    }
+    if (!a.getTable().equalsIgnoreCase(b.getTable())) {
+      return false;
+    }
+    // Deletes carry a key set instead of columns and values.
+    if (a.getOperation() == Mutation.Op.DELETE) {
+      return a.getKeySet().equals(b.getKeySet());
+    }
+
+    return columnValuePairs(a).equals(columnValuePairs(b));
+  }
+
+  // Returns the set of (normalized column name, value) pairs of a write mutation. Relies on
+  // Mutation#getColumns and Mutation#getValues listing their entries in matching order.
+  private static ImmutableSet<List<Object>> columnValuePairs(Mutation m) {
+    List<String> columns = ImmutableList.copyOf(getNormalizedColumns(m));
+    List<?> values = ImmutableList.copyOf(m.getValues());
+
+    ImmutableSet.Builder<List<Object>> pairs = ImmutableSet.builder();
+    for (int i = 0; i < columns.size(); i++) {
+      pairs.add(Arrays.<Object>asList(columns.get(i), values.get(i)));
+    }
+    return pairs.build();
+  }
+
+  // Returns the mutation's column names lower-cased, as a lazily transformed view.
+  // Pray for Java 8 support.
+  private static Iterable<String> getNormalizedColumns(Mutation a) {
+    return Iterables.transform(a.getColumns(), new Function<String, String>() {
+
+      @Override
+      public String apply(String input) {
+        // Locale.ROOT keeps the result independent of the JVM's default locale (for example,
+        // a Turkish default locale lower-cases 'I' to a dotless 'i').
+        return input.toLowerCase(Locale.ROOT);
+      }
+    });
+  }
+}


[2/2] beam git commit: This closes #4014: [BEAM-1542] SpannerIO: mutation encoding and size estimation improvements

Posted by jk...@apache.org.
This closes #4014: [BEAM-1542] SpannerIO: mutation encoding and size estimation improvements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09f68159
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09f68159
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09f68159

Branch: refs/heads/master
Commit: 09f68159df28b2c83a6be1643984c0e28507989b
Parents: aa26f4b 3ba9600
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Oct 30 17:02:42 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Oct 30 17:02:42 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/spanner/MutationGroupEncoder.java    | 660 +++++++++++++++++++
 .../io/gcp/spanner/MutationSizeEstimator.java   |  48 ++
 .../gcp/spanner/MutationGroupEncoderTest.java   | 636 ++++++++++++++++++
 3 files changed, 1344 insertions(+)
----------------------------------------------------------------------