You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:54 UTC

[06/10] incubator-fluo-recipes git commit: Updated package names in core module

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
new file mode 100644
index 0000000..61fec47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.BytesBuilder;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.core.common.Pirtos;
+import org.apache.fluo.recipes.core.common.RowRange;
+import org.apache.fluo.recipes.core.common.TransientRegistry;
+import org.apache.fluo.recipes.core.impl.BucketUtil;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+/**
+ * See the project level documentation for information about this recipe.
+ */
+public class CollisionFreeMap<K, V> {
+
+  private static final String UPDATE_RANGE_END = ":u:~";
+
+  private static final String DATA_RANGE_END = ":d:~";
+
+  private String mapId;
+
+  private Class<K> keyType;
+  private Class<V> valType;
+  private SimpleSerializer serializer;
+  private Combiner<K, V> combiner;
+  UpdateObserver<K, V> updateObserver;
+  private long bufferSize;
+
+  static final Column UPDATE_COL = new Column("u", "v");
+  static final Column NEXT_COL = new Column("u", "next");
+
+  private int numBuckets = -1;
+
+  @SuppressWarnings("unchecked")
+  CollisionFreeMap(Options opts, SimpleSerializer serializer) throws Exception {
+
+    this.mapId = opts.mapId;
+    // TODO defer loading classes
+    // TODO centralize class loading
+    // TODO try to check type params
+    this.numBuckets = opts.numBuckets;
+    this.keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType);
+    this.valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType);
+    this.combiner =
+        (Combiner<K, V>) getClass().getClassLoader().loadClass(opts.combinerType).newInstance();
+    this.serializer = serializer;
+    if (opts.updateObserverType != null) {
+      this.updateObserver =
+          getClass().getClassLoader().loadClass(opts.updateObserverType)
+              .asSubclass(UpdateObserver.class).newInstance();
+    } else {
+      this.updateObserver = new NullUpdateObserver<>();
+    }
+    this.bufferSize = opts.getBufferSize();
+  }
+
+  private V deserVal(Bytes val) {
+    return serializer.deserialize(val.toArray(), valType);
+  }
+
+  private Bytes getKeyFromUpdateRow(Bytes prefix, Bytes row) {
+    return row.subSequence(prefix.length(), row.length() - 8);
+  }
+
+  void process(TransactionBase tx, Bytes ntfyRow, Column col) throws Exception {
+
+    Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
+
+    ScannerConfiguration sc = new ScannerConfiguration();
+
+    if (nextKey != null) {
+      Bytes startRow =
+          Bytes.newBuilder(ntfyRow.length() + nextKey.length()).append(ntfyRow).append(nextKey)
+              .toBytes();
+      Span tmpSpan = Span.prefix(ntfyRow);
+      Span nextSpan =
+          new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(),
+              tmpSpan.isEndInclusive());
+      sc.setSpan(nextSpan);
+    } else {
+      sc.setSpan(Span.prefix(ntfyRow));
+    }
+
+    sc.setSpan(Span.prefix(ntfyRow));
+    sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
+    RowIterator iter = tx.get(sc);
+
+    Map<Bytes, List<Bytes>> updates = new HashMap<>();
+
+    long approxMemUsed = 0;
+
+    Bytes partiallyReadKey = null;
+
+    if (iter.hasNext()) {
+      Bytes lastKey = null;
+      while (iter.hasNext() && approxMemUsed < bufferSize) {
+        Entry<Bytes, ColumnIterator> rowCol = iter.next();
+        Bytes curRow = rowCol.getKey();
+
+        tx.delete(curRow, UPDATE_COL);
+
+        Bytes serializedKey = getKeyFromUpdateRow(ntfyRow, curRow);
+        lastKey = serializedKey;
+
+        List<Bytes> updateList = updates.get(serializedKey);
+        if (updateList == null) {
+          updateList = new ArrayList<>();
+          updates.put(serializedKey, updateList);
+        }
+
+        Bytes val = rowCol.getValue().next().getValue();
+        updateList.add(val);
+
+        approxMemUsed += curRow.length();
+        approxMemUsed += val.length();
+      }
+
+      if (iter.hasNext()) {
+        Entry<Bytes, ColumnIterator> rowCol = iter.next();
+        Bytes curRow = rowCol.getKey();
+
+        // check if more updates for last key
+        if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) {
+          // there are still more updates for this key
+          partiallyReadKey = lastKey;
+
+          // start next time at the current key
+          tx.set(ntfyRow, NEXT_COL, partiallyReadKey);
+        } else {
+          // start next time at the next possible key
+          Bytes nextPossible =
+              Bytes.newBuilder(lastKey.length() + 1).append(lastKey).append(new byte[] {0})
+                  .toBytes();
+          tx.set(ntfyRow, NEXT_COL, nextPossible);
+        }
+
+        // may not read all data because of mem limit, so notify self
+        tx.setWeakNotification(ntfyRow, col);
+      } else if (nextKey != null) {
+        // clear nextKey
+        tx.delete(ntfyRow, NEXT_COL);
+      }
+    } else if (nextKey != null) {
+      tx.delete(ntfyRow, NEXT_COL);
+    }
+
+    byte[] dataPrefix = ntfyRow.toArray();
+    // TODO this is awful... no sanity check... hard to read
+    dataPrefix[Bytes.of(mapId).length() + 1] = 'd';
+
+    BytesBuilder rowBuilder = Bytes.newBuilder();
+    rowBuilder.append(dataPrefix);
+    int rowPrefixLen = rowBuilder.getLength();
+
+    Set<Bytes> keysToFetch = updates.keySet();
+    if (partiallyReadKey != null) {
+      final Bytes prk = partiallyReadKey;
+      keysToFetch = Sets.filter(keysToFetch, b -> !b.equals(prk));
+    }
+    Map<Bytes, Map<Column, Bytes>> currentVals = getCurrentValues(tx, rowBuilder, keysToFetch);
+
+    ArrayList<Update<K, V>> updatesToReport = new ArrayList<>(updates.size());
+
+    for (Entry<Bytes, List<Bytes>> entry : updates.entrySet()) {
+      rowBuilder.setLength(rowPrefixLen);
+      Bytes currentValueRow = rowBuilder.append(entry.getKey()).toBytes();
+      Bytes currVal =
+          currentVals.getOrDefault(currentValueRow, Collections.emptyMap()).get(DATA_COLUMN);
+
+      Iterator<V> ui = Iterators.transform(entry.getValue().iterator(), this::deserVal);
+
+      K kd = serializer.deserialize(entry.getKey().toArray(), keyType);
+
+      if (partiallyReadKey != null && partiallyReadKey.equals(entry.getKey())) {
+        // not all updates were read for this key, so requeue the combined updates as an update
+        Optional<V> nv = combiner.combine(kd, ui);
+        if (nv.isPresent()) {
+          update(tx, Collections.singletonMap(kd, nv.get()));
+        }
+      } else {
+        Optional<V> nv = combiner.combine(kd, concat(ui, currVal));
+        Bytes newVal = nv.isPresent() ? Bytes.of(serializer.serialize(nv.get())) : null;
+        if (newVal != null ^ currVal != null || (currVal != null && !currVal.equals(newVal))) {
+          if (newVal == null) {
+            tx.delete(currentValueRow, DATA_COLUMN);
+          } else {
+            tx.set(currentValueRow, DATA_COLUMN, newVal);
+          }
+
+          Optional<V> cvd = Optional.ofNullable(currVal).map(this::deserVal);
+          updatesToReport.add(new Update<>(kd, cvd, nv));
+        }
+      }
+    }
+
+    // TODO could clear these as converted to objects to avoid double memory usage
+    updates.clear();
+    currentVals.clear();
+
+    if (updatesToReport.size() > 0) {
+      updateObserver.updatingValues(tx, updatesToReport.iterator());
+    }
+  }
+
+  private static final Column DATA_COLUMN = new Column("data", "current");
+
+  private Map<Bytes, Map<Column, Bytes>> getCurrentValues(TransactionBase tx, BytesBuilder prefix,
+      Set<Bytes> keySet) {
+
+    Set<Bytes> rows = new HashSet<>();
+
+    int prefixLen = prefix.getLength();
+    for (Bytes key : keySet) {
+      prefix.setLength(prefixLen);
+      rows.add(prefix.append(key).toBytes());
+    }
+
+    try {
+      return tx.get(rows, Collections.singleton(DATA_COLUMN));
+    } catch (IllegalArgumentException e) {
+      System.out.println(rows.size());
+      throw e;
+    }
+  }
+
+  private Iterator<V> concat(Iterator<V> updates, Bytes currentVal) {
+    if (currentVal == null) {
+      return updates;
+    }
+
+    return Iterators.concat(updates, Iterators.singletonIterator(deserVal(currentVal)));
+  }
+
+  /**
+   * This method will retrieve the current value for key and any outstanding updates and combine
+   * them using the configured {@link Combiner}. The result from the combiner is returned.
+   */
+  public V get(SnapshotBase tx, K key) {
+
+    byte[] k = serializer.serialize(key);
+
+    int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+    String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+
+
+    BytesBuilder rowBuilder = Bytes.newBuilder();
+    rowBuilder.append(mapId).append(":u:").append(bucketId).append(":").append(k);
+
+    ScannerConfiguration sc = new ScannerConfiguration();
+    sc.setSpan(Span.prefix(rowBuilder.toBytes()));
+
+    RowIterator iter = tx.get(sc);
+
+    Iterator<V> ui;
+
+    if (iter.hasNext()) {
+      ui = Iterators.transform(iter, e -> deserVal(e.getValue().next().getValue()));
+    } else {
+      ui = Collections.<V>emptyList().iterator();
+    }
+
+    rowBuilder.setLength(mapId.length());
+    rowBuilder.append(":d:").append(bucketId).append(":").append(k);
+
+    Bytes dataRow = rowBuilder.toBytes();
+
+    Bytes cv = tx.get(dataRow, DATA_COLUMN);
+
+    if (!ui.hasNext()) {
+      if (cv == null) {
+        return null;
+      } else {
+        return deserVal(cv);
+      }
+    }
+
+    return combiner.combine(key, concat(ui, cv)).orElse(null);
+  }
+
+  String getId() {
+    return mapId;
+  }
+
+  /**
+   * Queues updates for a collision free map. These updates will be made by an Observer executing
+   * another transaction. This method will not collide with other transaction queuing updates for
+   * the same keys.
+   *
+   * @param tx This transaction will be used to make the updates.
+   * @param updates The keys in the map should correspond to keys in the collision free map being
+   *        updated. The values in the map will be queued for updating.
+   */
+  public void update(TransactionBase tx, Map<K, V> updates) {
+    Preconditions.checkState(numBuckets > 0, "Not initialized");
+
+    Set<String> buckets = new HashSet<>();
+
+    BytesBuilder rowBuilder = Bytes.newBuilder();
+    rowBuilder.append(mapId).append(":u:");
+    int prefixLength = rowBuilder.getLength();
+
+    byte[] startTs = encSeq(tx.getStartTimestamp());
+
+    for (Entry<K, V> entry : updates.entrySet()) {
+      byte[] k = serializer.serialize(entry.getKey());
+      int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+      String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+
+      // reset to the common row prefix
+      rowBuilder.setLength(prefixLength);
+
+      Bytes row = rowBuilder.append(bucketId).append(":").append(k).append(startTs).toBytes();
+      Bytes val = Bytes.of(serializer.serialize(entry.getValue()));
+
+      // TODO set if not exists would be comforting here.... but
+      // collisions on bucketId+key+uuid should never occur
+      tx.set(row, UPDATE_COL, val);
+
+      buckets.add(bucketId);
+    }
+
+    for (String bucketId : buckets) {
+      rowBuilder.setLength(prefixLength);
+      rowBuilder.append(bucketId).append(":");
+
+      Bytes row = rowBuilder.toBytes();
+
+      tx.setWeakNotification(row, new Column("fluoRecipes", "cfm:" + mapId));
+    }
+  }
+
+
+  public static <K2, V2> CollisionFreeMap<K2, V2> getInstance(String mapId,
+      SimpleConfiguration appConf) {
+    Options opts = new Options(mapId, appConf);
+    try {
+      return new CollisionFreeMap<>(opts, SimpleSerializer.getInstance(appConf));
+    } catch (Exception e) {
+      // TODO
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * A @link {@link CollisionFreeMap} stores data in its own data format in the Fluo table. When
+   * initializing a Fluo table with something like Map Reduce or Spark, data will need to be written
+   * in this format. Thats the purpose of this method, it provide a simple class that can do this
+   * conversion.
+   *
+   */
+  public static <K2, V2> Initializer<K2, V2> getInitializer(String mapId, int numBuckets,
+      SimpleSerializer serializer) {
+    return new Initializer<>(mapId, numBuckets, serializer);
+  }
+
+
+  /**
+   * @see CollisionFreeMap#getInitializer(String, int, SimpleSerializer)
+   */
+  public static class Initializer<K2, V2> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String mapId;
+
+    private SimpleSerializer serializer;
+
+    private int numBuckets = -1;
+
+    private Initializer(String mapId, int numBuckets, SimpleSerializer serializer) {
+      this.mapId = mapId;
+      this.numBuckets = numBuckets;
+      this.serializer = serializer;
+    }
+
+    public RowColumnValue convert(K2 key, V2 val) {
+      byte[] k = serializer.serialize(key);
+      int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+      String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+
+      BytesBuilder bb = Bytes.newBuilder();
+      Bytes row = bb.append(mapId).append(":d:").append(bucketId).append(":").append(k).toBytes();
+      byte[] v = serializer.serialize(val);
+
+      return new RowColumnValue(row, DATA_COLUMN, Bytes.of(v));
+    }
+  }
+
+  public static class Options {
+
+    static final long DEFAULT_BUFFER_SIZE = 1 << 22;
+    static final int DEFAULT_BUCKETS_PER_TABLET = 10;
+
+    int numBuckets;
+    Integer bucketsPerTablet = null;
+
+    Long bufferSize;
+
+    String keyType;
+    String valueType;
+    String combinerType;
+    String updateObserverType;
+    String mapId;
+
+    private static final String PREFIX = "recipes.cfm.";
+
+    Options(String mapId, SimpleConfiguration appConfig) {
+      this.mapId = mapId;
+
+      this.numBuckets = appConfig.getInt(PREFIX + mapId + ".buckets");
+      this.combinerType = appConfig.getString(PREFIX + mapId + ".combiner");
+      this.keyType = appConfig.getString(PREFIX + mapId + ".key");
+      this.valueType = appConfig.getString(PREFIX + mapId + ".val");
+      this.updateObserverType = appConfig.getString(PREFIX + mapId + ".updateObserver", null);
+      this.bufferSize = appConfig.getLong(PREFIX + mapId + ".bufferSize", DEFAULT_BUFFER_SIZE);
+      this.bucketsPerTablet =
+          appConfig.getInt(PREFIX + mapId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
+    }
+
+    public Options(String mapId, String combinerType, String keyType, String valType, int buckets) {
+      Preconditions.checkArgument(buckets > 0);
+      Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
+
+      this.mapId = mapId;
+      this.numBuckets = buckets;
+      this.combinerType = combinerType;
+      this.updateObserverType = null;
+      this.keyType = keyType;
+      this.valueType = valType;
+    }
+
+    public Options(String mapId, String combinerType, String updateObserverType, String keyType,
+        String valueType, int buckets) {
+      Preconditions.checkArgument(buckets > 0);
+      Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
+
+      this.mapId = mapId;
+      this.numBuckets = buckets;
+      this.combinerType = combinerType;
+      this.updateObserverType = updateObserverType;
+      this.keyType = keyType;
+      this.valueType = valueType;
+    }
+
+    /**
+     * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
+     * be used to actually deserialize and process the updates. This limit does not account for
+     * object overhead in java, which can be significant.
+     *
+     * <p>
+     * The way memory read is calculated is by summing the length of serialized key and value byte
+     * arrays. Once this sum exceeds the configured memory limit, no more update key values are
+     * processed in the current transaction. When not everything is processed, the observer
+     * processing updates will notify itself causing another transaction to continue processing
+     * later
+     */
+    public Options setBufferSize(long bufferSize) {
+      Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    long getBufferSize() {
+      if (bufferSize == null) {
+        return DEFAULT_BUFFER_SIZE;
+      }
+
+      return bufferSize;
+    }
+
+    /**
+     * Sets the number of buckets per tablet to generate. This affects how many split points will be
+     * generated when optimizing the Accumulo table.
+     *
+     */
+    public Options setBucketsPerTablet(int bucketsPerTablet) {
+      Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
+          + bucketsPerTablet);
+      this.bucketsPerTablet = bucketsPerTablet;
+      return this;
+    }
+
+    int getBucketsPerTablet() {
+      if (bucketsPerTablet == null) {
+        return DEFAULT_BUCKETS_PER_TABLET;
+      }
+
+      return bucketsPerTablet;
+    }
+
+    public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, Class<K> keyType,
+        Class<V> valueType, int buckets) {
+      this(mapId, combiner.getName(), keyType.getName(), valueType.getName(), buckets);
+    }
+
+    public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner,
+        Class<? extends UpdateObserver<K, V>> updateObserver, Class<K> keyType, Class<V> valueType,
+        int buckets) {
+      this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(), valueType
+          .getName(), buckets);
+    }
+
+    void save(SimpleConfiguration appConfig) {
+      appConfig.setProperty(PREFIX + mapId + ".buckets", numBuckets + "");
+      appConfig.setProperty(PREFIX + mapId + ".combiner", combinerType + "");
+      appConfig.setProperty(PREFIX + mapId + ".key", keyType);
+      appConfig.setProperty(PREFIX + mapId + ".val", valueType);
+      if (updateObserverType != null) {
+        appConfig.setProperty(PREFIX + mapId + ".updateObserver", updateObserverType + "");
+      }
+      if (bufferSize != null) {
+        appConfig.setProperty(PREFIX + mapId + ".bufferSize", bufferSize);
+      }
+      if (bucketsPerTablet != null) {
+        appConfig.setProperty(PREFIX + mapId + ".bucketsPerTablet", bucketsPerTablet);
+      }
+    }
+  }
+
+  /**
+   * This method configures a collision free map for use. It must be called before initializing
+   * Fluo.
+   */
+  public static void configure(FluoConfiguration fluoConfig, Options opts) {
+    opts.save(fluoConfig.getAppConfiguration());
+    fluoConfig.addObserver(new ObserverConfiguration(CollisionFreeMapObserver.class.getName())
+        .setParameters(ImmutableMap.of("mapId", opts.mapId)));
+
+    Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
+    Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
+
+    new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("cfm." + opts.mapId,
+        new RowRange(dataRangeEnd, updateRangeEnd));
+  }
+
+  /**
+   * Return suggested Fluo table optimizations for all previously configured collision free maps.
+   *
+   * @param appConfig Must pass in the application configuration obtained from
+   *        {@code FluoClient.getAppConfiguration()} or
+   *        {@code FluoConfiguration.getAppConfiguration()}
+   */
+  public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
+    HashSet<String> mapIds = new HashSet<>();
+    appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
+        k -> mapIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
+
+    Pirtos pirtos = new Pirtos();
+    mapIds.forEach(mid -> pirtos.merge(getTableOptimizations(mid, appConfig)));
+
+    return pirtos;
+  }
+
+  /**
+   * Return suggested Fluo table optimizations for the specified collisiong free map.
+   *
+   * @param appConfig Must pass in the application configuration obtained from
+   *        {@code FluoClient.getAppConfiguration()} or
+   *        {@code FluoConfiguration.getAppConfiguration()}
+   */
+  public static Pirtos getTableOptimizations(String mapId, SimpleConfiguration appConfig) {
+    Options opts = new Options(mapId, appConfig);
+
+    BytesBuilder rowBuilder = Bytes.newBuilder();
+    rowBuilder.append(mapId);
+
+    List<Bytes> dataSplits = new ArrayList<>();
+    for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+      String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+      rowBuilder.setLength(mapId.length());
+      dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes());
+    }
+    Collections.sort(dataSplits);
+
+    List<Bytes> updateSplits = new ArrayList<>();
+    for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+      String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+      rowBuilder.setLength(mapId.length());
+      updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes());
+    }
+    Collections.sort(updateSplits);
+
+    Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
+    Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
+
+    List<Bytes> splits = new ArrayList<>();
+    splits.add(dataRangeEnd);
+    splits.add(updateRangeEnd);
+    splits.addAll(dataSplits);
+    splits.addAll(updateSplits);
+
+    Pirtos pirtos = new Pirtos();
+    pirtos.setSplits(splits);
+
+    pirtos.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:");
+
+    return pirtos;
+  }
+
+  private static byte[] encSeq(long l) {
+    byte[] ret = new byte[8];
+    ret[0] = (byte) (l >>> 56);
+    ret[1] = (byte) (l >>> 48);
+    ret[2] = (byte) (l >>> 40);
+    ret[3] = (byte) (l >>> 32);
+    ret[4] = (byte) (l >>> 24);
+    ret[5] = (byte) (l >>> 16);
+    ret[6] = (byte) (l >>> 8);
+    ret[7] = (byte) (l >>> 0);
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMapObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMapObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMapObserver.java
new file mode 100644
index 0000000..96890af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMapObserver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+
+/**
+ * This class is configured for use by CollisionFreeMap.configure(FluoConfiguration,
+ * CollisionFreeMap.Options) . This class should never have to be used directly.
+ */
+
+public class CollisionFreeMapObserver extends AbstractObserver {
+
+  @SuppressWarnings("rawtypes")
+  private CollisionFreeMap cfm;
+  private String mapId;
+
+  public CollisionFreeMapObserver() {}
+
+  @Override
+  public void init(Context context) throws Exception {
+    this.mapId = context.getParameters().get("mapId");
+    cfm = CollisionFreeMap.getInstance(mapId, context.getAppConfiguration());
+    cfm.updateObserver.init(mapId, context);
+  }
+
+  @Override
+  public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+    cfm.process(tx, row, col);
+  }
+
+  @Override
+  public ObservedColumn getObservedColumn() {
+    // TODO constants
+    return new ObservedColumn(new Column("fluoRecipes", "cfm:" + mapId), NotificationType.WEAK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Combiner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Combiner.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Combiner.java
new file mode 100644
index 0000000..c9d468b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Combiner.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+public interface Combiner<K, V> {
+  /**
+   * This function is called to combine the current value of a key with updates that were queued for
+   * the key. See the collision free map project level documentation for more information.
+   *
+   * @return Then new value for the key. Returning Optional.absent() will cause the key to be
+   *         deleted.
+   */
+
+  Optional<V> combine(K key, Iterator<V> updates);
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/NullUpdateObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/NullUpdateObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/NullUpdateObserver.java
new file mode 100644
index 0000000..7bbfec1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/NullUpdateObserver.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+
+import org.apache.fluo.api.client.TransactionBase;
+
+class NullUpdateObserver<K, V> extends UpdateObserver<K, V> {
+  @Override
+  public void updatingValues(TransactionBase tx, Iterator<Update<K, V>> updates) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java
new file mode 100644
index 0000000..10e718e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Optional;
+
+public class Update<K, V> {
+
+  private final K key;
+  private final Optional<V> oldValue;
+  private final Optional<V> newValue;
+
+  Update(K key, Optional<V> oldValue, Optional<V> newValue) {
+    this.key = key;
+    this.oldValue = oldValue;
+    this.newValue = newValue;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public Optional<V> getNewValue() {
+    return newValue;
+  }
+
+  public Optional<V> getOldValue() {
+    return oldValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/UpdateObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/UpdateObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/UpdateObserver.java
new file mode 100644
index 0000000..2e48451
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/UpdateObserver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * A {@link CollisionFreeMap} calls this to allow additional processing to be done when key values
+ * are updated. See the project level documentation for more information.
+ */
+
+public abstract class UpdateObserver<K, V> {
+  public void init(String mapId, Context observerContext) throws Exception {}
+
+  public abstract void updatingValues(TransactionBase tx, Iterator<Update<K, V>> updates);
+
+  // TODO add close
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java
new file mode 100644
index 0000000..589b9cc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.serialization;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+
+public interface SimpleSerializer {
+
+  /**
+   * Called immediately after construction and passed Fluo application configuration.
+   */
+  public void init(SimpleConfiguration appConfig);
+
+  // TODO refactor to support reuse of objects and byte arrays???
+  public <T> byte[] serialize(T obj);
+
+  public <T> T deserialize(byte[] serObj, Class<T> clazz);
+
+  public static void setSetserlializer(FluoConfiguration fluoConfig,
+      Class<? extends SimpleSerializer> serializerType) {
+    setSetserlializer(fluoConfig, serializerType.getName());
+  }
+
+  public static void setSetserlializer(FluoConfiguration fluoConfig, String serializerType) {
+    fluoConfig.getAppConfiguration().setProperty("recipes.serializer", serializerType);
+  }
+
+  public static SimpleSerializer getInstance(SimpleConfiguration appConfig) {
+    String serType =
+        appConfig.getString("recipes.serializer",
+            "org.apache.fluo.recipes.kryo.KryoSimplerSerializer");
+    try {
+      SimpleSerializer simplerSer =
+          SimpleSerializer.class.getClassLoader().loadClass(serType)
+              .asSubclass(SimpleSerializer.class).newInstance();
+      simplerSer.init(appConfig);
+      return simplerSer;
+    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java
new file mode 100644
index 0000000..fa25b63
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * Logs an operation (i.e GET, SET, or DELETE) in a Transaction. Multiple LogEntry objects make up a
+ * {@link TxLog}.
+ */
+public class LogEntry {
+
+  public enum Operation {
+    GET, SET, DELETE
+  }
+
+  private Operation op;
+  private Bytes row;
+  private Column col;
+  private Bytes value;
+
+  private LogEntry() {}
+
+  private LogEntry(Operation op, Bytes row, Column col, Bytes value) {
+    Objects.requireNonNull(op);
+    Objects.requireNonNull(row);
+    Objects.requireNonNull(col);
+    Objects.requireNonNull(value);
+    this.op = op;
+    this.row = row;
+    this.col = col;
+    this.value = value;
+  }
+
+  public Operation getOp() {
+    return op;
+  }
+
+  public Bytes getRow() {
+    return row;
+  }
+
+  public Column getColumn() {
+    return col;
+  }
+
+  public Bytes getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof LogEntry) {
+      LogEntry other = (LogEntry) o;
+      return ((op == other.op) && row.equals(other.row) && col.equals(other.col) && value
+          .equals(other.value));
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = op.hashCode();
+    result = 31 * result + row.hashCode();
+    result = 31 * result + col.hashCode();
+    result = 31 * result + value.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "LogEntry{op=" + op + ", row=" + row + ", col=" + col + ", value=" + value + "}";
+  }
+
+  public static LogEntry newGet(String row, Column col, String value) {
+    return newGet(Bytes.of(row), col, Bytes.of(value));
+  }
+
+  public static LogEntry newGet(Bytes row, Column col, Bytes value) {
+    return new LogEntry(Operation.GET, row, col, value);
+  }
+
+  public static LogEntry newSet(String row, Column col, String value) {
+    return newSet(Bytes.of(row), col, Bytes.of(value));
+  }
+
+  public static LogEntry newSet(Bytes row, Column col, Bytes value) {
+    return new LogEntry(Operation.SET, row, col, value);
+  }
+
+  public static LogEntry newDelete(String row, Column col) {
+    return newDelete(Bytes.of(row), col);
+  }
+
+  public static LogEntry newDelete(Bytes row, Column col) {
+    return new LogEntry(Operation.DELETE, row, col, Bytes.EMPTY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransaction.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransaction.java
new file mode 100644
index 0000000..8fb98d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransaction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.function.Predicate;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+/**
+ * An implementation of {@link Transaction} that logs all transactions operations (GET, SET, or
+ * DELETE) in a {@link TxLog} that can be used for exports
+ */
+public class RecordingTransaction extends RecordingTransactionBase implements Transaction {
+
+  private final Transaction tx;
+
+  private RecordingTransaction(Transaction tx) {
+    super(tx);
+    this.tx = tx;
+  }
+
+  private RecordingTransaction(Transaction tx, Predicate<LogEntry> filter) {
+    super(tx, filter);
+    this.tx = tx;
+  }
+
+  @Override
+  public void commit() throws CommitException {
+    tx.commit();
+  }
+
+  @Override
+  public void close() {
+    tx.close();
+  }
+
+  /**
+   * Creates a RecordingTransaction by wrapping an existing Transaction
+   */
+  public static RecordingTransaction wrap(Transaction tx) {
+    return new RecordingTransaction(tx);
+  }
+
+  /**
+   * Creates a RecordingTransaction using the provided LogEntry filter and existing Transaction
+   */
+  public static RecordingTransaction wrap(Transaction tx, Predicate<LogEntry> filter) {
+    return new RecordingTransaction(tx, filter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
new file mode 100644
index 0000000..6303122
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+
+/**
+ * An implementation of {@link TransactionBase} that logs all transactions operations (GET, SET, or
+ * DELETE) in a {@link TxLog} that can be used for exports
+ */
+public class RecordingTransactionBase implements TransactionBase {
+
+  private final TransactionBase txb;
+  private final TxLog txLog = new TxLog();
+  private final Predicate<LogEntry> filter;
+
+  RecordingTransactionBase(TransactionBase txb, Predicate<LogEntry> filter) {
+    this.txb = txb;
+    this.filter = filter;
+  }
+
+  RecordingTransactionBase(TransactionBase txb) {
+    this(txb, le -> true);
+  }
+
+  @Override
+  public void setWeakNotification(Bytes row, Column col) {
+    txb.setWeakNotification(row, col);
+  }
+
+  @Override
+  public void setWeakNotification(String row, Column col) {
+    txb.setWeakNotification(row, col);
+  }
+
+  @Override
+  public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
+    txLog.filteredAdd(LogEntry.newSet(row, col, value), filter);
+    txb.set(row, col, value);
+  }
+
+  @Override
+  public void set(String row, Column col, String value) throws AlreadySetException {
+    txLog.filteredAdd(LogEntry.newSet(row, col, value), filter);
+    txb.set(row, col, value);
+  }
+
+  @Override
+  public void delete(Bytes row, Column col) {
+    txLog.filteredAdd(LogEntry.newDelete(row, col), filter);
+    txb.delete(row, col);
+  }
+
+  @Override
+  public void delete(String row, Column col) {
+    txLog.filteredAdd(LogEntry.newDelete(row, col), filter);
+    txb.delete(row, col);
+  }
+
+  /**
+   * Logs GETs for returned Row/Columns. Requests that return no data will not be logged.
+   */
+  @Override
+  public Bytes get(Bytes row, Column col) {
+    Bytes val = txb.get(row, col);
+    if (val != null) {
+      txLog.filteredAdd(LogEntry.newGet(row, col, val), filter);
+    }
+    return val;
+  }
+
+  /**
+   * Logs GETs for returned Row/Columns. Requests that return no data will not be logged.
+   */
+  @Override
+  public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+    Map<Column, Bytes> colVal = txb.get(row, columns);
+    for (Map.Entry<Column, Bytes> entry : colVal.entrySet()) {
+      txLog.filteredAdd(LogEntry.newGet(row, entry.getKey(), entry.getValue()), filter);
+    }
+    return colVal;
+  }
+
+  /**
+   * Logs GETs for returned Row/Columns. Requests that return no data will not be logged.
+   */
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+    Map<Bytes, Map<Column, Bytes>> rowColVal = txb.get(rows, columns);
+    for (Map.Entry<Bytes, Map<Column, Bytes>> rowEntry : rowColVal.entrySet()) {
+      for (Map.Entry<Column, Bytes> colEntry : rowEntry.getValue().entrySet()) {
+        txLog.filteredAdd(
+            LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()), filter);
+      }
+    }
+    return rowColVal;
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+    Map<Bytes, Map<Column, Bytes>> rowColVal = txb.get(rowColumns);
+    for (Map.Entry<Bytes, Map<Column, Bytes>> rowEntry : rowColVal.entrySet()) {
+      for (Map.Entry<Column, Bytes> colEntry : rowEntry.getValue().entrySet()) {
+        txLog.filteredAdd(
+            LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()), filter);
+      }
+    }
+    return rowColVal;
+  }
+
+  /**
+   * Logs GETs for Row/Columns returned by iterators. Requests that return no data will not be
+   * logged.
+   */
+  @Override
+  public RowIterator get(ScannerConfiguration config) {
+    final RowIterator rowIter = txb.get(config);
+    if (rowIter != null) {
+      return new RowIterator() {
+
+        @Override
+        public boolean hasNext() {
+          return rowIter.hasNext();
+        }
+
+        @Override
+        public Map.Entry<Bytes, ColumnIterator> next() {
+          final Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
+          if ((rowEntry != null) && (rowEntry.getValue() != null)) {
+            final ColumnIterator colIter = rowEntry.getValue();
+            return new AbstractMap.SimpleEntry<>(rowEntry.getKey(), new ColumnIterator() {
+
+              @Override
+              public boolean hasNext() {
+                return colIter.hasNext();
+              }
+
+              @Override
+              public Map.Entry<Column, Bytes> next() {
+                Map.Entry<Column, Bytes> colEntry = colIter.next();
+                if (colEntry != null) {
+                  txLog.filteredAdd(
+                      LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()),
+                      filter);
+                }
+                return colEntry;
+              }
+            });
+          }
+          return rowEntry;
+        }
+      };
+    }
+    return rowIter;
+  }
+
+  @Override
+  public long getStartTimestamp() {
+    return txb.getStartTimestamp();
+  }
+
+  public TxLog getTxLog() {
+    return txLog;
+  }
+
+  /**
+   * Creates a RecordingTransactionBase by wrapping an existing TransactionBase
+   */
+  public static RecordingTransactionBase wrap(TransactionBase txb) {
+    return new RecordingTransactionBase(txb);
+  }
+
+  /**
+   * Creates a RecordingTransactionBase using the provided LogEntry filter function and existing
+   * TransactionBase
+   */
+  public static RecordingTransactionBase wrap(TransactionBase txb, Predicate<LogEntry> filter) {
+    return new RecordingTransactionBase(txb, filter);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+    Map<String, Map<Column, String>> rowColVal = txb.gets(rowColumns);
+    for (Map.Entry<String, Map<Column, String>> rowEntry : rowColVal.entrySet()) {
+      for (Map.Entry<Column, String> colEntry : rowEntry.getValue().entrySet()) {
+        txLog.filteredAdd(
+            LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()), filter);
+      }
+    }
+    return rowColVal;
+  }
+
+  // TODO alot of these String methods may be more efficient if called the Byte version in this
+  // class... this would avoid conversion from Byte->String->Byte
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+    Map<String, Map<Column, String>> rowColVal = txb.gets(rows, columns);
+    for (Map.Entry<String, Map<Column, String>> rowEntry : rowColVal.entrySet()) {
+      for (Map.Entry<Column, String> colEntry : rowEntry.getValue().entrySet()) {
+        txLog.filteredAdd(
+            LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()), filter);
+      }
+    }
+    return rowColVal;
+  }
+
+  @Override
+  public String gets(String row, Column col) {
+    String val = txb.gets(row, col);
+    if (val != null) {
+      txLog.filteredAdd(LogEntry.newGet(row, col, val), filter);
+    }
+    return val;
+  }
+
+  @Override
+  public Map<Column, String> gets(String row, Set<Column> columns) {
+    Map<Column, String> colVal = txb.gets(row, columns);
+    for (Map.Entry<Column, String> entry : colVal.entrySet()) {
+      txLog.filteredAdd(LogEntry.newGet(row, entry.getKey(), entry.getValue()), filter);
+    }
+    return colVal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/TxLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/TxLog.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/TxLog.java
new file mode 100644
index 0000000..caa3c4d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/TxLog.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.RowColumn;
+
+/**
+ * Contains list of operations (GET, SET, DELETE) performed during a {@link RecordingTransaction}
+ */
+public class TxLog {
+
+  private List<LogEntry> logEntries = new ArrayList<>();
+
+  public TxLog() {}
+
+  /**
+   * Adds LogEntry to TxLog
+   */
+  public void add(LogEntry entry) {
+    logEntries.add(entry);
+  }
+
+  /**
+   * Adds LogEntry to TxLog if it passes filter
+   */
+  public void filteredAdd(LogEntry entry, Predicate<LogEntry> filter) {
+    if (filter.test(entry)) {
+      add(entry);
+    }
+  }
+
+  /**
+   * Returns all LogEntry in TxLog
+   */
+  public List<LogEntry> getLogEntries() {
+    return Collections.unmodifiableList(logEntries);
+  }
+
+  /**
+   * Returns true if TxLog is empty
+   */
+  public boolean isEmpty() {
+    return logEntries.isEmpty();
+  }
+
+  /**
+   * Returns a map of RowColumn changes given an operation
+   */
+  public Map<RowColumn, Bytes> getOperationMap(LogEntry.Operation op) {
+    Map<RowColumn, Bytes> opMap = new HashMap<>();
+    for (LogEntry entry : logEntries) {
+      if (entry.getOp().equals(op)) {
+        opMap.put(new RowColumn(entry.getRow(), entry.getColumn()), entry.getValue());
+      }
+    }
+    return opMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/Encoder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/Encoder.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/Encoder.java
new file mode 100644
index 0000000..f437b16
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/Encoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Transforms Java primitives to and from bytes using desired encoding
+ *
+ * @since 1.0.0
+ */
+public interface Encoder {
+
+  /**
+   * Encodes an integer to {@link Bytes}
+   */
+  Bytes encode(int i);
+
+  /**
+   * Encodes a long to {@link Bytes}
+   */
+  Bytes encode(long l);
+
+  /**
+   * Encodes a String to {@link Bytes}
+   */
+  Bytes encode(String s);
+
+  /**
+   * Encodes a float to {@link Bytes}
+   */
+  Bytes encode(float f);
+
+  /**
+   * Encodes a double to {@link Bytes}
+   */
+  Bytes encode(double d);
+
+  /**
+   * Encodes a boolean to {@link Bytes}
+   */
+  Bytes encode(boolean b);
+
+  /**
+   * Decodes an integer from {@link Bytes}
+   */
+  int decodeInteger(Bytes b);
+
+  /**
+   * Decodes a long from {@link Bytes}
+   */
+  long decodeLong(Bytes b);
+
+  /**
+   * Decodes a String from {@link Bytes}
+   */
+  String decodeString(Bytes b);
+
+  /**
+   * Decodes a float from {@link Bytes}
+   */
+  float decodeFloat(Bytes b);
+
+  /**
+   * Decodes a double from {@link Bytes}
+   */
+  double decodeDouble(Bytes b);
+
+  /**
+   * Decodes a boolean from {@link Bytes}
+   */
+  boolean decodeBoolean(Bytes b);
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/StringEncoder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/StringEncoder.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/StringEncoder.java
new file mode 100644
index 0000000..939f97a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/StringEncoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Transforms Java primitives to and from bytes using a String encoding
+ *
+ * @since 1.0.0
+ */
+public class StringEncoder implements Encoder {
+
+  @Override
+  public Bytes encode(int i) {
+    return encode(Integer.toString(i));
+  }
+
+  @Override
+  public Bytes encode(long l) {
+    return encode(Long.toString(l));
+  }
+
+  @Override
+  public Bytes encode(String s) {
+    return Bytes.of(s);
+  }
+
+  @Override
+  public Bytes encode(float f) {
+    return encode(Float.toString(f));
+  }
+
+  @Override
+  public Bytes encode(double d) {
+    return encode(Double.toString(d));
+  }
+
+  @Override
+  public Bytes encode(boolean b) {
+    return encode(Boolean.toString(b));
+  }
+
+  @Override
+  public int decodeInteger(Bytes b) {
+    return Integer.parseInt(decodeString(b));
+  }
+
+  @Override
+  public long decodeLong(Bytes b) {
+    return Long.parseLong(decodeString(b));
+  }
+
+  @Override
+  public String decodeString(Bytes b) {
+    return b.toString();
+  }
+
+  @Override
+  public float decodeFloat(Bytes b) {
+    return Float.parseFloat(decodeString(b));
+  }
+
+  @Override
+  public double decodeDouble(Bytes b) {
+    return Double.parseDouble(decodeString(b));
+  }
+
+  @Override
+  public boolean decodeBoolean(Bytes b) {
+    return Boolean.parseBoolean(decodeString(b));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypeLayer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypeLayer.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypeLayer.java
new file mode 100644
index 0000000..82639f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypeLayer.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.nio.ByteBuffer;
+
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * A simple convenience layer for Fluo. This layer attempts to make the following common operations
+ * easier.
+ * 
+ * <UL>
+ * <LI>Working with different types.
+ * <LI>Supplying default values
+ * <LI>Dealing with null return types.
+ * <LI>Working with row/column and column maps
+ * </UL>
+ * 
+ * <p>
+ * This layer was intentionally loosely coupled with the basic API. This allows other convenience
+ * layers for Fluo to build directly on the basic API w/o having to consider the particulars of this
+ * layer. Also its expected that integration with other languages may only use the basic API.
+ * </p>
+ * 
+ * <h3>Using</h3>
+ * 
+ * <p>
+ * A TypeLayer is created with a certain encoder that is used for converting from bytes to
+ * primitives and visa versa. In order to ensure that all of your code uses the same encoder, its
+ * probably best to centralize the choice of an encoder within your project. There are many ways do
+ * to this, below is an example of one way to centralize and use.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   public class MyTypeLayer extends TypeLayer {
+ *     public MyTypeLayer() {
+ *       super(new MyEncoder());
+ *     }
+ *   }
+ *   
+ *   public class MyObserver extends TypedObserver {
+ *     MyObserver(){
+ *       super(new MyTypeLayer());
+ *     }
+ *     
+ *     public abstract void process(TypedTransaction tx, Bytes row, Column col){
+ *       //do something w/ typed transaction
+ *     }
+ *   }
+ *   
+ *   public class MyUtil {
+ *      //A little util to print out some stuff
+ *      public void printStuff(Snapshot snap, byte[] row){
+ *        TypedSnapshot tsnap = new MytTypeLayer().wrap(snap);
+ *        
+ *        System.out.println(tsnap.get().row(row).fam("b90000").qual(137).toString("NP"));
+ *      } 
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <h3>Working with different types</h3>
+ * 
+ * <p>
+ * The following example code shows using the basic fluo API with different types.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void process(Transaction tx, byte[] row, byte[] cf, int cq, long val){
+ *     tx.set(Bytes.of(row), new Column(Bytes.of(cf), Bytes.of(Integer.toString(cq))),
+ *        Bytes.of(Long.toString(val));
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <p>
+ * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
+ * following way. Because row(), fam(), qual(), and set() each take many different types, this
+ * enables many different permutations that would not be achievable with overloading.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void process(TypedTransaction tx, byte[] r, byte[] cf, int cq, long v){
+ *     tx.mutate().row(r).fam(cf).qual(cq).set(v);
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <h3>Default values</h3>
+ * 
+ * <p>
+ * The following example code shows using the basic fluo API to read a value and default to zero if
+ * it does not exist.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void add(Transaction tx, byte[] row, Column col, long amount){
+ *     
+ *     long balance = 0;
+ *     Bytes bval = tx.get(Bytes.of(row), col);
+ *     if(bval != null)
+ *       balance = Long.parseLong(bval.toString());
+ *     
+ *     balance += amount;
+ *     
+ *     tx.set(Bytes.of(row), col, Bytes.of(Long.toString(amount)));
+ *     
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <p>
+ * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
+ * following way. This code avoids the null check by supplying a default value of zero.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void add(TypedTransaction tx, byte[] r, Column c, long amount){
+ *     long balance = tx.get().row(r).col(c).toLong(0);
+ *     balance += amount;
+ *     tx.mutate().row(r).col(c).set(balance);
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <p>
+ * For this particular case, shorter code can be written by using the increment method.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void add(TypedTransaction tx, byte[] r, Column c, long amount){
+ *     tx.mutate().row(r).col(c).increment(amount);
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <h3>Null return types</h3>
+ * 
+ * <p>
+ * When using the basic API, you must ensure the return type is not null before converting a string
+ * or long.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void process(Transaction tx, byte[] row, Column col, long amount) {
+ *     Bytes val =  tx.get(Bytes.of(row), col);
+ *     if(val == null)
+ *       return;   
+ *     long balance = Long.parseLong(val.toString());
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <p>
+ * With {@link TypedTransactionBase} if no default value is supplied, then the null is passed
+ * through.
+ * </p>
+ * 
+ * <pre>
+ * <code>
+ * 
+ *   void process(TypedTransaction tx, byte[] r, Column c, long amount){
+ *     Long balance =  tx.get().row(r).col(c).toLong();
+ *     if(balance == null)
+ *       return;   
+ *   }
+ * </code>
+ * </pre>
+ * 
+ * <h3>Defaulted maps</h3>
+ * 
+ * <p>
+ * The operations that return maps, return defaulted maps which make it easy to specify defaults and
+ * avoid null.
+ * </p>
+ * 
+ * <pre>
+ * {@code
+ *   // pretend this method has curly braces.  javadoc has issues with less than.
+ * 
+ *   void process(TypedTransaction tx, byte[] r, Column c1, Column c2, Column c3, long amount)
+ * 
+ *     Map<Column, Value> columns = tx.get().row(r).columns(c1,c2,c3);
+ *     
+ *     // If c1 does not exist in map, a Value that wraps null will be returned.
+ *     // When c1 does not exist val1 will be set to null and no NPE will be thrown.
+ *     String val1 = columns.get(c1).toString();
+ *     
+ *     // If c2 does not exist in map, then val2 will be set to empty string.
+ *     String val2 = columns.get(c2).toString("");
+ *     
+ *     // If c3 does not exist in map, then val9 will be set to 9.
+ *     Long val3 = columns.get(c3).toLong(9);
+ * }
+ * </pre>
+ * 
+ * <p>
+ * This also applies to getting sets of rows.
+ * </p>
+ * 
+ * <pre>
+ * {@code
+ *   // pretend this method has curly braces.  javadoc has issues with less than.
+ * 
+ *   void process(TypedTransaction tx, List<String> rows, Column c1, Column c2, Column c3,
+ *     long amount)
+ * 
+ *     Map<String,Map<Column,Value>> rowCols =
+ *        tx.get().rowsString(rows).columns(c1,c2,c3).toStringMap();
+ *     
+ *     // this will set val1 to null if row does not exist in map and/or column does not
+ *     // exist in child map
+ *     String val1 = rowCols.get("row1").get(c1).toString();
+ * }
+ * </pre>
+ *
+ * @since 1.0.0
+ */
+public class TypeLayer {
+
+  private Encoder encoder;
+
+  static class Data {
+    Bytes row;
+    Bytes family;
+    Bytes qual;
+    Bytes vis;
+
+    Column getCol() {
+      if (qual == null) {
+        return new Column(family);
+      } else if (vis == null) {
+        return new Column(family, qual);
+      } else {
+        return new Column(family, qual, vis);
+      }
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public abstract class RowMethods<R> {
+
+    abstract R create(Data data);
+
+    public R row(String row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(int row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(long row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(byte[] row) {
+      return row(Bytes.of(row));
+    }
+
+    public R row(ByteBuffer row) {
+      return row(Bytes.of(row));
+    }
+
+    public R row(Bytes row) {
+      Data data = new Data();
+      data.row = row;
+      R result = create(data);
+      return result;
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public abstract class SimpleFamilyMethods<R1> {
+
+    Data data;
+
+    SimpleFamilyMethods(Data data) {
+      this.data = data;
+    }
+
+    abstract R1 create1(Data data);
+
+    public R1 fam(String family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(int family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(long family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(byte[] family) {
+      return fam(Bytes.of(family));
+    }
+
+    public R1 fam(ByteBuffer family) {
+      return fam(Bytes.of(family));
+    }
+
+    public R1 fam(Bytes family) {
+      data.family = family;
+      return create1(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public abstract class FamilyMethods<R1, R2> extends SimpleFamilyMethods<R1> {
+
+    FamilyMethods(Data data) {
+      super(data);
+    }
+
+    abstract R2 create2(Data data);
+
+    public R2 col(Column col) {
+      data.family = col.getFamily();
+      data.qual = col.getQualifier();
+      data.vis = col.getVisibility();
+      return create2(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public abstract class QualifierMethods<R> {
+
+    private Data data;
+
+    QualifierMethods(Data data) {
+      this.data = data;
+    }
+
+    abstract R create(Data data);
+
+    public R qual(String qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(int qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(long qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(byte[] qualifier) {
+      return qual(Bytes.of(qualifier));
+    }
+
+    public R qual(ByteBuffer qualifier) {
+      return qual(Bytes.of(qualifier));
+    }
+
+    public R qual(Bytes qualifier) {
+      data.qual = qualifier;
+      return create(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public static class VisibilityMethods {
+
+    private Data data;
+
+    VisibilityMethods(Data data) {
+      this.data = data;
+    }
+
+    public Column vis() {
+      return new Column(data.family, data.qual);
+    }
+
+    public Column vis(String cv) {
+      return vis(Bytes.of(cv));
+    }
+
+    public Column vis(Bytes cv) {
+      return new Column(data.family, data.qual, cv);
+    }
+
+    public Column vis(ByteBuffer cv) {
+      return vis(Bytes.of(cv));
+    }
+
+    public Column vis(byte[] cv) {
+      return vis(Bytes.of(cv));
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class CQB extends QualifierMethods<VisibilityMethods> {
+    CQB(Data data) {
+      super(data);
+    }
+
+    @Override
+    VisibilityMethods create(Data data) {
+      return new VisibilityMethods(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class CFB extends SimpleFamilyMethods<CQB> {
+    CFB() {
+      super(new Data());
+    }
+
+    @Override
+    CQB create1(Data data) {
+      return new CQB(data);
+    }
+  }
+
+  public TypeLayer(Encoder encoder) {
+    this.encoder = encoder;
+  }
+
+  /**
+   * Initiates the chain of calls needed to build a column.
+   * 
+   * @return a column builder
+   */
+  public CFB bc() {
+    return new CFB();
+  }
+
+  public TypedSnapshot wrap(Snapshot snap) {
+    return new TypedSnapshot(snap, encoder, this);
+  }
+
+  public TypedTransactionBase wrap(TransactionBase tx) {
+    return new TypedTransactionBase(tx, encoder, this);
+  }
+
+  public TypedTransaction wrap(Transaction tx) {
+    return new TypedTransaction(tx, encoder, this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedLoader.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedLoader.java
new file mode 100644
index 0000000..0e14062
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedLoader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.TransactionBase;
+
+/**
+ * A {@link Loader} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public abstract class TypedLoader implements Loader {
+
+  private final TypeLayer tl;
+
+  public TypedLoader() {
+    tl = new TypeLayer(new StringEncoder());
+  }
+
+  public TypedLoader(TypeLayer tl) {
+    this.tl = tl;
+  }
+
+  @Override
+  public void load(TransactionBase tx, Context context) throws Exception {
+    load(tl.wrap(tx), context);
+  }
+
+  public abstract void load(TypedTransactionBase tx, Context context) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedObserver.java
new file mode 100644
index 0000000..ca68285
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedObserver.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+
+/**
+ * An {@link AbstractObserver} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public abstract class TypedObserver extends AbstractObserver {
+
+  private final TypeLayer tl;
+
+  public TypedObserver() {
+    tl = new TypeLayer(new StringEncoder());
+  }
+
+  public TypedObserver(TypeLayer tl) {
+    this.tl = tl;
+  }
+
+  @Override
+  public void process(TransactionBase tx, Bytes row, Column col) {
+    process(tl.wrap(tx), row, col);
+  }
+
+  public abstract void process(TypedTransactionBase tx, Bytes row, Column col);
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshot.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshot.java
new file mode 100644
index 0000000..6c09764
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshot.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.Snapshot;
+
+/**
+ * A {@link Snapshot} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedSnapshot extends TypedSnapshotBase implements Snapshot {
+
+  private final Snapshot closeSnapshot;
+
+  TypedSnapshot(Snapshot snapshot, Encoder encoder, TypeLayer tl) {
+    super(snapshot, encoder, tl);
+    closeSnapshot = snapshot;
+  }
+
+  @Override
+  public void close() {
+    closeSnapshot.close();
+  }
+}