You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:53 UTC

[05/10] incubator-fluo-recipes git commit: Updated package names in core module

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
new file mode 100644
index 0000000..7764e67
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.DefaultedMap;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.core.types.TypeLayer.Data;
+import org.apache.fluo.recipes.core.types.TypeLayer.FamilyMethods;
+import org.apache.fluo.recipes.core.types.TypeLayer.QualifierMethods;
+import org.apache.fluo.recipes.core.types.TypeLayer.RowMethods;
+
+// TODO need to refactor column to use Encoder
+
+/**
+ * A {@link SnapshotBase} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedSnapshotBase implements SnapshotBase {
+
+  private SnapshotBase snapshot;
+  private Encoder encoder;
+  private TypeLayer tl;
+
+  /**
+   * @since 1.0.0
+   */
+  public class VisibilityMethods extends Value {
+
+    VisibilityMethods(Data data) {
+      super(data);
+    }
+
+    public Value vis(Bytes cv) {
+      data.vis = cv;
+      return new Value(data);
+    }
+
+    public Value vis(byte[] cv) {
+      data.vis = Bytes.of(cv);
+      return new Value(data);
+    }
+
+    public Value vis(ByteBuffer bb) {
+      data.vis = Bytes.of(bb);
+      return new Value(data);
+    }
+
+    public Value vis(String cv) {
+      data.vis = Bytes.of(cv);
+      return new Value(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class Value {
+    private Bytes bytes;
+    private boolean gotBytes = false;
+    Data data;
+
+    public Bytes getBytes() {
+      if (!gotBytes) {
+        try {
+          bytes = snapshot.get(data.row, data.getCol());
+          gotBytes = true;
+        } catch (Exception e) {
+          if (e instanceof RuntimeException) {
+            throw (RuntimeException) e;
+          }
+          throw new RuntimeException(e);
+        }
+      }
+
+      return bytes;
+    }
+
+    private Value(Bytes bytes) {
+      this.bytes = bytes;
+      this.gotBytes = true;
+    }
+
+    private Value(Data data) {
+      this.data = data;
+      this.gotBytes = false;
+    }
+
+    public Integer toInteger() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeInteger(getBytes());
+    }
+
+    public int toInteger(int defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeInteger(getBytes());
+    }
+
+    public Long toLong() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeLong(getBytes());
+    }
+
+    public long toLong(long defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeLong(getBytes());
+    }
+
+    @Override
+    public String toString() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeString(getBytes());
+    }
+
+    public String toString(String defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeString(getBytes());
+    }
+
+    public Float toFloat() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeFloat(getBytes());
+    }
+
+    public float toFloat(float defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeFloat(getBytes());
+    }
+
+    public Double toDouble() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeDouble(getBytes());
+    }
+
+    public double toDouble(double defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeDouble(getBytes());
+    }
+
+    public Boolean toBoolean() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return encoder.decodeBoolean(getBytes());
+    }
+
+    public boolean toBoolean(boolean defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return encoder.decodeBoolean(getBytes());
+    }
+
+    public byte[] toBytes() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return getBytes().toArray();
+    }
+
+    public byte[] toBytes(byte[] defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return getBytes().toArray();
+    }
+
+    public ByteBuffer toByteBuffer() {
+      if (getBytes() == null) {
+        return null;
+      }
+      return ByteBuffer.wrap(getBytes().toArray());
+    }
+
+    public ByteBuffer toByteBuffer(ByteBuffer defaultValue) {
+      if (getBytes() == null) {
+        return defaultValue;
+      }
+      return toByteBuffer();
+    }
+
+    @Override
+    public int hashCode() {
+      if (getBytes() == null) {
+        return 0;
+      }
+
+      return getBytes().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof Value) {
+        Value ov = (Value) o;
+        if (getBytes() == null) {
+          return ov.getBytes() == null;
+        } else {
+          return getBytes().equals(ov.getBytes());
+        }
+      }
+
+      return false;
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class ValueQualifierBuilder extends QualifierMethods<VisibilityMethods> {
+
+    ValueQualifierBuilder(Data data) {
+      tl.super(data);
+    }
+
+    @Override
+    VisibilityMethods create(Data data) {
+      return new VisibilityMethods(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class ValueFamilyMethods extends FamilyMethods<ValueQualifierBuilder, Value> {
+
+    ValueFamilyMethods(Data data) {
+      tl.super(data);
+    }
+
+    @Override
+    ValueQualifierBuilder create1(Data data) {
+      return new ValueQualifierBuilder(data);
+    }
+
+    @Override
+    Value create2(Data data) {
+      return new Value(data);
+    }
+
+    public Map<Column, Value> columns(Set<Column> columns) {
+      try {
+        return wrap(snapshot.get(data.row, columns));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public Map<Column, Value> columns(Column... columns) {
+      try {
+        return wrap(snapshot.get(data.row, new HashSet<>(Arrays.asList(columns))));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class MapConverter {
+    private Collection<Bytes> rows;
+    private Set<Column> columns;
+
+    public MapConverter(Collection<Bytes> rows, Set<Column> columns) {
+      this.rows = rows;
+      this.columns = columns;
+    }
+
+    private Map<Bytes, Map<Column, Bytes>> getInput() {
+      try {
+        return snapshot.get(rows, columns);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private Map wrap2(Map m) {
+      return Collections.unmodifiableMap(DefaultedMap.decorate(m, new DefaultedMap(new Value(
+          (Bytes) null))));
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<String, Map<Column, Value>> toStringMap() {
+      Map<Bytes, Map<Column, Bytes>> in = getInput();
+      Map<String, Map<Column, Value>> out = new HashMap<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+        out.put(encoder.decodeString(rowEntry.getKey()), wrap(rowEntry.getValue()));
+      }
+
+      return wrap2(out);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Long, Map<Column, Value>> toLongMap() {
+      Map<Bytes, Map<Column, Bytes>> in = getInput();
+      Map<Long, Map<Column, Value>> out = new HashMap<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+        out.put(encoder.decodeLong(rowEntry.getKey()), wrap(rowEntry.getValue()));
+      }
+
+      return wrap2(out);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Integer, Map<Column, Value>> toIntegerMap() {
+      Map<Bytes, Map<Column, Bytes>> in = getInput();
+      Map<Integer, Map<Column, Value>> out = new HashMap<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+        out.put(encoder.decodeInteger(rowEntry.getKey()), wrap(rowEntry.getValue()));
+      }
+
+      return wrap2(out);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Bytes, Map<Column, Value>> toBytesMap() {
+      Map<Bytes, Map<Column, Bytes>> in = getInput();
+      Map<Bytes, Map<Column, Value>> out = new HashMap<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+        out.put(rowEntry.getKey(), wrap(rowEntry.getValue()));
+      }
+
+      return wrap2(out);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class ColumnsMethods {
+    private Collection<Bytes> rows;
+
+    public ColumnsMethods(Collection<Bytes> rows) {
+      this.rows = rows;
+    }
+
+    public MapConverter columns(Set<Column> columns) {
+      return new MapConverter(rows, columns);
+    }
+
+    public MapConverter columns(Column... columns) {
+      return columns(new HashSet<>(Arrays.asList(columns)));
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class ValueRowMethods extends RowMethods<ValueFamilyMethods> {
+
+    ValueRowMethods() {
+      tl.super();
+    }
+
+    @Override
+    ValueFamilyMethods create(Data data) {
+      return new ValueFamilyMethods(data);
+    }
+
+    public ColumnsMethods rows(Collection<Bytes> rows) {
+      return new ColumnsMethods(rows);
+    }
+
+    public ColumnsMethods rows(Bytes... rows) {
+      return new ColumnsMethods(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsString(String... rows) {
+      return rowsString(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsString(Collection<String> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (String row : rows) {
+        conv.add(encoder.encode(row));
+      }
+
+      return rows(conv);
+    }
+
+    public ColumnsMethods rowsLong(Long... rows) {
+      return rowsLong(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsLong(Collection<Long> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (Long row : rows) {
+        conv.add(encoder.encode(row));
+      }
+
+      return rows(conv);
+    }
+
+    public ColumnsMethods rowsInteger(Integer... rows) {
+      return rowsInteger(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsInteger(Collection<Integer> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (Integer row : rows) {
+        conv.add(encoder.encode(row));
+      }
+
+      return rows(conv);
+    }
+
+    public ColumnsMethods rowsBytes(byte[]... rows) {
+      return rowsBytes(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsBytes(Collection<byte[]> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (byte[] row : rows) {
+        conv.add(Bytes.of(row));
+      }
+
+      return rows(conv);
+    }
+
+    public ColumnsMethods rowsByteBuffers(ByteBuffer... rows) {
+      return rowsByteBuffers(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsByteBuffers(Collection<ByteBuffer> rows) {
+      ArrayList<Bytes> conv = new ArrayList<>();
+      for (ByteBuffer row : rows) {
+        conv.add(Bytes.of(row));
+      }
+
+      return rows(conv);
+    }
+
+  }
+
+  TypedSnapshotBase(SnapshotBase snapshot, Encoder encoder, TypeLayer tl) {
+    this.snapshot = snapshot;
+    this.encoder = encoder;
+    this.tl = tl;
+  }
+
+  @Override
+  public Bytes get(Bytes row, Column column) {
+    return snapshot.get(row, column);
+  }
+
+  @Override
+  public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+    return snapshot.get(row, columns);
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+    return snapshot.get(rowColumns);
+  }
+
+  @Override
+  public RowIterator get(ScannerConfiguration config) {
+    return snapshot.get(config);
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+    return snapshot.get(rows, columns);
+  }
+
+  public ValueRowMethods get() {
+    return new ValueRowMethods();
+  }
+
+  @SuppressWarnings({"unchecked"})
+  private Map<Column, Value> wrap(Map<Column, Bytes> map) {
+    Map<Column, Value> ret = Maps.transformValues(map, input -> new Value(input));
+    return Collections.unmodifiableMap(DefaultedMap.decorate(ret, new Value((Bytes) null)));
+  }
+
+  @Override
+  public long getStartTimestamp() {
+    return snapshot.getStartTimestamp();
+  }
+
+  @Override
+  public String gets(String row, Column column) {
+    return snapshot.gets(row, column);
+  }
+
+  @Override
+  public Map<Column, String> gets(String row, Set<Column> columns) {
+    return snapshot.gets(row, columns);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+    return snapshot.gets(rows, columns);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+    return snapshot.gets(rowColumns);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java
new file mode 100644
index 0000000..17631e0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+/**
+ * A {@link Transaction} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedTransaction extends TypedTransactionBase implements Transaction {
+
+  private final Transaction closeTx;
+
+  @VisibleForTesting
+  protected TypedTransaction(Transaction tx, Encoder encoder, TypeLayer tl) {
+    super(tx, encoder, tl);
+    closeTx = tx;
+  }
+
+  @Override
+  public void commit() throws CommitException {
+    closeTx.commit();
+  }
+
+  @Override
+  public void close() {
+    closeTx.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java
new file mode 100644
index 0000000..69ec694
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+import org.apache.fluo.recipes.core.types.TypeLayer.Data;
+import org.apache.fluo.recipes.core.types.TypeLayer.FamilyMethods;
+import org.apache.fluo.recipes.core.types.TypeLayer.QualifierMethods;
+import org.apache.fluo.recipes.core.types.TypeLayer.RowMethods;
+
+/**
+ * A {@link TransactionBase} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedTransactionBase extends TypedSnapshotBase implements TransactionBase {
+
+  private final TransactionBase tx;
+  private final Encoder encoder;
+  private final TypeLayer tl;
+
+  /**
+   * @since 1.0.0
+   */
+  public class Mutator {
+
+    private boolean set = false;
+    Data data;
+
+    Mutator(Data data) {
+      this.data = data;
+    }
+
+    void checkNotSet() {
+      if (set) {
+        throw new IllegalStateException("Already set value");
+      }
+    }
+
+    public void set(Bytes bytes) throws AlreadySetException {
+      checkNotSet();
+      tx.set(data.row, data.getCol(), bytes);
+      set = true;
+    }
+
+    public void set(String s) throws AlreadySetException {
+      set(encoder.encode(s));
+    }
+
+    public void set(int i) throws AlreadySetException {
+      set(encoder.encode(i));
+    }
+
+    public void set(long l) throws AlreadySetException {
+      set(encoder.encode(l));
+    }
+
+    public void set(float f) throws AlreadySetException {
+      set(encoder.encode(f));
+    }
+
+    public void set(double d) throws AlreadySetException {
+      set(encoder.encode(d));
+    }
+
+    public void set(boolean b) throws AlreadySetException {
+      set(encoder.encode(b));
+    }
+
+    public void set(byte[] ba) throws AlreadySetException {
+      set(Bytes.of(ba));
+    }
+
+    public void set(ByteBuffer bb) throws AlreadySetException {
+      set(Bytes.of(bb));
+    }
+
+    /**
+     * Set an empty value
+     */
+    public void set() throws AlreadySetException {
+      set(Bytes.EMPTY);
+    }
+
+    /**
+     * Reads the current value of the row/column, adds i, sets the sum. If the row/column does not
+     * have a current value, then it defaults to zero.
+     *
+     * @param i Integer increment amount
+     * @throws AlreadySetException if value was previously set in transaction
+     */
+    public void increment(int i) throws AlreadySetException {
+      checkNotSet();
+      Bytes val = tx.get(data.row, data.getCol());
+      int v = 0;
+      if (val != null) {
+        v = encoder.decodeInteger(val);
+      }
+      tx.set(data.row, data.getCol(), encoder.encode(v + i));
+    }
+
+    /**
+     * Reads the current value of the row/column, adds l, sets the sum. If the row/column does not
+     * have a current value, then it defaults to zero.
+     *
+     * @param l Long increment amount
+     * @throws AlreadySetException if value was previously set in transaction
+     */
+    public void increment(long l) throws AlreadySetException {
+      checkNotSet();
+      Bytes val = tx.get(data.row, data.getCol());
+      long v = 0;
+      if (val != null) {
+        v = encoder.decodeLong(val);
+      }
+      tx.set(data.row, data.getCol(), encoder.encode(v + l));
+    }
+
+    public void delete() throws AlreadySetException {
+      checkNotSet();
+      tx.delete(data.row, data.getCol());
+      set = true;
+    }
+
+    public void weaklyNotify() {
+      checkNotSet();
+      tx.setWeakNotification(data.row, data.getCol());
+      set = true;
+    }
+
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class VisibilityMutator extends Mutator {
+
+    VisibilityMutator(Data data) {
+      super(data);
+    }
+
+    public Mutator vis(String cv) {
+      checkNotSet();
+      data.vis = Bytes.of(cv);
+      return new Mutator(data);
+    }
+
+    public Mutator vis(Bytes cv) {
+      checkNotSet();
+      data.vis = cv;
+      return new Mutator(data);
+    }
+
+    public Mutator vis(byte[] cv) {
+      checkNotSet();
+      data.vis = Bytes.of(cv);
+      return new Mutator(data);
+    }
+
+    public Mutator vis(ByteBuffer cv) {
+      checkNotSet();
+      data.vis = Bytes.of(cv);
+      return new Mutator(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class MutatorQualifierMethods extends QualifierMethods<VisibilityMutator> {
+
+    MutatorQualifierMethods(Data data) {
+      tl.super(data);
+    }
+
+    @Override
+    VisibilityMutator create(Data data) {
+      return new VisibilityMutator(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class MutatorFamilyMethods extends FamilyMethods<MutatorQualifierMethods, Mutator> {
+
+    MutatorFamilyMethods(Data data) {
+      tl.super(data);
+    }
+
+    @Override
+    MutatorQualifierMethods create1(Data data) {
+      return new MutatorQualifierMethods(data);
+    }
+
+    @Override
+    Mutator create2(Data data) {
+      return new Mutator(data);
+    }
+  }
+
+  /**
+   * @since 1.0.0
+   */
+  public class MutatorRowMethods extends RowMethods<MutatorFamilyMethods> {
+
+    MutatorRowMethods() {
+      tl.super();
+    }
+
+    @Override
+    MutatorFamilyMethods create(Data data) {
+      return new MutatorFamilyMethods(data);
+    }
+
+  }
+
+  @VisibleForTesting
+  protected TypedTransactionBase(TransactionBase tx, Encoder encoder, TypeLayer tl) {
+    super(tx, encoder, tl);
+    this.tx = tx;
+    this.encoder = encoder;
+    this.tl = tl;
+  }
+
+  public MutatorRowMethods mutate() {
+    return new MutatorRowMethods();
+  }
+
+  @Override
+  public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
+    tx.set(row, col, value);
+  }
+
+  @Override
+  public void set(String row, Column col, String value) throws AlreadySetException {
+    tx.set(row, col, value);
+  }
+
+  @Override
+  public void setWeakNotification(Bytes row, Column col) {
+    tx.setWeakNotification(row, col);
+  }
+
+  @Override
+  public void setWeakNotification(String row, Column col) {
+    tx.setWeakNotification(row, col);
+  }
+
+  @Override
+  public void delete(Bytes row, Column col) throws AlreadySetException {
+    tx.delete(row, col);
+  }
+
+  @Override
+  public void delete(String row, Column col) {
+    tx.delete(row, col);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java b/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java
deleted file mode 100644
index 2501fa1..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.data;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.hash.Hashing;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.BytesBuilder;
-import org.apache.fluo.recipes.common.Pirtos;
-
-/**
- * This recipe provides code to help add a hash of the row as a prefix of the row. Using this recipe
- * rows are structured like the following.
- * 
- * <p>
- * {@code <prefix>:<fixed len row hash>:<user row>}
- * 
- * <p>
- * The recipe also provides code the help generate split points and configure balancing of the
- * prefix.
- * 
- * <p>
- * The project documentation has more information.
- */
-public class RowHasher {
-
-  private static final int HASH_LEN = 4;
-
-  public Pirtos getTableOptimizations(int numTablets) {
-
-    List<Bytes> splits = new ArrayList<>(numTablets - 1);
-
-    int numSplits = numTablets - 1;
-    int distance = (((int) Math.pow(Character.MAX_RADIX, HASH_LEN) - 1) / numTablets) + 1;
-    int split = distance;
-    for (int i = 0; i < numSplits; i++) {
-      splits.add(Bytes.of(prefix
-          + Strings.padStart(Integer.toString(split, Character.MAX_RADIX), HASH_LEN, '0')));
-      split += distance;
-    }
-
-    splits.add(Bytes.of(prefix + "~"));
-
-
-    Pirtos pirtos = new Pirtos();
-    pirtos.setSplits(splits);
-    pirtos.setTabletGroupingRegex(Pattern.quote(prefix.toString()));
-
-    return pirtos;
-  }
-
-
-  private Bytes prefix;
-
-  public RowHasher(String prefix) {
-    this.prefix = Bytes.of(prefix + ":");
-  }
-
-  /**
-   * @return Returns input with prefix and hash of input prepended.
-   */
-  public Bytes addHash(String row) {
-    return addHash(Bytes.of(row));
-  }
-
-  /**
-   * @return Returns input with prefix and hash of input prepended.
-   */
-  public Bytes addHash(Bytes row) {
-    BytesBuilder builder = Bytes.newBuilder(prefix.length() + 5 + row.length());
-    builder.append(prefix);
-    builder.append(genHash(row));
-    builder.append(":");
-    builder.append(row);
-    return builder.toBytes();
-  }
-
-  private boolean hasHash(Bytes row) {
-    for (int i = prefix.length(); i < prefix.length() + HASH_LEN; i++) {
-      byte b = row.byteAt(i);
-      boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9');
-      if (!isAlphaNum) {
-        return false;
-      }
-    }
-
-    if (row.byteAt(prefix.length() - 1) != ':' || row.byteAt(prefix.length() + HASH_LEN) != ':') {
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
-   * @return Returns input with prefix and hash stripped from beginning.
-   */
-  public Bytes removeHash(Bytes row) {
-    Preconditions.checkArgument(row.length() >= prefix.length() + 5,
-        "Row is shorter than expected " + row);
-    Preconditions.checkArgument(row.subSequence(0, prefix.length()).equals(prefix),
-        "Row does not have expected prefix " + row);
-    Preconditions.checkArgument(hasHash(row), "Row does not have expected hash " + row);
-    return row.subSequence(prefix.length() + 5, row.length());
-  }
-
-  private static String genHash(Bytes row) {
-    int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
-    hash = hash & 0x7fffffff;
-    // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is
-    // nice for debugging.
-    String hashString =
-        Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
-    hashString = hashString.substring(hashString.length() - HASH_LEN);
-
-    return hashString;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java
deleted file mode 100644
index c477ab1..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Objects;
-
-public class Export<K, V> {
-  private final K key;
-  private final V value;
-
-  public Export(K key, V val) {
-    Objects.requireNonNull(key);
-    Objects.requireNonNull(val);
-    this.key = key;
-    this.value = val;
-  }
-
-  public K getKey() {
-    return key;
-  }
-
-  public V getValue() {
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportBucket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportBucket.java
deleted file mode 100644
index fa9bb45..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportBucket.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import com.google.common.base.Preconditions;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.recipes.impl.BucketUtil;
-import org.apache.fluo.recipes.types.StringEncoder;
-import org.apache.fluo.recipes.types.TypeLayer;
-import org.apache.fluo.recipes.types.TypedTransactionBase;
-
-/**
- * This class encapsulates a buckets serialization code.
- */
-class ExportBucket {
-  private static final String NOTIFICATION_CF = "fluoRecipes";
-  private static final String NOTIFICATION_CQ_PREFIX = "eq:";
-  private static final Column EXPORT_COL = new Column("e", "v");
-  private static final Column NEXT_COL = new Column("e", "next");
-
-  static Column newNotificationColumn(String queueId) {
-    return new Column(NOTIFICATION_CF, NOTIFICATION_CQ_PREFIX + queueId);
-  }
-
-  private final TypedTransactionBase ttx;
-  private final String qid;
-  private final Bytes bucketRow;
-
-  static Bytes generateBucketRow(String qid, int bucket, int numBuckets) {
-    return Bytes.of(qid + ":" + BucketUtil.genBucketId(bucket, numBuckets));
-  }
-
-  ExportBucket(TransactionBase tx, String qid, int bucket, int numBuckets) {
-    // TODO encode in a more robust way... but for now fail early
-    Preconditions.checkArgument(!qid.contains(":"), "Export QID can not contain :");
-    this.ttx = new TypeLayer(new StringEncoder()).wrap(tx);
-    this.qid = qid;
-    this.bucketRow = generateBucketRow(qid, bucket, numBuckets);
-  }
-
-  ExportBucket(TransactionBase tx, Bytes bucketRow) {
-    this.ttx = new TypeLayer(new StringEncoder()).wrap(tx);
-
-    int colonLoc = -1;
-
-    for (int i = 0; i < bucketRow.length(); i++) {
-      if (bucketRow.byteAt(i) == ':') {
-        colonLoc = i;
-        break;
-      }
-    }
-
-    Preconditions.checkArgument(colonLoc != -1 && colonLoc != bucketRow.length(),
-        "Invalid bucket row " + bucketRow);
-    Preconditions.checkArgument(bucketRow.byteAt(bucketRow.length() - 1) == ':',
-        "Invalid bucket row " + bucketRow);
-
-    this.bucketRow = bucketRow.subSequence(0, bucketRow.length() - 1);
-    this.qid = bucketRow.subSequence(0, colonLoc).toString();
-  }
-
-  private static byte[] encSeq(long l) {
-    byte[] ret = new byte[8];
-    ret[0] = (byte) (l >>> 56);
-    ret[1] = (byte) (l >>> 48);
-    ret[2] = (byte) (l >>> 40);
-    ret[3] = (byte) (l >>> 32);
-    ret[4] = (byte) (l >>> 24);
-    ret[5] = (byte) (l >>> 16);
-    ret[6] = (byte) (l >>> 8);
-    ret[7] = (byte) (l >>> 0);
-    return ret;
-  }
-
-  private static long decodeSeq(Bytes seq) {
-    return (((long) seq.byteAt(0) << 56) + ((long) (seq.byteAt(1) & 255) << 48)
-        + ((long) (seq.byteAt(2) & 255) << 40) + ((long) (seq.byteAt(3) & 255) << 32)
-        + ((long) (seq.byteAt(4) & 255) << 24) + ((seq.byteAt(5) & 255) << 16)
-        + ((seq.byteAt(6) & 255) << 8) + ((seq.byteAt(7) & 255) << 0));
-  }
-
-
-  public void add(long seq, byte[] key, byte[] value) {
-    Bytes row =
-        Bytes.newBuilder(bucketRow.length() + 1 + key.length + 8).append(bucketRow).append(":")
-            .append(key).append(encSeq(seq)).toBytes();
-    ttx.set(row, EXPORT_COL, Bytes.of(value));
-  }
-
-  /**
-   * Computes the minimial row for a bucket
-   */
-  private Bytes getMinimalRow() {
-    return Bytes.newBuilder(bucketRow.length() + 1).append(bucketRow).append(":").toBytes();
-  }
-
-  public void notifyExportObserver() {
-    ttx.mutate().row(getMinimalRow()).col(newNotificationColumn(qid)).weaklyNotify();
-  }
-
-  public Iterator<ExportEntry> getExportIterator(Bytes continueRow) {
-    ScannerConfiguration sc = new ScannerConfiguration();
-
-    if (continueRow != null) {
-      Span tmpSpan = Span.prefix(bucketRow);
-      Span nextSpan =
-          new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(),
-              tmpSpan.isEndInclusive());
-      sc.setSpan(nextSpan);
-    } else {
-      sc.setSpan(Span.prefix(bucketRow));
-    }
-
-    sc.fetchColumn(EXPORT_COL.getFamily(), EXPORT_COL.getQualifier());
-    RowIterator iter = ttx.get(sc);
-
-    if (iter.hasNext()) {
-      return new ExportIterator(iter);
-    } else {
-      return Collections.<ExportEntry>emptySet().iterator();
-    }
-  }
-
-  private class ExportIterator implements Iterator<ExportEntry> {
-
-    private RowIterator rowIter;
-    private Bytes lastRow;
-
-    public ExportIterator(RowIterator rowIter) {
-      this.rowIter = rowIter;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return rowIter.hasNext();
-    }
-
-    @Override
-    public ExportEntry next() {
-      Entry<Bytes, ColumnIterator> rowCol = rowIter.next();
-      Bytes row = rowCol.getKey();
-
-      Bytes keyBytes = row.subSequence(bucketRow.length() + 1, row.length() - 8);
-      Bytes seqBytes = row.subSequence(row.length() - 8, row.length());
-
-      ExportEntry ee = new ExportEntry();
-
-      ee.key = keyBytes.toArray();
-      ee.seq = decodeSeq(seqBytes);
-      // TODO maybe leave as Bytes?
-      ee.value = rowCol.getValue().next().getValue().toArray();
-
-      lastRow = row;
-
-      return ee;
-    }
-
-    @Override
-    public void remove() {
-      ttx.mutate().row(lastRow).col(EXPORT_COL).delete();
-    }
-  }
-
-  public Bytes getContinueRow() {
-    return ttx.get(getMinimalRow(), NEXT_COL);
-  }
-
-  public void setContinueRow(ExportEntry ee) {
-    Bytes nextRow =
-        Bytes.newBuilder(bucketRow.length() + 1 + ee.key.length + 8).append(bucketRow).append(":")
-            .append(ee.key).append(encSeq(ee.seq)).toBytes();
-
-    ttx.set(getMinimalRow(), NEXT_COL, nextRow);
-  }
-
-  public void clearContinueRow() {
-    ttx.delete(getMinimalRow(), NEXT_COL);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java
deleted file mode 100644
index 1b156b9..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-class ExportEntry {
-  byte[] key;
-  long seq;
-  byte[] value;
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java
deleted file mode 100644
index 972af6e..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import com.google.common.collect.Iterators;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-
-public class ExportObserver<K, V> extends AbstractObserver {
-
-  private static class MemLimitIterator implements Iterator<ExportEntry> {
-
-    private long memConsumed = 0;
-    private long memLimit;
-    private int extraPerKey;
-    private Iterator<ExportEntry> source;
-
-    public MemLimitIterator(Iterator<ExportEntry> input, long limit, int extraPerKey) {
-      this.source = input;
-      this.memLimit = limit;
-      this.extraPerKey = extraPerKey;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return memConsumed < memLimit && source.hasNext();
-    }
-
-    @Override
-    public ExportEntry next() {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      ExportEntry ee = source.next();
-      memConsumed += ee.key.length + extraPerKey + ee.value.length;
-      return ee;
-    }
-
-    @Override
-    public void remove() {
-      source.remove();
-    }
-  }
-
-  private String queueId;
-  private Class<K> keyType;
-  private Class<V> valType;
-  SimpleSerializer serializer;
-  private Exporter<K, V> exporter;
-
-  private long memLimit;
-
-  protected String getQueueId() {
-    return queueId;
-  }
-
-  SimpleSerializer getSerializer() {
-    return serializer;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void init(Context context) throws Exception {
-    queueId = context.getParameters().get("queueId");
-    ExportQueue.Options opts = new ExportQueue.Options(queueId, context.getAppConfiguration());
-
-    // TODO defer loading classes... so that not done during fluo init
-    // TODO move class loading to centralized place... also attempt to check type params
-    keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType);
-    valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType);
-    exporter =
-        getClass().getClassLoader().loadClass(opts.exporterType).asSubclass(Exporter.class)
-            .newInstance();
-
-    serializer = SimpleSerializer.getInstance(context.getAppConfiguration());
-
-    memLimit = opts.getBufferSize();
-
-    exporter.init(queueId, context);
-  }
-
-  @Override
-  public ObservedColumn getObservedColumn() {
-    return new ObservedColumn(ExportBucket.newNotificationColumn(queueId), NotificationType.WEAK);
-  }
-
-  @Override
-  public void process(TransactionBase tx, Bytes row, Column column) throws Exception {
-    ExportBucket bucket = new ExportBucket(tx, row);
-
-    Bytes continueRow = bucket.getContinueRow();
-
-    Iterator<ExportEntry> input = bucket.getExportIterator(continueRow);
-    MemLimitIterator memLimitIter = new MemLimitIterator(input, memLimit, 8 + queueId.length());
-
-    Iterator<SequencedExport<K, V>> exportIterator =
-        Iterators.transform(
-            memLimitIter,
-            ee -> new SequencedExport<>(serializer.deserialize(ee.key, keyType), serializer
-                .deserialize(ee.value, valType), ee.seq));
-
-    exportIterator = Iterators.consumingIterator(exportIterator);
-
-    exporter.processExports(exportIterator);
-
-    if (input.hasNext()) {
-      // not everything was processed so notify self
-      bucket.notifyExportObserver();
-
-      if (!memLimitIter.hasNext()) {
-        // stopped because of mem limit... set continue key
-        bucket.setContinueRow(input.next());
-        continueRow = null;
-      }
-    }
-
-    if (continueRow != null) {
-      bucket.clearContinueRow();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java
deleted file mode 100644
index 13518e7..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import com.google.common.hash.Hashing;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.common.Pirtos;
-import org.apache.fluo.recipes.common.RowRange;
-import org.apache.fluo.recipes.common.TransientRegistry;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-
-public class ExportQueue<K, V> {
-
-  private static final String RANGE_BEGIN = "#";
-  private static final String RANGE_END = ":~";
-
-  private int numBuckets;
-  private SimpleSerializer serializer;
-  private String queueId;
-
-  // usage hint : could be created once in an observers init method
-  // usage hint : maybe have a queue for each type of data being exported???
-  // maybe less queues are
-  // more efficient though because more batching at export time??
-  ExportQueue(Options opts, SimpleSerializer serializer) throws Exception {
-    // TODO sanity check key type based on type params
-    // TODO defer creating classes until needed.. so that its not done during Fluo init
-    this.queueId = opts.queueId;
-    this.numBuckets = opts.numBuckets;
-    this.serializer = serializer;
-  }
-
-  public void add(TransactionBase tx, K key, V value) {
-    addAll(tx, Collections.singleton(new Export<>(key, value)).iterator());
-  }
-
-  public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) {
-
-    Set<Integer> bucketsNotified = new HashSet<>();
-    while (exports.hasNext()) {
-      Export<K, V> export = exports.next();
-
-      byte[] k = serializer.serialize(export.getKey());
-      byte[] v = serializer.serialize(export.getValue());
-
-      int hash = Hashing.murmur3_32().hashBytes(k).asInt();
-      int bucketId = Math.abs(hash % numBuckets);
-
-      ExportBucket bucket = new ExportBucket(tx, queueId, bucketId, numBuckets);
-      bucket.add(tx.getStartTimestamp(), k, v);
-
-      if (!bucketsNotified.contains(bucketId)) {
-        bucket.notifyExportObserver();
-        bucketsNotified.add(bucketId);
-      }
-    }
-  }
-
-  public static <K2, V2> ExportQueue<K2, V2> getInstance(String exportQueueId,
-      SimpleConfiguration appConfig) {
-    Options opts = new Options(exportQueueId, appConfig);
-    try {
-      return new ExportQueue<>(opts, SimpleSerializer.getInstance(appConfig));
-    } catch (Exception e) {
-      // TODO
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Call this method before initializing Fluo.
-   *
-   * @param fluoConfig The configuration that will be used to initialize fluo.
-   */
-  public static void configure(FluoConfiguration fluoConfig, Options opts) {
-    SimpleConfiguration appConfig = fluoConfig.getAppConfiguration();
-    opts.save(appConfig);
-
-    fluoConfig.addObserver(new ObserverConfiguration(ExportObserver.class.getName())
-        .setParameters(Collections.singletonMap("queueId", opts.queueId)));
-
-    Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
-    Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
-
-    new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("exportQueue."
-        + opts.queueId, new RowRange(exportRangeStart, exportRangeStop));
-  }
-
-  /**
-   * Return suggested Fluo table optimizations for all previously configured export queues.
-   *
-   * @param appConfig Must pass in the application configuration obtained from
-   *        {@code FluoClient.getAppConfiguration()} or
-   *        {@code FluoConfiguration.getAppConfiguration()}
-   */
-
-  public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
-    HashSet<String> queueIds = new HashSet<>();
-    appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
-        k -> queueIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
-
-    Pirtos pirtos = new Pirtos();
-    queueIds.forEach(qid -> pirtos.merge(getTableOptimizations(qid, appConfig)));
-
-    return pirtos;
-  }
-
-  /**
-   * Return suggested Fluo table optimizations for the specified export queue.
-   *
-   * @param appConfig Must pass in the application configuration obtained from
-   *        {@code FluoClient.getAppConfiguration()} or
-   *        {@code FluoConfiguration.getAppConfiguration()}
-   */
-  public static Pirtos getTableOptimizations(String queueId, SimpleConfiguration appConfig) {
-    Options opts = new Options(queueId, appConfig);
-
-    List<Bytes> splits = new ArrayList<>();
-
-    Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
-    Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
-
-    splits.add(exportRangeStart);
-    splits.add(exportRangeStop);
-
-    List<Bytes> exportSplits = new ArrayList<>();
-    for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
-      exportSplits.add(ExportBucket.generateBucketRow(opts.queueId, i, opts.numBuckets));
-    }
-    Collections.sort(exportSplits);
-    splits.addAll(exportSplits);
-
-    Pirtos pirtos = new Pirtos();
-    pirtos.setSplits(splits);
-
-    // the tablet with end row <queueId># does not contain any data for the export queue and
-    // should not be grouped with the export queue
-    pirtos.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
-
-    return pirtos;
-  }
-
-  public static class Options {
-
-    private static final String PREFIX = "recipes.exportQueue.";
-    static final long DEFAULT_BUFFER_SIZE = 1 << 20;
-    static final int DEFAULT_BUCKETS_PER_TABLET = 10;
-
-    int numBuckets;
-    Integer bucketsPerTablet = null;
-    Long bufferSize;
-
-    String keyType;
-    String valueType;
-    String exporterType;
-    String queueId;
-
-    Options(String queueId, SimpleConfiguration appConfig) {
-      this.queueId = queueId;
-
-      this.numBuckets = appConfig.getInt(PREFIX + queueId + ".buckets");
-      this.exporterType = appConfig.getString(PREFIX + queueId + ".exporter");
-      this.keyType = appConfig.getString(PREFIX + queueId + ".key");
-      this.valueType = appConfig.getString(PREFIX + queueId + ".val");
-      this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE);
-      this.bucketsPerTablet =
-          appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
-    }
-
-    public Options(String queueId, String exporterType, String keyType, String valueType,
-        int buckets) {
-      Preconditions.checkArgument(buckets > 0);
-
-      this.queueId = queueId;
-      this.numBuckets = buckets;
-      this.exporterType = exporterType;
-      this.keyType = keyType;
-      this.valueType = valueType;
-    }
-
-
-    public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporter,
-        Class<K> keyType, Class<V> valueType, int buckets) {
-      this(queueId, exporter.getName(), keyType.getName(), valueType.getName(), buckets);
-    }
-
-    /**
-     * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
-     * be used to actually deserialize and process the updates. This limit does not account for
-     * object overhead in java, which can be significant.
-     *
-     * <p>
-     * The way memory read is calculated is by summing the length of serialized key and value byte
-     * arrays. Once this sum exceeds the configured memory limit, no more export key values are
-     * processed in the current transaction. When not everything is processed, the observer
-     * processing exports will notify itself causing another transaction to continue processing
-     * later.
-     */
-    public Options setBufferSize(long bufferSize) {
-      Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    long getBufferSize() {
-      if (bufferSize == null) {
-        return DEFAULT_BUFFER_SIZE;
-      }
-
-      return bufferSize;
-    }
-
-    /**
-     * Sets the number of buckets per tablet to generate. This affects how many split points will be
-     * generated when optimizing the Accumulo table.
-     *
-     */
-    public Options setBucketsPerTablet(int bucketsPerTablet) {
-      Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
-          + bucketsPerTablet);
-      this.bucketsPerTablet = bucketsPerTablet;
-      return this;
-    }
-
-    int getBucketsPerTablet() {
-      if (bucketsPerTablet == null) {
-        return DEFAULT_BUCKETS_PER_TABLET;
-      }
-
-      return bucketsPerTablet;
-    }
-
-    void save(SimpleConfiguration appConfig) {
-      appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + "");
-      appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + "");
-      appConfig.setProperty(PREFIX + queueId + ".key", keyType);
-      appConfig.setProperty(PREFIX + queueId + ".val", valueType);
-
-      if (bufferSize != null) {
-        appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize);
-      }
-      if (bucketsPerTablet != null) {
-        appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java
deleted file mode 100644
index b81e9d1..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Iterator;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-public abstract class Exporter<K, V> {
-
-  public void init(String queueId, Context observerContext) throws Exception {}
-
-  /**
-   * Must be able to handle same key being exported multiple times and key being exported out of
-   * order. The sequence number is meant to help with this.
-   *
-   * <p>
-   * If multiple export entries with the same key are passed in, then the entries with the same key
-   * will be consecutive and in ascending sequence order.
-   *
-   * <p>
-   * If the call to process exports is unexpectedly terminated, it will be called again later with
-   * at least the same data. For example suppose an exporter was passed the following entries.
-   *
-   * <ul>
-   * <li>key=0 sequence=9 value=abc
-   * <li>key=1 sequence=13 value=d
-   * <li>key=1 sequence=17 value=e
-   * <li>key=1 sequence=23 value=f
-   * <li>key=2 sequence=19 value=x
-   * </ul>
-   *
-   * <p>
-   * Assume the exporter exports some of these and then fails before completing all of them. The
-   * next time its called it will be passed what it saw before, but it could also be passed more.
-   *
-   * <ul>
-   * <li>key=0 sequence=9 value=abc
-   * <li>key=1 sequence=13 value=d
-   * <li>key=1 sequence=17 value=e
-   * <li>key=1 sequence=23 value=f
-   * <li>key=1 sequence=29 value=g
-   * <li>key=2 sequence=19 value=x
-   * <li>key=2 sequence=77 value=y
-   * </ul>
-   *
-   */
-  protected abstract void processExports(Iterator<SequencedExport<K, V>> exports);
-
-  // TODO add close
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java
deleted file mode 100644
index a862a8e..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-public class SequencedExport<K, V> extends Export<K, V> {
-  private final long seq;
-
-  SequencedExport(K k, V v, long seq) {
-    super(k, v);
-    this.seq = seq;
-  }
-
-  public long getSequence() {
-    return seq;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java b/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java
deleted file mode 100644
index ded289c..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.impl;
-
-public class BucketUtil {
-  public static String genBucketId(int bucket, int maxBucket) {
-    int bucketLen = Integer.toHexString(maxBucket).length();
-    // TODO printf is slow
-    return String.format("%0" + bucketLen + "x", bucket);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java
deleted file mode 100644
index bc7bffd..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java
+++ /dev/null
@@ -1,657 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.map;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-import com.google.common.hash.Hashing;
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.BytesBuilder;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.recipes.common.Pirtos;
-import org.apache.fluo.recipes.common.RowRange;
-import org.apache.fluo.recipes.common.TransientRegistry;
-import org.apache.fluo.recipes.impl.BucketUtil;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-
-/**
- * See the project level documentation for information about this recipe.
- */
-public class CollisionFreeMap<K, V> {
-
-  private static final String UPDATE_RANGE_END = ":u:~";
-
-  private static final String DATA_RANGE_END = ":d:~";
-
-  private String mapId;
-
-  private Class<K> keyType;
-  private Class<V> valType;
-  private SimpleSerializer serializer;
-  private Combiner<K, V> combiner;
-  UpdateObserver<K, V> updateObserver;
-  private long bufferSize;
-
-  static final Column UPDATE_COL = new Column("u", "v");
-  static final Column NEXT_COL = new Column("u", "next");
-
-  private int numBuckets = -1;
-
-  @SuppressWarnings("unchecked")
-  CollisionFreeMap(Options opts, SimpleSerializer serializer) throws Exception {
-
-    this.mapId = opts.mapId;
-    // TODO defer loading classes
-    // TODO centralize class loading
-    // TODO try to check type params
-    this.numBuckets = opts.numBuckets;
-    this.keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType);
-    this.valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType);
-    this.combiner =
-        (Combiner<K, V>) getClass().getClassLoader().loadClass(opts.combinerType).newInstance();
-    this.serializer = serializer;
-    if (opts.updateObserverType != null) {
-      this.updateObserver =
-          getClass().getClassLoader().loadClass(opts.updateObserverType)
-              .asSubclass(UpdateObserver.class).newInstance();
-    } else {
-      this.updateObserver = new NullUpdateObserver<>();
-    }
-    this.bufferSize = opts.getBufferSize();
-  }
-
-  private V deserVal(Bytes val) {
-    return serializer.deserialize(val.toArray(), valType);
-  }
-
-  private Bytes getKeyFromUpdateRow(Bytes prefix, Bytes row) {
-    return row.subSequence(prefix.length(), row.length() - 8);
-  }
-
-  void process(TransactionBase tx, Bytes ntfyRow, Column col) throws Exception {
-
-    Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
-
-    ScannerConfiguration sc = new ScannerConfiguration();
-
-    if (nextKey != null) {
-      Bytes startRow =
-          Bytes.newBuilder(ntfyRow.length() + nextKey.length()).append(ntfyRow).append(nextKey)
-              .toBytes();
-      Span tmpSpan = Span.prefix(ntfyRow);
-      Span nextSpan =
-          new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(),
-              tmpSpan.isEndInclusive());
-      sc.setSpan(nextSpan);
-    } else {
-      sc.setSpan(Span.prefix(ntfyRow));
-    }
-
-    sc.setSpan(Span.prefix(ntfyRow));
-    sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
-    RowIterator iter = tx.get(sc);
-
-    Map<Bytes, List<Bytes>> updates = new HashMap<>();
-
-    long approxMemUsed = 0;
-
-    Bytes partiallyReadKey = null;
-
-    if (iter.hasNext()) {
-      Bytes lastKey = null;
-      while (iter.hasNext() && approxMemUsed < bufferSize) {
-        Entry<Bytes, ColumnIterator> rowCol = iter.next();
-        Bytes curRow = rowCol.getKey();
-
-        tx.delete(curRow, UPDATE_COL);
-
-        Bytes serializedKey = getKeyFromUpdateRow(ntfyRow, curRow);
-        lastKey = serializedKey;
-
-        List<Bytes> updateList = updates.get(serializedKey);
-        if (updateList == null) {
-          updateList = new ArrayList<>();
-          updates.put(serializedKey, updateList);
-        }
-
-        Bytes val = rowCol.getValue().next().getValue();
-        updateList.add(val);
-
-        approxMemUsed += curRow.length();
-        approxMemUsed += val.length();
-      }
-
-      if (iter.hasNext()) {
-        Entry<Bytes, ColumnIterator> rowCol = iter.next();
-        Bytes curRow = rowCol.getKey();
-
-        // check if more updates for last key
-        if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) {
-          // there are still more updates for this key
-          partiallyReadKey = lastKey;
-
-          // start next time at the current key
-          tx.set(ntfyRow, NEXT_COL, partiallyReadKey);
-        } else {
-          // start next time at the next possible key
-          Bytes nextPossible =
-              Bytes.newBuilder(lastKey.length() + 1).append(lastKey).append(new byte[] {0})
-                  .toBytes();
-          tx.set(ntfyRow, NEXT_COL, nextPossible);
-        }
-
-        // may not read all data because of mem limit, so notify self
-        tx.setWeakNotification(ntfyRow, col);
-      } else if (nextKey != null) {
-        // clear nextKey
-        tx.delete(ntfyRow, NEXT_COL);
-      }
-    } else if (nextKey != null) {
-      tx.delete(ntfyRow, NEXT_COL);
-    }
-
-    byte[] dataPrefix = ntfyRow.toArray();
-    // TODO this is awful... no sanity check... hard to read
-    dataPrefix[Bytes.of(mapId).length() + 1] = 'd';
-
-    BytesBuilder rowBuilder = Bytes.newBuilder();
-    rowBuilder.append(dataPrefix);
-    int rowPrefixLen = rowBuilder.getLength();
-
-    Set<Bytes> keysToFetch = updates.keySet();
-    if (partiallyReadKey != null) {
-      final Bytes prk = partiallyReadKey;
-      keysToFetch = Sets.filter(keysToFetch, b -> !b.equals(prk));
-    }
-    Map<Bytes, Map<Column, Bytes>> currentVals = getCurrentValues(tx, rowBuilder, keysToFetch);
-
-    ArrayList<Update<K, V>> updatesToReport = new ArrayList<>(updates.size());
-
-    for (Entry<Bytes, List<Bytes>> entry : updates.entrySet()) {
-      rowBuilder.setLength(rowPrefixLen);
-      Bytes currentValueRow = rowBuilder.append(entry.getKey()).toBytes();
-      Bytes currVal =
-          currentVals.getOrDefault(currentValueRow, Collections.emptyMap()).get(DATA_COLUMN);
-
-      Iterator<V> ui = Iterators.transform(entry.getValue().iterator(), this::deserVal);
-
-      K kd = serializer.deserialize(entry.getKey().toArray(), keyType);
-
-      if (partiallyReadKey != null && partiallyReadKey.equals(entry.getKey())) {
-        // not all updates were read for this key, so requeue the combined updates as an update
-        Optional<V> nv = combiner.combine(kd, ui);
-        if (nv.isPresent()) {
-          update(tx, Collections.singletonMap(kd, nv.get()));
-        }
-      } else {
-        Optional<V> nv = combiner.combine(kd, concat(ui, currVal));
-        Bytes newVal = nv.isPresent() ? Bytes.of(serializer.serialize(nv.get())) : null;
-        if (newVal != null ^ currVal != null || (currVal != null && !currVal.equals(newVal))) {
-          if (newVal == null) {
-            tx.delete(currentValueRow, DATA_COLUMN);
-          } else {
-            tx.set(currentValueRow, DATA_COLUMN, newVal);
-          }
-
-          Optional<V> cvd = Optional.ofNullable(currVal).map(this::deserVal);
-          updatesToReport.add(new Update<>(kd, cvd, nv));
-        }
-      }
-    }
-
-    // TODO could clear these as converted to objects to avoid double memory usage
-    updates.clear();
-    currentVals.clear();
-
-    if (updatesToReport.size() > 0) {
-      updateObserver.updatingValues(tx, updatesToReport.iterator());
-    }
-  }
-
-  private static final Column DATA_COLUMN = new Column("data", "current");
-
-  private Map<Bytes, Map<Column, Bytes>> getCurrentValues(TransactionBase tx, BytesBuilder prefix,
-      Set<Bytes> keySet) {
-
-    Set<Bytes> rows = new HashSet<>();
-
-    int prefixLen = prefix.getLength();
-    for (Bytes key : keySet) {
-      prefix.setLength(prefixLen);
-      rows.add(prefix.append(key).toBytes());
-    }
-
-    try {
-      return tx.get(rows, Collections.singleton(DATA_COLUMN));
-    } catch (IllegalArgumentException e) {
-      System.out.println(rows.size());
-      throw e;
-    }
-  }
-
-  private Iterator<V> concat(Iterator<V> updates, Bytes currentVal) {
-    if (currentVal == null) {
-      return updates;
-    }
-
-    return Iterators.concat(updates, Iterators.singletonIterator(deserVal(currentVal)));
-  }
-
-  /**
-   * This method will retrieve the current value for key and any outstanding updates and combine
-   * them using the configured {@link Combiner}. The result from the combiner is returned.
-   */
-  public V get(SnapshotBase tx, K key) {
-
-    byte[] k = serializer.serialize(key);
-
-    int hash = Hashing.murmur3_32().hashBytes(k).asInt();
-    String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
-
-
-    BytesBuilder rowBuilder = Bytes.newBuilder();
-    rowBuilder.append(mapId).append(":u:").append(bucketId).append(":").append(k);
-
-    ScannerConfiguration sc = new ScannerConfiguration();
-    sc.setSpan(Span.prefix(rowBuilder.toBytes()));
-
-    RowIterator iter = tx.get(sc);
-
-    Iterator<V> ui;
-
-    if (iter.hasNext()) {
-      ui = Iterators.transform(iter, e -> deserVal(e.getValue().next().getValue()));
-    } else {
-      ui = Collections.<V>emptyList().iterator();
-    }
-
-    rowBuilder.setLength(mapId.length());
-    rowBuilder.append(":d:").append(bucketId).append(":").append(k);
-
-    Bytes dataRow = rowBuilder.toBytes();
-
-    Bytes cv = tx.get(dataRow, DATA_COLUMN);
-
-    if (!ui.hasNext()) {
-      if (cv == null) {
-        return null;
-      } else {
-        return deserVal(cv);
-      }
-    }
-
-    return combiner.combine(key, concat(ui, cv)).orElse(null);
-  }
-
-  String getId() {
-    return mapId;
-  }
-
-  /**
-   * Queues updates for a collision free map. These updates will be made by an Observer executing
-   * another transaction. This method will not collide with other transaction queuing updates for
-   * the same keys.
-   *
-   * @param tx This transaction will be used to make the updates.
-   * @param updates The keys in the map should correspond to keys in the collision free map being
-   *        updated. The values in the map will be queued for updating.
-   */
-  public void update(TransactionBase tx, Map<K, V> updates) {
-    Preconditions.checkState(numBuckets > 0, "Not initialized");
-
-    Set<String> buckets = new HashSet<>();
-
-    BytesBuilder rowBuilder = Bytes.newBuilder();
-    rowBuilder.append(mapId).append(":u:");
-    int prefixLength = rowBuilder.getLength();
-
-    byte[] startTs = encSeq(tx.getStartTimestamp());
-
-    for (Entry<K, V> entry : updates.entrySet()) {
-      byte[] k = serializer.serialize(entry.getKey());
-      int hash = Hashing.murmur3_32().hashBytes(k).asInt();
-      String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
-
-      // reset to the common row prefix
-      rowBuilder.setLength(prefixLength);
-
-      Bytes row = rowBuilder.append(bucketId).append(":").append(k).append(startTs).toBytes();
-      Bytes val = Bytes.of(serializer.serialize(entry.getValue()));
-
-      // TODO set if not exists would be comforting here.... but
-      // collisions on bucketId+key+uuid should never occur
-      tx.set(row, UPDATE_COL, val);
-
-      buckets.add(bucketId);
-    }
-
-    for (String bucketId : buckets) {
-      rowBuilder.setLength(prefixLength);
-      rowBuilder.append(bucketId).append(":");
-
-      Bytes row = rowBuilder.toBytes();
-
-      tx.setWeakNotification(row, new Column("fluoRecipes", "cfm:" + mapId));
-    }
-  }
-
-
-  public static <K2, V2> CollisionFreeMap<K2, V2> getInstance(String mapId,
-      SimpleConfiguration appConf) {
-    Options opts = new Options(mapId, appConf);
-    try {
-      return new CollisionFreeMap<>(opts, SimpleSerializer.getInstance(appConf));
-    } catch (Exception e) {
-      // TODO
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * A @link {@link CollisionFreeMap} stores data in its own data format in the Fluo table. When
-   * initializing a Fluo table with something like Map Reduce or Spark, data will need to be written
-   * in this format. Thats the purpose of this method, it provide a simple class that can do this
-   * conversion.
-   *
-   */
-  public static <K2, V2> Initializer<K2, V2> getInitializer(String mapId, int numBuckets,
-      SimpleSerializer serializer) {
-    return new Initializer<>(mapId, numBuckets, serializer);
-  }
-
-
-  /**
-   * @see CollisionFreeMap#getInitializer(String, int, SimpleSerializer)
-   */
-  public static class Initializer<K2, V2> implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private String mapId;
-
-    private SimpleSerializer serializer;
-
-    private int numBuckets = -1;
-
-    private Initializer(String mapId, int numBuckets, SimpleSerializer serializer) {
-      this.mapId = mapId;
-      this.numBuckets = numBuckets;
-      this.serializer = serializer;
-    }
-
-    public RowColumnValue convert(K2 key, V2 val) {
-      byte[] k = serializer.serialize(key);
-      int hash = Hashing.murmur3_32().hashBytes(k).asInt();
-      String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
-
-      BytesBuilder bb = Bytes.newBuilder();
-      Bytes row = bb.append(mapId).append(":d:").append(bucketId).append(":").append(k).toBytes();
-      byte[] v = serializer.serialize(val);
-
-      return new RowColumnValue(row, DATA_COLUMN, Bytes.of(v));
-    }
-  }
-
-  public static class Options {
-
-    static final long DEFAULT_BUFFER_SIZE = 1 << 22;
-    static final int DEFAULT_BUCKETS_PER_TABLET = 10;
-
-    int numBuckets;
-    Integer bucketsPerTablet = null;
-
-    Long bufferSize;
-
-    String keyType;
-    String valueType;
-    String combinerType;
-    String updateObserverType;
-    String mapId;
-
-    private static final String PREFIX = "recipes.cfm.";
-
-    Options(String mapId, SimpleConfiguration appConfig) {
-      this.mapId = mapId;
-
-      this.numBuckets = appConfig.getInt(PREFIX + mapId + ".buckets");
-      this.combinerType = appConfig.getString(PREFIX + mapId + ".combiner");
-      this.keyType = appConfig.getString(PREFIX + mapId + ".key");
-      this.valueType = appConfig.getString(PREFIX + mapId + ".val");
-      this.updateObserverType = appConfig.getString(PREFIX + mapId + ".updateObserver", null);
-      this.bufferSize = appConfig.getLong(PREFIX + mapId + ".bufferSize", DEFAULT_BUFFER_SIZE);
-      this.bucketsPerTablet =
-          appConfig.getInt(PREFIX + mapId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
-    }
-
-    public Options(String mapId, String combinerType, String keyType, String valType, int buckets) {
-      Preconditions.checkArgument(buckets > 0);
-      Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
-
-      this.mapId = mapId;
-      this.numBuckets = buckets;
-      this.combinerType = combinerType;
-      this.updateObserverType = null;
-      this.keyType = keyType;
-      this.valueType = valType;
-    }
-
-    public Options(String mapId, String combinerType, String updateObserverType, String keyType,
-        String valueType, int buckets) {
-      Preconditions.checkArgument(buckets > 0);
-      Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
-
-      this.mapId = mapId;
-      this.numBuckets = buckets;
-      this.combinerType = combinerType;
-      this.updateObserverType = updateObserverType;
-      this.keyType = keyType;
-      this.valueType = valueType;
-    }
-
-    /**
-     * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
-     * be used to actually deserialize and process the updates. This limit does not account for
-     * object overhead in java, which can be significant.
-     *
-     * <p>
-     * The way memory read is calculated is by summing the length of serialized key and value byte
-     * arrays. Once this sum exceeds the configured memory limit, no more update key values are
-     * processed in the current transaction. When not everything is processed, the observer
-     * processing updates will notify itself causing another transaction to continue processing
-     * later
-     */
-    public Options setBufferSize(long bufferSize) {
-      Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    long getBufferSize() {
-      if (bufferSize == null) {
-        return DEFAULT_BUFFER_SIZE;
-      }
-
-      return bufferSize;
-    }
-
-    /**
-     * Sets the number of buckets per tablet to generate. This affects how many split points will be
-     * generated when optimizing the Accumulo table.
-     *
-     */
-    public Options setBucketsPerTablet(int bucketsPerTablet) {
-      Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
-          + bucketsPerTablet);
-      this.bucketsPerTablet = bucketsPerTablet;
-      return this;
-    }
-
-    int getBucketsPerTablet() {
-      if (bucketsPerTablet == null) {
-        return DEFAULT_BUCKETS_PER_TABLET;
-      }
-
-      return bucketsPerTablet;
-    }
-
-    public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, Class<K> keyType,
-        Class<V> valueType, int buckets) {
-      this(mapId, combiner.getName(), keyType.getName(), valueType.getName(), buckets);
-    }
-
-    public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner,
-        Class<? extends UpdateObserver<K, V>> updateObserver, Class<K> keyType, Class<V> valueType,
-        int buckets) {
-      this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(), valueType
-          .getName(), buckets);
-    }
-
-    void save(SimpleConfiguration appConfig) {
-      appConfig.setProperty(PREFIX + mapId + ".buckets", numBuckets + "");
-      appConfig.setProperty(PREFIX + mapId + ".combiner", combinerType + "");
-      appConfig.setProperty(PREFIX + mapId + ".key", keyType);
-      appConfig.setProperty(PREFIX + mapId + ".val", valueType);
-      if (updateObserverType != null) {
-        appConfig.setProperty(PREFIX + mapId + ".updateObserver", updateObserverType + "");
-      }
-      if (bufferSize != null) {
-        appConfig.setProperty(PREFIX + mapId + ".bufferSize", bufferSize);
-      }
-      if (bucketsPerTablet != null) {
-        appConfig.setProperty(PREFIX + mapId + ".bucketsPerTablet", bucketsPerTablet);
-      }
-    }
-  }
-
-  /**
-   * This method configures a collision free map for use. It must be called before initializing
-   * Fluo.
-   */
-  public static void configure(FluoConfiguration fluoConfig, Options opts) {
-    opts.save(fluoConfig.getAppConfiguration());
-    fluoConfig.addObserver(new ObserverConfiguration(CollisionFreeMapObserver.class.getName())
-        .setParameters(ImmutableMap.of("mapId", opts.mapId)));
-
-    Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
-    Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
-
-    new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("cfm." + opts.mapId,
-        new RowRange(dataRangeEnd, updateRangeEnd));
-  }
-
-  /**
-   * Return suggested Fluo table optimizations for all previously configured collision free maps.
-   *
-   * @param appConfig Must pass in the application configuration obtained from
-   *        {@code FluoClient.getAppConfiguration()} or
-   *        {@code FluoConfiguration.getAppConfiguration()}
-   */
-  public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
-    HashSet<String> mapIds = new HashSet<>();
-    appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
-        k -> mapIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
-
-    Pirtos pirtos = new Pirtos();
-    mapIds.forEach(mid -> pirtos.merge(getTableOptimizations(mid, appConfig)));
-
-    return pirtos;
-  }
-
-  /**
-   * Return suggested Fluo table optimizations for the specified collisiong free map.
-   *
-   * @param appConfig Must pass in the application configuration obtained from
-   *        {@code FluoClient.getAppConfiguration()} or
-   *        {@code FluoConfiguration.getAppConfiguration()}
-   */
-  public static Pirtos getTableOptimizations(String mapId, SimpleConfiguration appConfig) {
-    Options opts = new Options(mapId, appConfig);
-
-    BytesBuilder rowBuilder = Bytes.newBuilder();
-    rowBuilder.append(mapId);
-
-    List<Bytes> dataSplits = new ArrayList<>();
-    for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
-      String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
-      rowBuilder.setLength(mapId.length());
-      dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes());
-    }
-    Collections.sort(dataSplits);
-
-    List<Bytes> updateSplits = new ArrayList<>();
-    for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
-      String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
-      rowBuilder.setLength(mapId.length());
-      updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes());
-    }
-    Collections.sort(updateSplits);
-
-    Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
-    Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
-
-    List<Bytes> splits = new ArrayList<>();
-    splits.add(dataRangeEnd);
-    splits.add(updateRangeEnd);
-    splits.addAll(dataSplits);
-    splits.addAll(updateSplits);
-
-    Pirtos pirtos = new Pirtos();
-    pirtos.setSplits(splits);
-
-    pirtos.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:");
-
-    return pirtos;
-  }
-
-  private static byte[] encSeq(long l) {
-    byte[] ret = new byte[8];
-    ret[0] = (byte) (l >>> 56);
-    ret[1] = (byte) (l >>> 48);
-    ret[2] = (byte) (l >>> 40);
-    ret[3] = (byte) (l >>> 32);
-    ret[4] = (byte) (l >>> 24);
-    ret[5] = (byte) (l >>> 16);
-    ret[6] = (byte) (l >>> 8);
-    ret[7] = (byte) (l >>> 0);
-    return ret;
-  }
-}