Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:58 UTC

[10/10] incubator-fluo-recipes git commit: Merge remote-tracking branch 'mike/package-refactor'

Merge remote-tracking branch 'mike/package-refactor'

Conflicts:
	docs/row-hasher.md
	modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
	modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
	modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java
	modules/core/src/main/java/org/apache/fluo/recipes/common/TableOptimizations.java
	modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java
	modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
	modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
	modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
	modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
	modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/22354d0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/22354d0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/22354d0f

Branch: refs/heads/master
Commit: 22354d0f7f03f532ec093b7ece61b5ae9fb070e9
Parents: a8b85f3 083e4af
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jul 15 18:01:55 2016 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jul 15 18:01:55 2016 -0400

----------------------------------------------------------------------
 docs/export-queue.md                            |   2 +-
 docs/row-hasher.md                              |   4 +-
 docs/serialization.md                           |   2 +-
 docs/transient.md                               |   2 +-
 .../recipes/accumulo/cmds/CompactTransient.java |  13 +-
 .../recipes/accumulo/cmds/OptimizeTable.java    |   6 +-
 .../recipes/accumulo/export/AccumuloExport.java |   1 +
 .../accumulo/export/AccumuloExporter.java       |   5 +-
 .../accumulo/export/DifferenceExport.java       |   1 +
 .../accumulo/export/ReplicationExport.java      |   5 +-
 .../accumulo/export/SharedBatchWriter.java      |  14 +-
 .../fluo/recipes/accumulo/export/TableInfo.java |   3 +
 .../recipes/accumulo/ops/TableOperations.java   |   9 +-
 .../apache/fluo/recipes/common/RowRange.java    |  82 ---
 .../fluo/recipes/common/TableOptimizations.java |  82 ---
 .../fluo/recipes/common/TransientRegistry.java  |  79 ---
 .../fluo/recipes/core/common/RowRange.java      |  85 +++
 .../recipes/core/common/TableOptimizations.java |  84 +++
 .../recipes/core/common/TransientRegistry.java  |  80 +++
 .../fluo/recipes/core/data/RowHasher.java       | 137 ++++
 .../apache/fluo/recipes/core/export/Export.java |  41 ++
 .../fluo/recipes/core/export/ExportBucket.java  | 203 ++++++
 .../fluo/recipes/core/export/ExportEntry.java   |  22 +
 .../recipes/core/export/ExportObserver.java     | 143 ++++
 .../fluo/recipes/core/export/ExportQueue.java   | 277 ++++++++
 .../fluo/recipes/core/export/Exporter.java      |  67 ++
 .../recipes/core/export/SequencedExport.java    |  32 +
 .../fluo/recipes/core/impl/BucketUtil.java      |  24 +
 .../fluo/recipes/core/map/CollisionFreeMap.java | 657 +++++++++++++++++++
 .../core/map/CollisionFreeMapObserver.java      |  54 ++
 .../apache/fluo/recipes/core/map/Combiner.java  |  34 +
 .../recipes/core/map/NullUpdateObserver.java    |  25 +
 .../apache/fluo/recipes/core/map/Update.java    |  46 ++
 .../fluo/recipes/core/map/UpdateObserver.java   |  35 +
 .../core/serialization/SimpleSerializer.java    |  59 ++
 .../fluo/recipes/core/transaction/LogEntry.java | 116 ++++
 .../core/transaction/RecordingTransaction.java  |  66 ++
 .../transaction/RecordingTransactionBase.java   | 252 +++++++
 .../fluo/recipes/core/transaction/TxLog.java    |  81 +++
 .../apache/fluo/recipes/core/types/Encoder.java |  86 +++
 .../fluo/recipes/core/types/StringEncoder.java  |  86 +++
 .../fluo/recipes/core/types/TypeLayer.java      | 488 ++++++++++++++
 .../fluo/recipes/core/types/TypedLoader.java    |  45 ++
 .../fluo/recipes/core/types/TypedObserver.java  |  46 ++
 .../fluo/recipes/core/types/TypedSnapshot.java  |  38 ++
 .../recipes/core/types/TypedSnapshotBase.java   | 555 ++++++++++++++++
 .../recipes/core/types/TypedTransaction.java    |  46 ++
 .../core/types/TypedTransactionBase.java        | 278 ++++++++
 .../org/apache/fluo/recipes/data/RowHasher.java | 135 ----
 .../org/apache/fluo/recipes/export/Export.java  |  38 --
 .../fluo/recipes/export/ExportBucket.java       | 203 ------
 .../apache/fluo/recipes/export/ExportEntry.java |  22 -
 .../fluo/recipes/export/ExportObserver.java     | 140 ----
 .../apache/fluo/recipes/export/ExportQueue.java | 274 --------
 .../apache/fluo/recipes/export/Exporter.java    |  64 --
 .../fluo/recipes/export/SequencedExport.java    |  29 -
 .../apache/fluo/recipes/impl/BucketUtil.java    |  24 -
 .../fluo/recipes/map/CollisionFreeMap.java      | 657 -------------------
 .../recipes/map/CollisionFreeMapObserver.java   |  53 --
 .../org/apache/fluo/recipes/map/Combiner.java   |  31 -
 .../fluo/recipes/map/NullUpdateObserver.java    |  25 -
 .../org/apache/fluo/recipes/map/Update.java     |  43 --
 .../apache/fluo/recipes/map/UpdateObserver.java |  34 -
 .../recipes/serialization/SimpleSerializer.java |  56 --
 .../fluo/recipes/transaction/LogEntry.java      | 114 ----
 .../transaction/RecordingTransaction.java       |  64 --
 .../transaction/RecordingTransactionBase.java   | 250 -------
 .../apache/fluo/recipes/transaction/TxLog.java  |  79 ---
 .../org/apache/fluo/recipes/types/Encoder.java  |  86 ---
 .../fluo/recipes/types/StringEncoder.java       |  86 ---
 .../apache/fluo/recipes/types/TypeLayer.java    | 488 --------------
 .../apache/fluo/recipes/types/TypedLoader.java  |  45 --
 .../fluo/recipes/types/TypedObserver.java       |  46 --
 .../fluo/recipes/types/TypedSnapshot.java       |  38 --
 .../fluo/recipes/types/TypedSnapshotBase.java   | 555 ----------------
 .../fluo/recipes/types/TypedTransaction.java    |  46 --
 .../recipes/types/TypedTransactionBase.java     | 278 --------
 .../fluo/recipes/common/TestGrouping.java       |  95 ---
 .../recipes/common/TransientRegistryTest.java   |  48 --
 .../fluo/recipes/core/common/TestGrouping.java  |  93 +++
 .../core/common/TransientRegistryTest.java      |  48 ++
 .../fluo/recipes/core/data/RowHasherTest.java   |  62 ++
 .../recipes/core/export/DocumentLoader.java     |  36 +
 .../recipes/core/export/DocumentObserver.java   |  63 ++
 .../recipes/core/export/ExportBufferIT.java     | 106 +++
 .../fluo/recipes/core/export/ExportQueueIT.java | 114 ++++
 .../recipes/core/export/ExportTestBase.java     | 286 ++++++++
 .../recipes/core/export/GsonSerializer.java     |  42 ++
 .../fluo/recipes/core/export/OptionsTest.java   |  51 ++
 .../fluo/recipes/core/export/RefInfo.java       |  26 +
 .../fluo/recipes/core/export/RefUpdates.java    |  43 ++
 .../fluo/recipes/core/map/BigUpdateIT.java      | 214 ++++++
 .../recipes/core/map/CollisionFreeMapIT.java    | 361 ++++++++++
 .../fluo/recipes/core/map/DocumentLoader.java   |  35 +
 .../fluo/recipes/core/map/DocumentObserver.java |  89 +++
 .../fluo/recipes/core/map/OptionsTest.java      |  51 ++
 .../fluo/recipes/core/map/SplitsTest.java       |  76 +++
 .../fluo/recipes/core/map/TestSerializer.java   |  45 ++
 .../recipes/core/map/WordCountCombiner.java     |  36 +
 .../recipes/core/map/WordCountObserver.java     |  47 ++
 .../transaction/RecordingTransactionTest.java   | 227 +++++++
 .../fluo/recipes/core/types/MockSnapshot.java   |  30 +
 .../recipes/core/types/MockSnapshotBase.java    | 202 ++++++
 .../recipes/core/types/MockTransaction.java     |  36 +
 .../recipes/core/types/MockTransactionBase.java |  90 +++
 .../fluo/recipes/core/types/TypeLayerTest.java  | 494 ++++++++++++++
 .../apache/fluo/recipes/data/RowHasherTest.java |  62 --
 .../fluo/recipes/export/DocumentLoader.java     |  36 -
 .../fluo/recipes/export/DocumentObserver.java   |  63 --
 .../fluo/recipes/export/ExportBufferIT.java     | 106 ---
 .../fluo/recipes/export/ExportQueueIT.java      | 114 ----
 .../fluo/recipes/export/ExportTestBase.java     | 286 --------
 .../fluo/recipes/export/GsonSerializer.java     |  42 --
 .../apache/fluo/recipes/export/OptionsTest.java |  51 --
 .../org/apache/fluo/recipes/export/RefInfo.java |  26 -
 .../apache/fluo/recipes/export/RefUpdates.java  |  43 --
 .../apache/fluo/recipes/map/BigUpdateIT.java    | 214 ------
 .../fluo/recipes/map/CollisionFreeMapIT.java    | 361 ----------
 .../apache/fluo/recipes/map/DocumentLoader.java |  35 -
 .../fluo/recipes/map/DocumentObserver.java      |  89 ---
 .../apache/fluo/recipes/map/OptionsTest.java    |  51 --
 .../org/apache/fluo/recipes/map/SplitsTest.java |  76 ---
 .../apache/fluo/recipes/map/TestSerializer.java |  45 --
 .../fluo/recipes/map/WordCountCombiner.java     |  36 -
 .../fluo/recipes/map/WordCountObserver.java     |  47 --
 .../transaction/RecordingTransactionTest.java   | 227 -------
 .../apache/fluo/recipes/types/MockSnapshot.java |  30 -
 .../fluo/recipes/types/MockSnapshotBase.java    | 202 ------
 .../fluo/recipes/types/MockTransaction.java     |  36 -
 .../fluo/recipes/types/MockTransactionBase.java |  90 ---
 .../fluo/recipes/types/TypeLayerTest.java       | 494 --------------
 .../recipes/kryo/KryoSimplerSerializer.java     |   5 +-
 .../serialization/KryoSimpleSerializerTest.java |  45 ++
 .../serialization/KryoSimpleSerializerTest.java |  45 --
 .../recipes/spark/AccumuloRangePartitioner.java |   3 +
 .../fluo/recipes/spark/FluoSparkHelper.java     |   2 +
 .../fluo/recipes/spark/FluoSparkTestUtil.java   |   2 +
 .../fluo/recipes/test/AccumuloExportITBase.java |   4 +-
 .../apache/fluo/recipes/test/FluoITHelper.java  |   2 +
 .../recipes/test/export/AccumuloExporterIT.java |   2 +-
 .../test/export/AccumuloReplicatorIT.java       |  10 +-
 141 files changed, 7395 insertions(+), 7334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/docs/row-hasher.md
----------------------------------------------------------------------
diff --cc docs/row-hasher.md
index 78aab23,8db8af6..fdd8218
--- a/docs/row-hasher.md
+++ b/docs/row-hasher.md
@@@ -31,8 -31,8 +31,8 @@@ balancing of the prefix
  
  ```java
  import org.apache.fluo.api.data.Bytes;
- import org.apache.fluo.recipes.common.TableOptimizations;
- import org.apache.fluo.recipes.data.RowHasher;
 -import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.data.RowHasher;
  
  public class RowHasherExample {
  

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
----------------------------------------------------------------------
diff --cc modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
index 7910bdb,2a56cfb..92651bd
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
@@@ -19,8 -19,11 +19,11 @@@ import javax.inject.Inject
  
  import org.apache.fluo.api.config.FluoConfiguration;
  import org.apache.fluo.recipes.accumulo.ops.TableOperations;
- import org.apache.fluo.recipes.common.TableOptimizations;
 -import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
  
+ /**
+  * @since 1.0.0
+  */
  public class OptimizeTable {
  
    // when run with fluo exec command, the applications fluo config will be injected
@@@ -33,9 -36,7 +36,8 @@@
        System.exit(-1);
      }
  
- 
 -    TableOperations.optimizeTable(fluoConfig, Pirtos.getConfiguredOptimizations(fluoConfig));
 +    TableOperations.optimizeTable(fluoConfig,
 +        TableOptimizations.getConfiguredOptimizations(fluoConfig));
      System.out.println("Finished optimizing table");
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
----------------------------------------------------------------------
diff --cc modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
index c4fd07f,3cc418c..e433317
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
@@@ -28,9 -28,9 +28,9 @@@ import org.apache.fluo.api.client.FluoF
  import org.apache.fluo.api.config.FluoConfiguration;
  import org.apache.fluo.api.config.SimpleConfiguration;
  import org.apache.fluo.api.data.Bytes;
- import org.apache.fluo.recipes.common.TableOptimizations;
- import org.apache.fluo.recipes.common.RowRange;
- import org.apache.fluo.recipes.common.TransientRegistry;
 -import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.common.RowRange;
+ import org.apache.fluo.recipes.core.common.TransientRegistry;
  import org.apache.hadoop.io.Text;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
index 0000000,0000000..4657366
new file mode 100644
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
@@@ -1,0 -1,0 +1,84 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
++ * agreements. See the NOTICE file distributed with this work for additional information regarding
++ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance with the License. You may obtain a
++ * copy of the License at
++ * 
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * 
++ * Unless required by applicable law or agreed to in writing, software distributed under the License
++ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
++ * or implied. See the License for the specific language governing permissions and limitations under
++ * the License.
++ */
++
++package org.apache.fluo.recipes.core.common;
++
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.List;
++import java.util.Objects;
++
++import org.apache.fluo.api.client.FluoClient;
++import org.apache.fluo.api.client.FluoFactory;
++import org.apache.fluo.api.config.FluoConfiguration;
++import org.apache.fluo.api.config.SimpleConfiguration;
++import org.apache.fluo.api.data.Bytes;
++import org.apache.fluo.recipes.core.export.ExportQueue;
++import org.apache.fluo.recipes.core.map.CollisionFreeMap;
++
++/**
++ * Post initialization recommended table optimizations.
++ *
++ * @since 1.0.0
++ */
++public class TableOptimizations {
++  private List<Bytes> splits = new ArrayList<>();
++  private String tabletGroupingRegex = "";
++
++  public void setSplits(List<Bytes> splits) {
++    this.splits.clear();
++    this.splits.addAll(splits);
++  }
++
++  /**
++   * @return A recommended set of split points to add to a Fluo table after initialization.
++   */
++  public List<Bytes> getSplits() {
++    return Collections.unmodifiableList(splits);
++  }
++
++  public void setTabletGroupingRegex(String tgr) {
++    Objects.requireNonNull(tgr);
++    this.tabletGroupingRegex = tgr;
++  }
++
++  public String getTabletGroupingRegex() {
++    return "(" + tabletGroupingRegex + ").*";
++  }
++
++  public void merge(TableOptimizations other) {
++    splits.addAll(other.splits);
++    if (tabletGroupingRegex.length() > 0 && other.tabletGroupingRegex.length() > 0) {
++      tabletGroupingRegex += "|" + other.tabletGroupingRegex;
++    } else {
++      tabletGroupingRegex += other.tabletGroupingRegex;
++    }
++  }
++
++  /**
++   * A utility method to get table optimizations for all configured recipes.
++   */
++  public static TableOptimizations getConfiguredOptimizations(FluoConfiguration fluoConfig) {
++    try (FluoClient client = FluoFactory.newClient(fluoConfig)) {
++      SimpleConfiguration appConfig = client.getAppConfiguration();
++      TableOptimizations tableOptim = new TableOptimizations();
++
++      tableOptim.merge(ExportQueue.getTableOptimizations(appConfig));
++      tableOptim.merge(CollisionFreeMap.getTableOptimizations(appConfig));
++
++      return tableOptim;
++    }
++  }
++}
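
For reference, the renamed TableOptimizations class is applied the same way Pirtos was; below is a minimal sketch, assuming a configured Fluo application (the application name "myapp" is illustrative, not part of this commit):

    import org.apache.fluo.api.config.FluoConfiguration;
    import org.apache.fluo.recipes.accumulo.ops.TableOperations;
    import org.apache.fluo.recipes.core.common.TableOptimizations;

    public class OptimizeExample {
      public static void main(String[] args) throws Exception {
        FluoConfiguration fluoConfig = new FluoConfiguration();
        fluoConfig.setApplicationName("myapp"); // illustrative application name

        // Collect split points and tablet grouping regexes registered by all
        // configured recipes (export queues and collision free maps).
        TableOptimizations tableOptim =
            TableOptimizations.getConfiguredOptimizations(fluoConfig);

        // Apply the suggested optimizations to the application's table.
        TableOperations.optimizeTable(fluoConfig, tableOptim);
      }
    }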

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
index 0000000,ace7e7e..e40ce9b
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
@@@ -1,0 -1,137 +1,137 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  * agreements. See the NOTICE file distributed with this work for additional information regarding
+  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance with the License. You may obtain a
+  * copy of the License at
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * 
+  * Unless required by applicable law or agreed to in writing, software distributed under the License
+  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+  * or implied. See the License for the specific language governing permissions and limitations under
+  * the License.
+  */
+ 
+ package org.apache.fluo.recipes.core.data;
+ 
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.regex.Pattern;
+ 
+ import com.google.common.base.Preconditions;
+ import com.google.common.base.Strings;
+ import com.google.common.hash.Hashing;
+ import org.apache.fluo.api.data.Bytes;
+ import org.apache.fluo.api.data.BytesBuilder;
 -import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ 
+ /**
+  * This recipe provides code to help add a hash of the row as a prefix of the row. Using this
+  * recipe, rows are structured as follows.
+  * 
+  * <p>
+  * {@code <prefix>:<fixed len row hash>:<user row>}
+  * 
+  * <p>
+  * The recipe also provides code to help generate split points and configure balancing of the
+  * prefix.
+  * 
+  * <p>
+  * The project documentation has more information.
+  *
+  * @since 1.0.0
+  */
+ public class RowHasher {
+ 
+   private static final int HASH_LEN = 4;
+ 
 -  public Pirtos getTableOptimizations(int numTablets) {
++  public TableOptimizations getTableOptimizations(int numTablets) {
+ 
+     List<Bytes> splits = new ArrayList<>(numTablets - 1);
+ 
+     int numSplits = numTablets - 1;
+     int distance = (((int) Math.pow(Character.MAX_RADIX, HASH_LEN) - 1) / numTablets) + 1;
+     int split = distance;
+     for (int i = 0; i < numSplits; i++) {
+       splits.add(Bytes.of(prefix
+           + Strings.padStart(Integer.toString(split, Character.MAX_RADIX), HASH_LEN, '0')));
+       split += distance;
+     }
+ 
+     splits.add(Bytes.of(prefix + "~"));
+ 
+ 
 -    Pirtos pirtos = new Pirtos();
 -    pirtos.setSplits(splits);
 -    pirtos.setTabletGroupingRegex(Pattern.quote(prefix.toString()));
++    TableOptimizations tableOptim = new TableOptimizations();
++    tableOptim.setSplits(splits);
++    tableOptim.setTabletGroupingRegex(Pattern.quote(prefix.toString()));
+ 
 -    return pirtos;
++    return tableOptim;
+   }
+ 
+ 
+   private Bytes prefix;
+ 
+   public RowHasher(String prefix) {
+     this.prefix = Bytes.of(prefix + ":");
+   }
+ 
+   /**
+    * @return Returns input with prefix and hash of input prepended.
+    */
+   public Bytes addHash(String row) {
+     return addHash(Bytes.of(row));
+   }
+ 
+   /**
+    * @return Returns input with prefix and hash of input prepended.
+    */
+   public Bytes addHash(Bytes row) {
+     BytesBuilder builder = Bytes.newBuilder(prefix.length() + 5 + row.length());
+     builder.append(prefix);
+     builder.append(genHash(row));
+     builder.append(":");
+     builder.append(row);
+     return builder.toBytes();
+   }
+ 
+   private boolean hasHash(Bytes row) {
+     for (int i = prefix.length(); i < prefix.length() + HASH_LEN; i++) {
+       byte b = row.byteAt(i);
+       boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9');
+       if (!isAlphaNum) {
+         return false;
+       }
+     }
+ 
+     if (row.byteAt(prefix.length() - 1) != ':' || row.byteAt(prefix.length() + HASH_LEN) != ':') {
+       return false;
+     }
+ 
+     return true;
+   }
+ 
+   /**
+    * @return Returns input with prefix and hash stripped from beginning.
+    */
+   public Bytes removeHash(Bytes row) {
+     Preconditions.checkArgument(row.length() >= prefix.length() + 5,
+         "Row is shorter than expected " + row);
+     Preconditions.checkArgument(row.subSequence(0, prefix.length()).equals(prefix),
+         "Row does not have expected prefix " + row);
+     Preconditions.checkArgument(hasHash(row), "Row does not have expected hash " + row);
+     return row.subSequence(prefix.length() + 5, row.length());
+   }
+ 
+   private static String genHash(Bytes row) {
+     int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
+     hash = hash & 0x7fffffff;
+     // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable, which
+     // is nice for debugging.
+     String hashString =
+         Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
+     hashString = hashString.substring(hashString.length() - HASH_LEN);
+ 
+     return hashString;
+   }
+ }
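
A minimal sketch of the relocated RowHasher in use, mirroring the addHash/removeHash methods above; the prefix "p" and row value are illustrative:

    import org.apache.fluo.api.data.Bytes;
    import org.apache.fluo.recipes.core.data.RowHasher;

    public class RowHasherUsage {
      public static void main(String[] args) {
        RowHasher hasher = new RowHasher("p");

        // Prepend the prefix and a 4 character base-36 hash: p:<hash>:row1
        Bytes hashed = hasher.addHash("row1");

        // Strip the prefix and hash to recover the original row.
        Bytes row = hasher.removeHash(hashed);

        System.out.println(hashed + " -> " + row);
      }
    }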

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
index 0000000,ac04f80..dffa713
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
@@@ -1,0 -1,276 +1,277 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  * agreements. See the NOTICE file distributed with this work for additional information regarding
+  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance with the License. You may obtain a
+  * copy of the License at
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * 
+  * Unless required by applicable law or agreed to in writing, software distributed under the License
+  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+  * or implied. See the License for the specific language governing permissions and limitations under
+  * the License.
+  */
+ 
+ package org.apache.fluo.recipes.core.export;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.regex.Pattern;
+ 
+ import com.google.common.base.Preconditions;
+ import com.google.common.hash.Hashing;
+ import org.apache.fluo.api.client.TransactionBase;
+ import org.apache.fluo.api.config.FluoConfiguration;
+ import org.apache.fluo.api.config.ObserverConfiguration;
+ import org.apache.fluo.api.config.SimpleConfiguration;
+ import org.apache.fluo.api.data.Bytes;
 -import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.common.RowRange;
+ import org.apache.fluo.recipes.core.common.TransientRegistry;
+ import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+ 
+ /**
+  * @since 1.0.0
+  */
+ public class ExportQueue<K, V> {
+ 
+   private static final String RANGE_BEGIN = "#";
+   private static final String RANGE_END = ":~";
+ 
+   private int numBuckets;
+   private SimpleSerializer serializer;
+   private String queueId;
+ 
+   // usage hint : could be created once in an observer's init method
+   // usage hint : maybe have a queue for each type of data being exported???
+   // maybe fewer queues are
+   // more efficient though because of more batching at export time??
+   ExportQueue(Options opts, SimpleSerializer serializer) throws Exception {
+     // TODO sanity check key type based on type params
+     // TODO defer creating classes until needed... so that it's not done during Fluo init
+     this.queueId = opts.queueId;
+     this.numBuckets = opts.numBuckets;
+     this.serializer = serializer;
+   }
+ 
+   public void add(TransactionBase tx, K key, V value) {
+     addAll(tx, Collections.singleton(new Export<>(key, value)).iterator());
+   }
+ 
+   public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) {
+ 
+     Set<Integer> bucketsNotified = new HashSet<>();
+     while (exports.hasNext()) {
+       Export<K, V> export = exports.next();
+ 
+       byte[] k = serializer.serialize(export.getKey());
+       byte[] v = serializer.serialize(export.getValue());
+ 
+       int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+       int bucketId = Math.abs(hash % numBuckets);
+ 
+       ExportBucket bucket = new ExportBucket(tx, queueId, bucketId, numBuckets);
+       bucket.add(tx.getStartTimestamp(), k, v);
+ 
+       if (!bucketsNotified.contains(bucketId)) {
+         bucket.notifyExportObserver();
+         bucketsNotified.add(bucketId);
+       }
+     }
+   }
+ 
+   public static <K2, V2> ExportQueue<K2, V2> getInstance(String exportQueueId,
+       SimpleConfiguration appConfig) {
+     Options opts = new Options(exportQueueId, appConfig);
+     try {
+       return new ExportQueue<>(opts, SimpleSerializer.getInstance(appConfig));
+     } catch (Exception e) {
+       // TODO
+       throw new RuntimeException(e);
+     }
+   }
+ 
+   /**
+    * Call this method before initializing Fluo.
+    *
+    * @param fluoConfig The configuration that will be used to initialize fluo.
+    */
+   public static void configure(FluoConfiguration fluoConfig, Options opts) {
+     SimpleConfiguration appConfig = fluoConfig.getAppConfiguration();
+     opts.save(appConfig);
+ 
+     fluoConfig.addObserver(new ObserverConfiguration(ExportObserver.class.getName())
+         .setParameters(Collections.singletonMap("queueId", opts.queueId)));
+ 
+     Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
+     Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
+ 
+     new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("exportQueue."
+         + opts.queueId, new RowRange(exportRangeStart, exportRangeStop));
+   }
+ 
+   /**
+    * Return suggested Fluo table optimizations for all previously configured export queues.
+    *
+    * @param appConfig Must pass in the application configuration obtained from
+    *        {@code FluoClient.getAppConfiguration()} or
+    *        {@code FluoConfiguration.getAppConfiguration()}
+    */
+ 
 -  public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
++  public static TableOptimizations getTableOptimizations(SimpleConfiguration appConfig) {
+     HashSet<String> queueIds = new HashSet<>();
+     appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
+         k -> queueIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
+ 
 -    Pirtos pirtos = new Pirtos();
 -    queueIds.forEach(qid -> pirtos.merge(getTableOptimizations(qid, appConfig)));
++    TableOptimizations tableOptim = new TableOptimizations();
++    queueIds.forEach(qid -> tableOptim.merge(getTableOptimizations(qid, appConfig)));
+ 
 -    return pirtos;
++    return tableOptim;
+   }
+ 
+   /**
+    * Return suggested Fluo table optimizations for the specified export queue.
+    *
+    * @param appConfig Must pass in the application configuration obtained from
+    *        {@code FluoClient.getAppConfiguration()} or
+    *        {@code FluoConfiguration.getAppConfiguration()}
+    */
 -  public static Pirtos getTableOptimizations(String queueId, SimpleConfiguration appConfig) {
++  public static TableOptimizations getTableOptimizations(String queueId,
++      SimpleConfiguration appConfig) {
+     Options opts = new Options(queueId, appConfig);
+ 
+     List<Bytes> splits = new ArrayList<>();
+ 
+     Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
+     Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
+ 
+     splits.add(exportRangeStart);
+     splits.add(exportRangeStop);
+ 
+     List<Bytes> exportSplits = new ArrayList<>();
+     for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+       exportSplits.add(ExportBucket.generateBucketRow(opts.queueId, i, opts.numBuckets));
+     }
+     Collections.sort(exportSplits);
+     splits.addAll(exportSplits);
+ 
 -    Pirtos pirtos = new Pirtos();
 -    pirtos.setSplits(splits);
++    TableOptimizations tableOptim = new TableOptimizations();
++    tableOptim.setSplits(splits);
+ 
+     // the tablet with end row <queueId># does not contain any data for the export queue and
+     // should not be grouped with the export queue
 -    pirtos.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
++    tableOptim.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
+ 
 -    return pirtos;
++    return tableOptim;
+   }
+ 
+   public static class Options {
+ 
+     private static final String PREFIX = "recipes.exportQueue.";
+     static final long DEFAULT_BUFFER_SIZE = 1 << 20;
+     static final int DEFAULT_BUCKETS_PER_TABLET = 10;
+ 
+     int numBuckets;
+     Integer bucketsPerTablet = null;
+     Long bufferSize;
+ 
+     String keyType;
+     String valueType;
+     String exporterType;
+     String queueId;
+ 
+     Options(String queueId, SimpleConfiguration appConfig) {
+       this.queueId = queueId;
+ 
+       this.numBuckets = appConfig.getInt(PREFIX + queueId + ".buckets");
+       this.exporterType = appConfig.getString(PREFIX + queueId + ".exporter");
+       this.keyType = appConfig.getString(PREFIX + queueId + ".key");
+       this.valueType = appConfig.getString(PREFIX + queueId + ".val");
+       this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE);
+       this.bucketsPerTablet =
+           appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
+     }
+ 
+     public Options(String queueId, String exporterType, String keyType, String valueType,
+         int buckets) {
+       Preconditions.checkArgument(buckets > 0);
+ 
+       this.queueId = queueId;
+       this.numBuckets = buckets;
+       this.exporterType = exporterType;
+       this.keyType = keyType;
+       this.valueType = valueType;
+     }
+ 
+ 
+     public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporter,
+         Class<K> keyType, Class<V> valueType, int buckets) {
+       this(queueId, exporter.getName(), keyType.getName(), valueType.getName(), buckets);
+     }
+ 
+     /**
+      * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
+      * be used to actually deserialize and process the updates. This limit does not account for
+      * object overhead in Java, which can be significant.
+      *
+      * <p>
+      * Memory read is calculated by summing the lengths of the serialized key and value byte
+      * arrays. Once this sum exceeds the configured limit, no more export key values are
+      * processed in the current transaction. When not everything is processed, the observer
+      * processing exports will notify itself, causing another transaction to continue processing
+      * later.
+      */
+     public Options setBufferSize(long bufferSize) {
+       Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
+       this.bufferSize = bufferSize;
+       return this;
+     }
+ 
+     long getBufferSize() {
+       if (bufferSize == null) {
+         return DEFAULT_BUFFER_SIZE;
+       }
+ 
+       return bufferSize;
+     }
+ 
+     /**
+      * Sets the number of buckets per tablet to generate. This affects how many split points will be
+      * generated when optimizing the Accumulo table.
+      *
+      */
+     public Options setBucketsPerTablet(int bucketsPerTablet) {
+       Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
+           + bucketsPerTablet);
+       this.bucketsPerTablet = bucketsPerTablet;
+       return this;
+     }
+ 
+     int getBucketsPerTablet() {
+       if (bucketsPerTablet == null) {
+         return DEFAULT_BUCKETS_PER_TABLET;
+       }
+ 
+       return bucketsPerTablet;
+     }
+ 
+     void save(SimpleConfiguration appConfig) {
+       appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + "");
+       appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + "");
+       appConfig.setProperty(PREFIX + queueId + ".key", keyType);
+       appConfig.setProperty(PREFIX + queueId + ".val", valueType);
+ 
+       if (bufferSize != null) {
+         appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize);
+       }
+       if (bucketsPerTablet != null) {
+         appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet);
+       }
+     }
+   }
+ }
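
A minimal sketch of configuring and using the relocated ExportQueue; the queue id "docExport", the exporter class name com.example.DocExporter, and the bucket count are illustrative assumptions (DocExporter would extend Exporter<String, String>):

    import org.apache.fluo.api.client.TransactionBase;
    import org.apache.fluo.api.config.FluoConfiguration;
    import org.apache.fluo.api.config.SimpleConfiguration;
    import org.apache.fluo.recipes.core.export.ExportQueue;

    public class ExportQueueUsage {

      // Call before initializing Fluo; registers the queue's observer and
      // transient ranges. "docExport" and com.example.DocExporter are hypothetical.
      public static void preInit(FluoConfiguration fluoConfig) {
        ExportQueue.configure(fluoConfig, new ExportQueue.Options("docExport",
            "com.example.DocExporter", String.class.getName(), String.class.getName(), 17));
      }

      // Call from a loader or observer with an active transaction.
      public static void queueExport(TransactionBase tx, SimpleConfiguration appConfig) {
        ExportQueue<String, String> queue = ExportQueue.getInstance("docExport", appConfig);
        queue.add(tx, "docId0001", "content to export");
      }
    }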

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
index 0000000,6cbc658..2fe4a7c
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@@ -1,0 -1,657 +1,657 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  * agreements. See the NOTICE file distributed with this work for additional information regarding
+  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance with the License. You may obtain a
+  * copy of the License at
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * 
+  * Unless required by applicable law or agreed to in writing, software distributed under the License
+  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+  * or implied. See the License for the specific language governing permissions and limitations under
+  * the License.
+  */
+ 
+ package org.apache.fluo.recipes.core.map;
+ 
+ import java.io.Serializable;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Optional;
+ import java.util.Set;
+ import java.util.regex.Pattern;
+ 
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.ImmutableMap;
+ import com.google.common.collect.Iterators;
+ import com.google.common.collect.Sets;
+ import com.google.common.hash.Hashing;
+ import org.apache.fluo.api.client.SnapshotBase;
+ import org.apache.fluo.api.client.TransactionBase;
+ import org.apache.fluo.api.config.FluoConfiguration;
+ import org.apache.fluo.api.config.ObserverConfiguration;
+ import org.apache.fluo.api.config.ScannerConfiguration;
+ import org.apache.fluo.api.config.SimpleConfiguration;
+ import org.apache.fluo.api.data.Bytes;
+ import org.apache.fluo.api.data.BytesBuilder;
+ import org.apache.fluo.api.data.Column;
+ import org.apache.fluo.api.data.RowColumn;
+ import org.apache.fluo.api.data.RowColumnValue;
+ import org.apache.fluo.api.data.Span;
+ import org.apache.fluo.api.iterator.ColumnIterator;
+ import org.apache.fluo.api.iterator.RowIterator;
 -import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.common.RowRange;
+ import org.apache.fluo.recipes.core.common.TransientRegistry;
+ import org.apache.fluo.recipes.core.impl.BucketUtil;
+ import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+ 
+ /**
+  * See the project level documentation for information about this recipe.
+  *
+  * @since 1.0.0
+  */
+ public class CollisionFreeMap<K, V> {
+ 
+   private static final String UPDATE_RANGE_END = ":u:~";
+ 
+   private static final String DATA_RANGE_END = ":d:~";
+ 
+   private String mapId;
+ 
+   private Class<K> keyType;
+   private Class<V> valType;
+   private SimpleSerializer serializer;
+   private Combiner<K, V> combiner;
+   UpdateObserver<K, V> updateObserver;
+   private long bufferSize;
+ 
+   static final Column UPDATE_COL = new Column("u", "v");
+   static final Column NEXT_COL = new Column("u", "next");
+ 
+   private int numBuckets = -1;
+ 
+   @SuppressWarnings("unchecked")
+   CollisionFreeMap(Options opts, SimpleSerializer serializer) throws Exception {
+ 
+     this.mapId = opts.mapId;
+     // TODO defer loading classes
+     // TODO centralize class loading
+     // TODO try to check type params
+     this.numBuckets = opts.numBuckets;
+     this.keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType);
+     this.valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType);
+     this.combiner =
+         (Combiner<K, V>) getClass().getClassLoader().loadClass(opts.combinerType).newInstance();
+     this.serializer = serializer;
+     if (opts.updateObserverType != null) {
+       this.updateObserver =
+           getClass().getClassLoader().loadClass(opts.updateObserverType)
+               .asSubclass(UpdateObserver.class).newInstance();
+     } else {
+       this.updateObserver = new NullUpdateObserver<>();
+     }
+     this.bufferSize = opts.getBufferSize();
+   }
+ 
+   private V deserVal(Bytes val) {
+     return serializer.deserialize(val.toArray(), valType);
+   }
+ 
+   private Bytes getKeyFromUpdateRow(Bytes prefix, Bytes row) {
+     return row.subSequence(prefix.length(), row.length() - 8);
+   }
+ 
+   void process(TransactionBase tx, Bytes ntfyRow, Column col) throws Exception {
+ 
+     Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
+ 
+     ScannerConfiguration sc = new ScannerConfiguration();
+ 
+     if (nextKey != null) {
+       Bytes startRow =
+           Bytes.newBuilder(ntfyRow.length() + nextKey.length()).append(ntfyRow).append(nextKey)
+               .toBytes();
+       Span tmpSpan = Span.prefix(ntfyRow);
+       Span nextSpan =
+           new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(),
+               tmpSpan.isEndInclusive());
+       sc.setSpan(nextSpan);
+     } else {
+       sc.setSpan(Span.prefix(ntfyRow));
+     }
+ 
+     sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
+     RowIterator iter = tx.get(sc);
+ 
+     Map<Bytes, List<Bytes>> updates = new HashMap<>();
+ 
+     long approxMemUsed = 0;
+ 
+     Bytes partiallyReadKey = null;
+ 
+     if (iter.hasNext()) {
+       Bytes lastKey = null;
+       while (iter.hasNext() && approxMemUsed < bufferSize) {
+         Entry<Bytes, ColumnIterator> rowCol = iter.next();
+         Bytes curRow = rowCol.getKey();
+ 
+         tx.delete(curRow, UPDATE_COL);
+ 
+         Bytes serializedKey = getKeyFromUpdateRow(ntfyRow, curRow);
+         lastKey = serializedKey;
+ 
+         List<Bytes> updateList = updates.get(serializedKey);
+         if (updateList == null) {
+           updateList = new ArrayList<>();
+           updates.put(serializedKey, updateList);
+         }
+ 
+         Bytes val = rowCol.getValue().next().getValue();
+         updateList.add(val);
+ 
+         approxMemUsed += curRow.length();
+         approxMemUsed += val.length();
+       }
+ 
+       if (iter.hasNext()) {
+         Entry<Bytes, ColumnIterator> rowCol = iter.next();
+         Bytes curRow = rowCol.getKey();
+ 
+         // check if more updates for last key
+         if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) {
+           // there are still more updates for this key
+           partiallyReadKey = lastKey;
+ 
+           // start next time at the current key
+           tx.set(ntfyRow, NEXT_COL, partiallyReadKey);
+         } else {
+           // start next time at the next possible key
+           Bytes nextPossible =
+               Bytes.newBuilder(lastKey.length() + 1).append(lastKey).append(new byte[] {0})
+                   .toBytes();
+           tx.set(ntfyRow, NEXT_COL, nextPossible);
+         }
+ 
+         // may not read all data because of mem limit, so notify self
+         tx.setWeakNotification(ntfyRow, col);
+       } else if (nextKey != null) {
+         // clear nextKey
+         tx.delete(ntfyRow, NEXT_COL);
+       }
+     } else if (nextKey != null) {
+       tx.delete(ntfyRow, NEXT_COL);
+     }
+ 
+     byte[] dataPrefix = ntfyRow.toArray();
+     // TODO this is awful... no sanity check... hard to read
+     dataPrefix[Bytes.of(mapId).length() + 1] = 'd';
+ 
+     BytesBuilder rowBuilder = Bytes.newBuilder();
+     rowBuilder.append(dataPrefix);
+     int rowPrefixLen = rowBuilder.getLength();
+ 
+     Set<Bytes> keysToFetch = updates.keySet();
+     if (partiallyReadKey != null) {
+       final Bytes prk = partiallyReadKey;
+       keysToFetch = Sets.filter(keysToFetch, b -> !b.equals(prk));
+     }
+     Map<Bytes, Map<Column, Bytes>> currentVals = getCurrentValues(tx, rowBuilder, keysToFetch);
+ 
+     ArrayList<Update<K, V>> updatesToReport = new ArrayList<>(updates.size());
+ 
+     for (Entry<Bytes, List<Bytes>> entry : updates.entrySet()) {
+       rowBuilder.setLength(rowPrefixLen);
+       Bytes currentValueRow = rowBuilder.append(entry.getKey()).toBytes();
+       Bytes currVal =
+           currentVals.getOrDefault(currentValueRow, Collections.emptyMap()).get(DATA_COLUMN);
+ 
+       Iterator<V> ui = Iterators.transform(entry.getValue().iterator(), this::deserVal);
+ 
+       K kd = serializer.deserialize(entry.getKey().toArray(), keyType);
+ 
+       if (partiallyReadKey != null && partiallyReadKey.equals(entry.getKey())) {
+         // not all updates were read for this key, so requeue the combined updates as an update
+         Optional<V> nv = combiner.combine(kd, ui);
+         if (nv.isPresent()) {
+           update(tx, Collections.singletonMap(kd, nv.get()));
+         }
+       } else {
+         Optional<V> nv = combiner.combine(kd, concat(ui, currVal));
+         Bytes newVal = nv.isPresent() ? Bytes.of(serializer.serialize(nv.get())) : null;
+         if (newVal != null ^ currVal != null || (currVal != null && !currVal.equals(newVal))) {
+           if (newVal == null) {
+             tx.delete(currentValueRow, DATA_COLUMN);
+           } else {
+             tx.set(currentValueRow, DATA_COLUMN, newVal);
+           }
+ 
+           Optional<V> cvd = Optional.ofNullable(currVal).map(this::deserVal);
+           updatesToReport.add(new Update<>(kd, cvd, nv));
+         }
+       }
+     }
+ 
+     // TODO could clear these as converted to objects to avoid double memory usage
+     updates.clear();
+     currentVals.clear();
+ 
+     if (updatesToReport.size() > 0) {
+       updateObserver.updatingValues(tx, updatesToReport.iterator());
+     }
+   }
+ 
+   private static final Column DATA_COLUMN = new Column("data", "current");
+ 
+   private Map<Bytes, Map<Column, Bytes>> getCurrentValues(TransactionBase tx, BytesBuilder prefix,
+       Set<Bytes> keySet) {
+ 
+     Set<Bytes> rows = new HashSet<>();
+ 
+     int prefixLen = prefix.getLength();
+     for (Bytes key : keySet) {
+       prefix.setLength(prefixLen);
+       rows.add(prefix.append(key).toBytes());
+     }
+ 
+     try {
+       return tx.get(rows, Collections.singleton(DATA_COLUMN));
+     } catch (IllegalArgumentException e) {
+       System.out.println(rows.size());
+       throw e;
+     }
+   }
+ 
+   private Iterator<V> concat(Iterator<V> updates, Bytes currentVal) {
+     if (currentVal == null) {
+       return updates;
+     }
+ 
+     return Iterators.concat(updates, Iterators.singletonIterator(deserVal(currentVal)));
+   }
+ 
+   /**
+    * This method will retrieve the current value for key and any outstanding updates and combine
+    * them using the configured {@link Combiner}. The result from the combiner is returned.
+    */
+   public V get(SnapshotBase tx, K key) {
+ 
+     byte[] k = serializer.serialize(key);
+ 
+     int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+     String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+ 
+ 
+     BytesBuilder rowBuilder = Bytes.newBuilder();
+     rowBuilder.append(mapId).append(":u:").append(bucketId).append(":").append(k);
+ 
+     ScannerConfiguration sc = new ScannerConfiguration();
+     sc.setSpan(Span.prefix(rowBuilder.toBytes()));
+ 
+     RowIterator iter = tx.get(sc);
+ 
+     Iterator<V> ui;
+ 
+     if (iter.hasNext()) {
+       ui = Iterators.transform(iter, e -> deserVal(e.getValue().next().getValue()));
+     } else {
+       ui = Collections.<V>emptyList().iterator();
+     }
+ 
+     rowBuilder.setLength(mapId.length());
+     rowBuilder.append(":d:").append(bucketId).append(":").append(k);
+ 
+     Bytes dataRow = rowBuilder.toBytes();
+ 
+     Bytes cv = tx.get(dataRow, DATA_COLUMN);
+ 
+     if (!ui.hasNext()) {
+       if (cv == null) {
+         return null;
+       } else {
+         return deserVal(cv);
+       }
+     }
+ 
+     return combiner.combine(key, concat(ui, cv)).orElse(null);
+   }
+ 
+   String getId() {
+     return mapId;
+   }
+ 
+   /**
+    * Queues updates for a collision free map. These updates will be made by an Observer executing
+    * another transaction. This method will not collide with other transactions queuing updates for
+    * the same keys.
+    *
+    * @param tx This transaction will be used to make the updates.
+    * @param updates The keys in the map should correspond to keys in the collision free map being
+    *        updated. The values in the map will be queued for updating.
+    */
+   public void update(TransactionBase tx, Map<K, V> updates) {
+     Preconditions.checkState(numBuckets > 0, "Not initialized");
+ 
+     Set<String> buckets = new HashSet<>();
+ 
+     BytesBuilder rowBuilder = Bytes.newBuilder();
+     rowBuilder.append(mapId).append(":u:");
+     int prefixLength = rowBuilder.getLength();
+ 
+     byte[] startTs = encSeq(tx.getStartTimestamp());
+ 
+     for (Entry<K, V> entry : updates.entrySet()) {
+       byte[] k = serializer.serialize(entry.getKey());
+       int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+       String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+ 
+       // reset to the common row prefix
+       rowBuilder.setLength(prefixLength);
+ 
+       Bytes row = rowBuilder.append(bucketId).append(":").append(k).append(startTs).toBytes();
+       Bytes val = Bytes.of(serializer.serialize(entry.getValue()));
+ 
+       // TODO set if not exists would be comforting here.... but
+       // collisions on bucketId+key+uuid should never occur
+       tx.set(row, UPDATE_COL, val);
+ 
+       buckets.add(bucketId);
+     }
+ 
+     for (String bucketId : buckets) {
+       rowBuilder.setLength(prefixLength);
+       rowBuilder.append(bucketId).append(":");
+ 
+       Bytes row = rowBuilder.toBytes();
+ 
+       tx.setWeakNotification(row, new Column("fluoRecipes", "cfm:" + mapId));
+     }
+   }
+ 
+   public static <K2, V2> CollisionFreeMap<K2, V2> getInstance(String mapId,
+       SimpleConfiguration appConf) {
+     Options opts = new Options(mapId, appConf);
+     try {
+       return new CollisionFreeMap<>(opts, SimpleSerializer.getInstance(appConf));
+     } catch (Exception e) {
+       // TODO
+       throw new RuntimeException(e);
+     }
+   }
+ 
+   /**
+    * A {@link CollisionFreeMap} stores data in its own format in the Fluo table. When
+    * initializing a Fluo table with something like MapReduce or Spark, data will need to be written
+    * in this format. That's the purpose of this method: it provides a simple class that can do this
+    * conversion.
+    *
+    */
+   public static <K2, V2> Initializer<K2, V2> getInitializer(String mapId, int numBuckets,
+       SimpleSerializer serializer) {
+     return new Initializer<>(mapId, numBuckets, serializer);
+   }
+ 
+   /**
+    * @see CollisionFreeMap#getInitializer(String, int, SimpleSerializer)
+    */
+   public static class Initializer<K2, V2> implements Serializable {
+ 
+     private static final long serialVersionUID = 1L;
+ 
+     private String mapId;
+ 
+     private SimpleSerializer serializer;
+ 
+     private int numBuckets = -1;
+ 
+     private Initializer(String mapId, int numBuckets, SimpleSerializer serializer) {
+       this.mapId = mapId;
+       this.numBuckets = numBuckets;
+       this.serializer = serializer;
+     }
+ 
+     public RowColumnValue convert(K2 key, V2 val) {
+       byte[] k = serializer.serialize(key);
+       int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+       String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+ 
+       BytesBuilder bb = Bytes.newBuilder();
+       Bytes row = bb.append(mapId).append(":d:").append(bucketId).append(":").append(k).toBytes();
+       byte[] v = serializer.serialize(val);
+ 
+       return new RowColumnValue(row, DATA_COLUMN, Bytes.of(v));
+     }
+   }
+ 
+   public static class Options {
+ 
+     static final long DEFAULT_BUFFER_SIZE = 1 << 22;
+     static final int DEFAULT_BUCKETS_PER_TABLET = 10;
+ 
+     int numBuckets;
+     Integer bucketsPerTablet = null;
+ 
+     Long bufferSize;
+ 
+     String keyType;
+     String valueType;
+     String combinerType;
+     String updateObserverType;
+     String mapId;
+ 
+     private static final String PREFIX = "recipes.cfm.";
+ 
+     Options(String mapId, SimpleConfiguration appConfig) {
+       this.mapId = mapId;
+ 
+       this.numBuckets = appConfig.getInt(PREFIX + mapId + ".buckets");
+       this.combinerType = appConfig.getString(PREFIX + mapId + ".combiner");
+       this.keyType = appConfig.getString(PREFIX + mapId + ".key");
+       this.valueType = appConfig.getString(PREFIX + mapId + ".val");
+       this.updateObserverType = appConfig.getString(PREFIX + mapId + ".updateObserver", null);
+       this.bufferSize = appConfig.getLong(PREFIX + mapId + ".bufferSize", DEFAULT_BUFFER_SIZE);
+       this.bucketsPerTablet =
+           appConfig.getInt(PREFIX + mapId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
+     }
+ 
+     public Options(String mapId, String combinerType, String keyType, String valType, int buckets) {
+       Preconditions.checkArgument(buckets > 0);
+       Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
+ 
+       this.mapId = mapId;
+       this.numBuckets = buckets;
+       this.combinerType = combinerType;
+       this.updateObserverType = null;
+       this.keyType = keyType;
+       this.valueType = valType;
+     }
+ 
+     public Options(String mapId, String combinerType, String updateObserverType, String keyType,
+         String valueType, int buckets) {
+       Preconditions.checkArgument(buckets > 0);
+       Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
+ 
+       this.mapId = mapId;
+       this.numBuckets = buckets;
+       this.combinerType = combinerType;
+       this.updateObserverType = updateObserverType;
+       this.keyType = keyType;
+       this.valueType = valueType;
+     }
+ 
+     /**
+      * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
+      * be used to actually deserialize and process the updates. This limit does not account for
+      * object overhead in Java, which can be significant.
+      *
+      * <p>
+      * Memory read is calculated by summing the lengths of the serialized key and value byte
+      * arrays. Once this sum exceeds the configured limit, no more update key values are
+      * processed in the current transaction. When not everything is processed, the observer
+      * processing updates will notify itself, causing another transaction to continue processing
+      * later.
+      */
+     public Options setBufferSize(long bufferSize) {
+       Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
+       this.bufferSize = bufferSize;
+       return this;
+     }
+ 
+     long getBufferSize() {
+       if (bufferSize == null) {
+         return DEFAULT_BUFFER_SIZE;
+       }
+ 
+       return bufferSize;
+     }
+ 
+     /**
+      * Sets the number of buckets per tablet to generate. This affects how many split points will be
+      * generated when optimizing the Accumulo table.
+      *
+      */
+     public Options setBucketsPerTablet(int bucketsPerTablet) {
+       Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
+           + bucketsPerTablet);
+       this.bucketsPerTablet = bucketsPerTablet;
+       return this;
+     }
+ 
+     int getBucketsPerTablet() {
+       if (bucketsPerTablet == null) {
+         return DEFAULT_BUCKETS_PER_TABLET;
+       }
+ 
+       return bucketsPerTablet;
+     }
+ 
+     public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, Class<K> keyType,
+         Class<V> valueType, int buckets) {
+       this(mapId, combiner.getName(), keyType.getName(), valueType.getName(), buckets);
+     }
+ 
+     public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner,
+         Class<? extends UpdateObserver<K, V>> updateObserver, Class<K> keyType, Class<V> valueType,
+         int buckets) {
+       this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(), valueType
+           .getName(), buckets);
+     }
+ 
+     void save(SimpleConfiguration appConfig) {
+       appConfig.setProperty(PREFIX + mapId + ".buckets", numBuckets + "");
+       appConfig.setProperty(PREFIX + mapId + ".combiner", combinerType + "");
+       appConfig.setProperty(PREFIX + mapId + ".key", keyType);
+       appConfig.setProperty(PREFIX + mapId + ".val", valueType);
+       if (updateObserverType != null) {
+         appConfig.setProperty(PREFIX + mapId + ".updateObserver", updateObserverType + "");
+       }
+       if (bufferSize != null) {
+         appConfig.setProperty(PREFIX + mapId + ".bufferSize", bufferSize);
+       }
+       if (bucketsPerTablet != null) {
+         appConfig.setProperty(PREFIX + mapId + ".bucketsPerTablet", bucketsPerTablet);
+       }
+     }
+   }
+ 
+   /**
+    * This method configures a collision free map for use. It must be called before initializing
+    * Fluo.
+    */
+   public static void configure(FluoConfiguration fluoConfig, Options opts) {
+     opts.save(fluoConfig.getAppConfiguration());
+     fluoConfig.addObserver(new ObserverConfiguration(CollisionFreeMapObserver.class.getName())
+         .setParameters(ImmutableMap.of("mapId", opts.mapId)));
+ 
+     Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
+     Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
+ 
+     new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("cfm." + opts.mapId,
+         new RowRange(dataRangeEnd, updateRangeEnd));
+   }
+ 
+   /**
+    * Return suggested Fluo table optimizations for all previously configured collision free maps.
+    *
+    * @param appConfig the application configuration obtained from
+    *        {@code FluoClient.getAppConfiguration()} or
+    *        {@code FluoConfiguration.getAppConfiguration()}
+    */
 -  public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
++  public static TableOptimizations getTableOptimizations(SimpleConfiguration appConfig) {
+     HashSet<String> mapIds = new HashSet<>();
+     appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
+         k -> mapIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
+ 
 -    Pirtos pirtos = new Pirtos();
 -    mapIds.forEach(mid -> pirtos.merge(getTableOptimizations(mid, appConfig)));
++    TableOptimizations tableOptim = new TableOptimizations();
++    mapIds.forEach(mid -> tableOptim.merge(getTableOptimizations(mid, appConfig)));
+ 
 -    return pirtos;
++    return tableOptim;
+   }
+ 
+   /**
+    * Return suggested Fluo table optimizations for the specified collision free map.
+    *
+    * @param appConfig the application configuration obtained from
+    *        {@code FluoClient.getAppConfiguration()} or
+    *        {@code FluoConfiguration.getAppConfiguration()}
+    */
 -  public static Pirtos getTableOptimizations(String mapId, SimpleConfiguration appConfig) {
++  public static TableOptimizations getTableOptimizations(String mapId,
++      SimpleConfiguration appConfig) {
+     Options opts = new Options(mapId, appConfig);
+ 
+     BytesBuilder rowBuilder = Bytes.newBuilder();
+     rowBuilder.append(mapId);
+ 
+     List<Bytes> dataSplits = new ArrayList<>();
+     for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+       String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+       rowBuilder.setLength(mapId.length());
+       dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes());
+     }
+     Collections.sort(dataSplits);
+ 
+     List<Bytes> updateSplits = new ArrayList<>();
+     for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+       String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+       rowBuilder.setLength(mapId.length());
+       updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes());
+     }
+     Collections.sort(updateSplits);
+ 
+     Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
+     Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
+ 
+     List<Bytes> splits = new ArrayList<>();
+     splits.add(dataRangeEnd);
+     splits.add(updateRangeEnd);
+     splits.addAll(dataSplits);
+     splits.addAll(updateSplits);
+ 
 -    Pirtos pirtos = new Pirtos();
 -    pirtos.setSplits(splits);
++    TableOptimizations tableOptim = new TableOptimizations();
++    tableOptim.setSplits(splits);
+ 
 -    pirtos.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:");
++    tableOptim.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:");
+ 
 -    return pirtos;
++    return tableOptim;
+   }
+ 
+   private static byte[] encSeq(long l) {
+     // encode the sequence number as 8 big-endian bytes (most significant byte first)
+     byte[] ret = new byte[8];
+     ret[0] = (byte) (l >>> 56);
+     ret[1] = (byte) (l >>> 48);
+     ret[2] = (byte) (l >>> 40);
+     ret[3] = (byte) (l >>> 32);
+     ret[4] = (byte) (l >>> 24);
+     ret[5] = (byte) (l >>> 16);
+     ret[6] = (byte) (l >>> 8);
+     ret[7] = (byte) (l >>> 0);
+     return ret;
+   }
+ }

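For context, a minimal usage sketch of the Options API above. The map id
"wcMap", the buffer size, and the bucket counts are illustrative assumptions,
not values from this commit; WordCountCombiner is the test combiner used by
SplitsTest further down.

    import org.apache.fluo.api.config.FluoConfiguration;
    import org.apache.fluo.recipes.core.common.TableOptimizations;
    import org.apache.fluo.recipes.core.map.CollisionFreeMap;
    import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;

    FluoConfiguration fluoConfig = new FluoConfiguration();

    // hypothetical values: 119 buckets, ~4 MiB update buffer, 10 buckets per tablet
    Options opts = new Options("wcMap", WordCountCombiner.class, String.class, Long.class, 119)
        .setBufferSize(1 << 22)
        .setBucketsPerTablet(10);

    // must run before initializing Fluo
    CollisionFreeMap.configure(fluoConfig, opts);

    // suggested splits and tablet grouping regex for all configured maps
    TableOptimizations tableOptim =
        CollisionFreeMap.getTableOptimizations(fluoConfig.getAppConfiguration());

As an aside, encSeq above produces the same bytes as
ByteBuffer.allocate(8).putLong(l).array(), since ByteBuffer defaults to
big-endian byte order.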
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
index 0000000,5a1f5fe..8ceda12
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
@@@ -1,0 -1,92 +1,93 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  * agreements. See the NOTICE file distributed with this work for additional information regarding
+  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance with the License. You may obtain a
+  * copy of the License at
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * 
+  * Unless required by applicable law or agreed to in writing, software distributed under the License
+  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+  * or implied. See the License for the specific language governing permissions and limitations under
+  * the License.
+  */
+ 
+ package org.apache.fluo.recipes.core.common;
+ 
+ import java.util.Set;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import com.google.common.collect.ImmutableSet;
+ import org.apache.fluo.api.config.FluoConfiguration;
+ import org.apache.fluo.api.data.Bytes;
+ import org.apache.fluo.recipes.core.export.ExportQueue;
+ import org.apache.fluo.recipes.core.map.CollisionFreeMap;
+ import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ public class TestGrouping {
+   @Test
+   public void testTabletGrouping() {
+     FluoConfiguration conf = new FluoConfiguration();
+ 
+     CollisionFreeMap.configure(conf, new Options("m1", "ct", "kt", "vt", 119));
+     CollisionFreeMap.configure(conf, new Options("m2", "ct", "kt", "vt", 3));
+ 
+     ExportQueue.configure(conf, new ExportQueue.Options("eq1", "et", "kt", "vt", 7));
+     ExportQueue.configure(conf, new ExportQueue.Options("eq2", "et", "kt", "vt", 3));
+ 
 -    Pirtos pirtos = CollisionFreeMap.getTableOptimizations(conf.getAppConfiguration());
 -    pirtos.merge(ExportQueue.getTableOptimizations(conf.getAppConfiguration()));
++    TableOptimizations tableOptim =
++        CollisionFreeMap.getTableOptimizations(conf.getAppConfiguration());
++    tableOptim.merge(ExportQueue.getTableOptimizations(conf.getAppConfiguration()));
+ 
 -    Pattern pattern = Pattern.compile(pirtos.getTabletGroupingRegex());
++    Pattern pattern = Pattern.compile(tableOptim.getTabletGroupingRegex());
+ 
+     Assert.assertEquals("m1:u:", group(pattern, "m1:u:f0c"));
+     Assert.assertEquals("m1:d:", group(pattern, "m1:d:f0c"));
+     Assert.assertEquals("m2:u:", group(pattern, "m2:u:abc"));
+     Assert.assertEquals("m2:d:", group(pattern, "m2:d:590"));
+     Assert.assertEquals("none", group(pattern, "m3:d:590"));
+ 
+     Assert.assertEquals("eq1:", group(pattern, "eq1:f0c"));
+     Assert.assertEquals("eq2:", group(pattern, "eq2:f0c"));
+     Assert.assertEquals("none", group(pattern, "eq3:f0c"));
+ 
+     // validate the assumptions this test is making
 -    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq1#")));
 -    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq2#")));
 -    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq1:~")));
 -    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq2:~")));
 -    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m1:u:~")));
 -    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m1:d:~")));
 -    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m2:u:~")));
 -    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m2:d:~")));
++    Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("eq1#")));
++    Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("eq2#")));
++    Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("eq1:~")));
++    Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("eq2:~")));
++    Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("m1:u:~")));
++    Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("m1:d:~")));
++    Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("m2:u:~")));
++    Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("m2:d:~")));
+ 
+     Set<String> expectedGroups =
+         ImmutableSet.of("m1:u:", "m1:d:", "m2:u:", "m2:d:", "eq1:", "eq2:");
+ 
+     // ensure all splits group as expected
 -    for (Bytes split : pirtos.getSplits()) {
++    for (Bytes split : tableOptim.getSplits()) {
+       String g = group(pattern, split.toString());
+ 
+       if (expectedGroups.contains(g)) {
+         Assert.assertTrue(split.toString().startsWith(g));
+       } else {
+         Assert.assertEquals("none", g);
+         Assert.assertTrue(split.toString().equals("eq1#") || split.toString().equals("eq2#"));
+       }
+ 
+     }
+ 
+   }
+ 
+   private String group(Pattern pattern, String endRow) {
+     Matcher m = pattern.matcher(endRow);
+     if (m.matches() && m.groupCount() == 1) {
+       return m.group(1);
+     }
+     return "none";
+   }
+ }

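The tablet grouping regex exercised above is intended to drive Accumulo's
RegexGroupBalancer, which the TableOperations class in the accumulo module
wires up. A hedged sketch of that wiring, assuming an Accumulo Connector
"conn" and a String "tableName" are in scope (property names follow the
RegexGroupBalancer shipped with Accumulo 1.7+; verify against TableOperations
before relying on them):

    // balance tablets by recipe prefix (m1:d:, m1:u:, eq1:, ...); end rows that
    // match no capture group fall into the "none" group, as the test above asserts
    conn.tableOperations().setProperty(tableName,
        "table.custom.balancer.group.regex.pattern", tableOptim.getTabletGroupingRegex());
    conn.tableOperations().setProperty(tableName,
        "table.custom.balancer.group.regex.default", "none");
    conn.tableOperations().setProperty(tableName, "table.balancer",
        "org.apache.accumulo.server.master.balancer.RegexGroupBalancer");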
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
index 0000000,a359598..8259469
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
@@@ -1,0 -1,75 +1,76 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  * agreements. See the NOTICE file distributed with this work for additional information regarding
+  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance with the License. You may obtain a
+  * copy of the License at
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * 
+  * Unless required by applicable law or agreed to in writing, software distributed under the License
+  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+  * or implied. See the License for the specific language governing permissions and limitations under
+  * the License.
+  */
+ 
+ package org.apache.fluo.recipes.core.map;
+ 
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import com.google.common.collect.Lists;
+ import org.apache.fluo.api.config.FluoConfiguration;
+ import org.apache.fluo.api.data.Bytes;
 -import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ public class SplitsTest {
+   private static List<Bytes> sort(List<Bytes> in) {
+     ArrayList<Bytes> out = new ArrayList<>(in);
+     Collections.sort(out);
+     return out;
+   }
+ 
+   @Test
+   public void testSplits() {
+ 
+     Options opts = new Options("foo", WordCountCombiner.class, String.class, Long.class, 3);
+     opts.setBucketsPerTablet(1);
+     FluoConfiguration fluoConfig = new FluoConfiguration();
+     CollisionFreeMap.configure(fluoConfig, opts);
+ 
 -    Pirtos pirtos1 =
++    TableOptimizations tableOptim1 =
+         CollisionFreeMap.getTableOptimizations("foo", fluoConfig.getAppConfiguration());
+     List<Bytes> expected1 =
+         Lists.transform(
+             Arrays.asList("foo:d:1", "foo:d:2", "foo:d:~", "foo:u:1", "foo:u:2", "foo:u:~"),
+             Bytes::of);
+ 
 -    Assert.assertEquals(expected1, sort(pirtos1.getSplits()));
++    Assert.assertEquals(expected1, sort(tableOptim1.getSplits()));
+ 
+     Options opts2 = new Options("bar", WordCountCombiner.class, String.class, Long.class, 6);
+     opts2.setBucketsPerTablet(2);
+     CollisionFreeMap.configure(fluoConfig, opts2);
+ 
 -    Pirtos pirtos2 =
++    TableOptimizations tableOptim2 =
+         CollisionFreeMap.getTableOptimizations("bar", fluoConfig.getAppConfiguration());
+     List<Bytes> expected2 =
+         Lists.transform(
+             Arrays.asList("bar:d:2", "bar:d:4", "bar:d:~", "bar:u:2", "bar:u:4", "bar:u:~"),
+             Bytes::of);
 -    Assert.assertEquals(expected2, sort(pirtos2.getSplits()));
++    Assert.assertEquals(expected2, sort(tableOptim2.getSplits()));
+ 
 -    Pirtos pirtos3 = CollisionFreeMap.getTableOptimizations(fluoConfig.getAppConfiguration());
++    TableOptimizations tableOptim3 =
++        CollisionFreeMap.getTableOptimizations(fluoConfig.getAppConfiguration());
+ 
+     ArrayList<Bytes> expected3 = new ArrayList<>(expected2);
+     expected3.addAll(expected1);
+ 
 -    Assert.assertEquals(expected3, sort(pirtos3.getSplits()));
++    Assert.assertEquals(expected3, sort(tableOptim3.getSplits()));
+ 
+   }
+ }

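To make expected2 above concrete, a small sketch of the split math, mirroring
the generation loop in CollisionFreeMap.getTableOptimizations (BucketUtil is
the helper from the core impl package):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.fluo.recipes.core.impl.BucketUtil;

    // the "bar" map above: 6 buckets, 2 buckets per tablet
    int numBuckets = 6;
    int bucketsPerTablet = 2;
    List<String> dataSplits = new ArrayList<>();
    for (int i = bucketsPerTablet; i < numBuckets; i += bucketsPerTablet) {
      dataSplits.add("bar:d:" + BucketUtil.genBucketId(i, numBuckets));
    }
    // dataSplits is now [bar:d:2, bar:d:4]; getTableOptimizations appends the
    // range end bar:d:~ and the analogous bar:u:* rows, yielding the six
    // splits asserted in expected2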
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
----------------------------------------------------------------------
diff --cc modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
index 07cff67,c1adc3b..00795f4
--- a/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
+++ b/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
@@@ -31,7 -31,7 +31,7 @@@ import org.apache.fluo.api.client.FluoF
  import org.apache.fluo.api.config.FluoConfiguration;
  import org.apache.fluo.api.mini.MiniFluo;
  import org.apache.fluo.recipes.accumulo.ops.TableOperations;
- import org.apache.fluo.recipes.common.TableOptimizations;
 -import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
  import org.junit.After;
  import org.junit.AfterClass;
  import org.junit.Before;