You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:51 UTC

[03/10] incubator-fluo-recipes git commit: Updated package names in core module

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
new file mode 100644
index 0000000..5a1f5fe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.common;
+
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.recipes.core.export.ExportQueue;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestGrouping {
+  @Test
+  public void testTabletGrouping() {
+    FluoConfiguration conf = new FluoConfiguration();
+
+    CollisionFreeMap.configure(conf, new Options("m1", "ct", "kt", "vt", 119));
+    CollisionFreeMap.configure(conf, new Options("m2", "ct", "kt", "vt", 3));
+
+    ExportQueue.configure(conf, new ExportQueue.Options("eq1", "et", "kt", "vt", 7));
+    ExportQueue.configure(conf, new ExportQueue.Options("eq2", "et", "kt", "vt", 3));
+
+    Pirtos pirtos = CollisionFreeMap.getTableOptimizations(conf.getAppConfiguration());
+    pirtos.merge(ExportQueue.getTableOptimizations(conf.getAppConfiguration()));
+
+    Pattern pattern = Pattern.compile(pirtos.getTabletGroupingRegex());
+
+    Assert.assertEquals("m1:u:", group(pattern, "m1:u:f0c"));
+    Assert.assertEquals("m1:d:", group(pattern, "m1:d:f0c"));
+    Assert.assertEquals("m2:u:", group(pattern, "m2:u:abc"));
+    Assert.assertEquals("m2:d:", group(pattern, "m2:d:590"));
+    Assert.assertEquals("none", group(pattern, "m3:d:590"));
+
+    Assert.assertEquals("eq1:", group(pattern, "eq1:f0c"));
+    Assert.assertEquals("eq2:", group(pattern, "eq2:f0c"));
+    Assert.assertEquals("none", group(pattern, "eq3:f0c"));
+
+    // validate the assumptions this test is making
+    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq1#")));
+    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq2#")));
+    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq1:~")));
+    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq2:~")));
+    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m1:u:~")));
+    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m1:d:~")));
+    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m2:u:~")));
+    Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m2:d:~")));
+
+    Set<String> expectedGroups =
+        ImmutableSet.of("m1:u:", "m1:d:", "m2:u:", "m2:d:", "eq1:", "eq2:");
+
+    // ensure all splits group as expected
+    for (Bytes split : pirtos.getSplits()) {
+      String g = group(pattern, split.toString());
+
+      if (expectedGroups.contains(g)) {
+        Assert.assertTrue(split.toString().startsWith(g));
+      } else {
+        Assert.assertEquals("none", g);
+        Assert.assertTrue(split.toString().equals("eq1#") || split.toString().equals("eq2#"));
+      }
+
+    }
+
+  }
+
+  private String group(Pattern pattern, String endRow) {
+    Matcher m = pattern.matcher(endRow);
+    if (m.matches() && m.groupCount() == 1) {
+      return m.group(1);
+    }
+    return "none";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TransientRegistryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TransientRegistryTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TransientRegistryTest.java
new file mode 100644
index 0000000..a1be5c9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TransientRegistryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.common;
+
+import java.util.HashSet;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks that transient ranges added through one registry are visible through a new one. */
+public class TransientRegistryTest {
+  @Test
+  public void testBasic() {
+    FluoConfiguration config = new FluoConfiguration();
+    HashSet<RowRange> registered = new HashSet<>();
+
+    RowRange first = new RowRange(Bytes.of("pr1:g"), Bytes.of("pr1:q"));
+    new TransientRegistry(config.getAppConfiguration()).addTransientRange("foo", first);
+    registered.add(first);
+
+    // a freshly constructed registry must see the range added via the previous instance
+    TransientRegistry reopened = new TransientRegistry(config.getAppConfiguration());
+    Assert.assertEquals(registered, new HashSet<>(reopened.getTransientRanges()));
+
+    RowRange second = new RowRange(Bytes.of("pr2:j"), Bytes.of("pr2:m"));
+    reopened.addTransientRange("bar", second);
+    registered.add(second);
+
+    // both ranges must be readable from yet another new instance
+    reopened = new TransientRegistry(config.getAppConfiguration());
+    Assert.assertEquals(registered, new HashSet<>(reopened.getTransientRanges()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java
new file mode 100644
index 0000000..9250d73
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.data;
+
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit tests for adding and removing hashed row prefixes with RowHasher. */
+public class RowHasherTest {
+  @Test
+  public void testBadPrefixes() {
+    // every one of these rows is expected to be rejected by a hasher created for prefix "p"
+    String[] badPrefixes =
+        {"q:she6:test1", "q:she6:test1", "p:Mhe6:test1", "p;she6:test1", "p:she6;test1",
+            "p;she6;test1", "p:+he6:test1", "p:s?e6:test1", "p:sh{6:test1", "p:sh6:"};
+
+    RowHasher rh = new RowHasher("p");
+    for (String badPrefix : badPrefixes) {
+      try {
+        rh.removeHash(Bytes.of(badPrefix));
+        Assert.fail("removeHash accepted malformed row: " + badPrefix);
+      } catch (IllegalArgumentException expected) { // rejection is the passing outcome
+      }
+    }
+  }
+
+  @Test
+  public void testBasic() {
+    RowHasher rh = new RowHasher("p");
+    Assert.assertEquals("abc", rh.removeHash(rh.addHash("abc")).toString());
+    rh = new RowHasher("p2");
+    Assert.assertEquals("abc", rh.removeHash(rh.addHash("abc")).toString());
+    Assert.assertTrue(rh.addHash("abc").toString().startsWith("p2:"));
+
+    // test to ensure hash is stable over time
+    Assert.assertEquals("p2:she6:test1", rh.addHash("test1").toString());
+    Assert.assertEquals("p2:hgt0:0123456789abcdefghijklmnopqrstuvwxyz",
+        rh.addHash("0123456789abcdefghijklmnopqrstuvwxyz").toString());
+    Assert.assertEquals("p2:fluo:86ce3b094982c6a", rh.addHash("86ce3b094982c6a").toString());
+  }
+
+  @Test
+  public void testBalancerRegex() {
+    RowHasher rh = new RowHasher("p");
+    String regex = rh.getTableOptimizations(3).getTabletGroupingRegex();
+    Assert.assertEquals("(\\Qp:\\E).*", regex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
new file mode 100644
index 0000000..5e7f224
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.fluo.recipes.core.types.TypedLoader;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentLoader extends TypedLoader {
+
+  String docid;
+  String refs[];
+
+  DocumentLoader(String docid, String... refs) {
+    this.docid = docid;
+    this.refs = refs;
+  }
+
+  @Override
+  public void load(TypedTransactionBase tx, Context context) throws Exception {
+    tx.mutate().row("d:" + docid).fam("content").qual("new").set(StringUtils.join(refs, " "));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
new file mode 100644
index 0000000..c4c11d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.export.ExportTestBase.RefExporter;
+import org.apache.fluo.recipes.core.types.TypedObserver;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+/** Observes content:new, exports the added and deleted refs, then records content:current. */
+public class DocumentObserver extends TypedObserver {
+  ExportQueue<String, RefUpdates> refExportQueue;
+
+  @Override
+  public void init(Context context) throws Exception {
+    refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, context.getAppConfiguration());
+  }
+
+  @Override
+  public ObservedColumn getObservedColumn() {
+    return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
+  }
+
+  @Override
+  public void process(TypedTransactionBase tx, Bytes row, Column col) {
+    String newContent = tx.get().row(row).col(col).toString();
+    Set<String> newRefs = parseRefs(newContent);
+    String currentContent = tx.get().row(row).fam("content").qual("current").toString("");
+    Set<String> currentRefs = parseRefs(currentContent);
+
+    Set<String> addedRefs = new HashSet<>(newRefs);
+    addedRefs.removeAll(currentRefs);
+    Set<String> deletedRefs = new HashSet<>(currentRefs);
+    deletedRefs.removeAll(newRefs);
+    refExportQueue.add(tx, row.toString().substring(2), new RefUpdates(addedRefs, deletedRefs));
+    tx.mutate().row(row).fam("content").qual("current").set(newContent);
+  }
+
+  /** Splits content on spaces; drops the empty token that empty content would produce. */
+  private static Set<String> parseRefs(String content) {
+    Set<String> refs = new HashSet<>(Arrays.asList(content.split(" ")));
+    refs.remove("");
+    return refs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
new file mode 100644
index 0000000..d54982b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExportBufferIT extends ExportTestBase {
+
+  @Override
+  protected int getNumBuckets() {
+    return 2;
+  }
+
+  @Override
+  protected Integer getBufferSize() {
+    return 1024;
+  }
+
+  @Test
+  public void testSmallExportBuffer() {
+    // try setting the export buffer size small. Make sure everything is exported.
+
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      ExportQueue<String, RefUpdates> refExportQueue =
+          ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
+      try (Transaction tx = fc.newTransaction()) {
+        for (int i = 0; i < 1000; i++) {
+          refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 10, i + 20), ns(new int[0])));
+        }
+
+        tx.commit();
+      }
+    }
+
+    miniFluo.waitForObservers();
+
+    Map<String, Set<String>> erefs = getExportedReferees();
+    Map<String, Set<String>> expected = new HashMap<>();
+
+    for (int i = 0; i < 1000; i++) {
+      expected.computeIfAbsent(nk(i + 10), s -> new HashSet<>()).add(nk(i));
+      expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
+    }
+
+    assertEquals(expected, erefs);
+    int prevNumExportCalls = getNumExportCalls();
+    Assert.assertTrue(prevNumExportCalls > 10); // with small buffer there should be lots of exports
+                                                // calls
+
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      ExportQueue<String, RefUpdates> refExportQueue =
+          ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
+      try (Transaction tx = fc.newTransaction()) {
+        for (int i = 0; i < 1000; i++) {
+          refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 12), ns(i + 10)));
+        }
+
+        tx.commit();
+      }
+    }
+
+    miniFluo.waitForObservers();
+
+    erefs = getExportedReferees();
+    expected = new HashMap<>();
+
+    for (int i = 0; i < 1000; i++) {
+      expected.computeIfAbsent(nk(i + 12), s -> new HashSet<>()).add(nk(i));
+      expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
+    }
+
+    assertEquals(expected, erefs);
+    prevNumExportCalls = getNumExportCalls() - prevNumExportCalls;
+    Assert.assertTrue(prevNumExportCalls > 10);
+  }
+
+  public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual) {
+    if (!expected.equals(actual)) {
+      System.out.println("*** diff ***");
+      diff(expected, actual);
+      Assert.fail();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
new file mode 100644
index 0000000..b4e167c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExportQueueIT extends ExportTestBase {
+
+  @Test
+  public void testExport() {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0999", "0005", "0002"));
+        loader.execute(new DocumentLoader("0002", "0999", "0042"));
+        loader.execute(new DocumentLoader("0005", "0999", "0042"));
+        loader.execute(new DocumentLoader("0042", "0999"));
+      }
+
+      miniFluo.waitForObservers();
+
+      Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
+      Assert.assertEquals(ns("0999"), getExportedReferees("0002"));
+      Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
+      Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0999", "0005", "0042"));
+      }
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0999", "0005"));
+      }
+
+      miniFluo.waitForObservers();
+
+      Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
+      Assert.assertEquals(ns(new String[0]), getExportedReferees("0002"));
+      Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
+      Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0042", "0999", "0002", "0005"));
+        loader.execute(new DocumentLoader("0005", "0002"));
+      }
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0005", "0003"));
+      }
+
+      miniFluo.waitForObservers();
+
+      Assert.assertEquals(ns("0002", "0042"), getExportedReferees("0999"));
+      Assert.assertEquals(ns("0042"), getExportedReferees("0002"));
+      Assert.assertEquals(ns("0005"), getExportedReferees("0003"));
+      Assert.assertEquals(ns("0999", "0042"), getExportedReferees("0005"));
+      Assert.assertEquals(ns("0002"), getExportedReferees("0042"));
+
+    }
+  }
+
+  @Test
+  public void exportStressTest() {
+    FluoConfiguration config = new FluoConfiguration(miniFluo.getClientConfiguration());
+    config.setLoaderQueueSize(100);
+    config.setLoaderThreads(20);
+
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+
+      loadRandom(fc, 1000, 500);
+
+      miniFluo.waitForObservers();
+
+      diff(getFluoReferees(fc), getExportedReferees());
+
+      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+      loadRandom(fc, 1000, 500);
+
+      miniFluo.waitForObservers();
+
+      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+      loadRandom(fc, 1000, 10000);
+
+      miniFluo.waitForObservers();
+
+      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+      loadRandom(fc, 1000, 10000);
+
+      miniFluo.waitForObservers();
+
+      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
new file mode 100644
index 0000000..c1cf3ce
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+public class ExportTestBase {
+
+  private static Map<String, Map<String, RefInfo>> globalExports = new HashMap<>();
+  private static int exportCalls = 0;
+
+  protected static Set<String> getExportedReferees(String node) {
+    synchronized (globalExports) {
+      Set<String> ret = new HashSet<>();
+
+      Map<String, RefInfo> referees = globalExports.get(node);
+
+      if (referees == null) {
+        return ret;
+      }
+
+      referees.forEach((k, v) -> {
+        if (!v.deleted)
+          ret.add(k);
+      });
+
+      return ret;
+    }
+  }
+
+  protected static Map<String, Set<String>> getExportedReferees() {
+    synchronized (globalExports) {
+
+      Map<String, Set<String>> ret = new HashMap<>();
+
+      for (String k : globalExports.keySet()) {
+        Set<String> referees = getExportedReferees(k);
+        if (referees.size() > 0) {
+          ret.put(k, referees);
+        }
+      }
+
+      return ret;
+    }
+  }
+
+  protected static int getNumExportCalls() {
+    synchronized (globalExports) {
+      return exportCalls;
+    }
+  }
+
+  public static class RefExporter extends Exporter<String, RefUpdates> {
+
+    public static final String QUEUE_ID = "req";
+
+    private void updateExports(String key, long seq, String addedRef, boolean deleted) {
+      Map<String, RefInfo> referees = globalExports.computeIfAbsent(addedRef, k -> new HashMap<>());
+      referees.compute(key, (k, v) -> (v == null || v.seq < seq) ? new RefInfo(seq, deleted) : v);
+    }
+
+    @Override
+    protected void processExports(Iterator<SequencedExport<String, RefUpdates>> exportIterator) {
+      ArrayList<SequencedExport<String, RefUpdates>> exportList = new ArrayList<>();
+      Iterators.addAll(exportList, exportIterator);
+
+      synchronized (globalExports) {
+        exportCalls++;
+
+        for (SequencedExport<String, RefUpdates> se : exportList) {
+          for (String addedRef : se.getValue().getAddedRefs()) {
+            updateExports(se.getKey(), se.getSequence(), addedRef, false);
+          }
+
+          for (String deletedRef : se.getValue().getDeletedRefs()) {
+            updateExports(se.getKey(), se.getSequence(), deletedRef, true);
+          }
+        }
+      }
+    }
+  }
+
+  protected MiniFluo miniFluo;
+
+  protected int getNumBuckets() {
+    return 13;
+  }
+
+  protected Integer getBufferSize() {
+    return null;
+  }
+
+  @Before
+  public void setUpFluo() throws Exception {
+    FileUtils.deleteQuietly(new File("target/mini"));
+
+    FluoConfiguration props = new FluoConfiguration();
+    props.setApplicationName("eqt");
+    props.setWorkerThreads(20);
+    props.setMiniDataDir("target/mini");
+
+    ObserverConfiguration doc = new ObserverConfiguration(DocumentObserver.class.getName());
+    props.addObserver(doc);
+
+    SimpleSerializer.setSetserlializer(props, GsonSerializer.class);
+
+    ExportQueue.Options exportQueueOpts =
+        new ExportQueue.Options(RefExporter.QUEUE_ID, RefExporter.class, String.class,
+            RefUpdates.class, getNumBuckets());
+
+    if (getBufferSize() != null) {
+      exportQueueOpts.setBufferSize(getBufferSize());
+    }
+
+    ExportQueue.configure(props, exportQueueOpts);
+
+    miniFluo = FluoFactory.newMiniFluo(props);
+
+    globalExports.clear();
+    exportCalls = 0;
+  }
+
+  @After
+  public void tearDownFluo() throws Exception {
+    if (miniFluo != null) {
+      miniFluo.close();
+    }
+  }
+
+  protected static Set<String> ns(String... sa) {
+    return new HashSet<>(Arrays.asList(sa));
+  }
+
+  protected static String nk(int i) {
+    return String.format("%06d", i);
+  }
+
+  protected static Set<String> ns(int... ia) {
+    HashSet<String> ret = new HashSet<>();
+    for (int i : ia) {
+      ret.add(nk(i));
+    }
+    return ret;
+  }
+
+  public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual,
+      FluoClient fc) {
+    if (!expected.equals(actual)) {
+      System.out.println("*** diff ***");
+      diff(expected, actual);
+      System.out.println("*** fluo dump ***");
+      dump(fc);
+      System.out.println("*** map dump ***");
+
+      Assert.fail();
+    }
+  }
+
+  protected void loadRandom(FluoClient fc, int num, int maxDocId) {
+    try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+      Random rand = new Random();
+
+      for (int i = 0; i < num; i++) {
+        String docid = String.format("%05d", rand.nextInt(maxDocId));
+        String[] refs = new String[rand.nextInt(20) + 1];
+        for (int j = 0; j < refs.length; j++) {
+          refs[j] = String.format("%05d", rand.nextInt(maxDocId));
+        }
+
+        loader.execute(new DocumentLoader(docid, refs));
+      }
+    }
+  }
+
+  protected void diff(Map<String, Set<String>> fr, Map<String, Set<String>> er) {
+    HashSet<String> allKeys = new HashSet<>(fr.keySet());
+    allKeys.addAll(er.keySet());
+
+    for (String k : allKeys) {
+      Set<String> s1 = fr.getOrDefault(k, Collections.emptySet());
+      Set<String> s2 = er.getOrDefault(k, Collections.emptySet());
+
+      HashSet<String> sub1 = new HashSet<>(s1);
+      sub1.removeAll(s2);
+
+      HashSet<String> sub2 = new HashSet<>(s2);
+      sub2.removeAll(s1);
+
+      if (sub1.size() > 0 || sub2.size() > 0) {
+        System.out.println(k + " " + sub1 + " " + sub2);
+      }
+
+    }
+  }
+
+  protected Map<String, Set<String>> getFluoReferees(FluoClient fc) {
+    Map<String, Set<String>> fluoReferees = new HashMap<>();
+
+    try (Snapshot snap = fc.newSnapshot()) {
+      ScannerConfiguration scannerConfig = new ScannerConfiguration();
+      scannerConfig.fetchColumn(Bytes.of("content"), Bytes.of("current"));
+      scannerConfig.setSpan(Span.prefix("d:"));
+      RowIterator scanner = snap.get(scannerConfig);
+      while (scanner.hasNext()) {
+        Entry<Bytes, ColumnIterator> row = scanner.next();
+        ColumnIterator colIter = row.getValue();
+
+        String docid = row.getKey().toString().substring(2);
+
+        while (colIter.hasNext()) {
+          Entry<Column, Bytes> entry = colIter.next();
+
+          String[] refs = entry.getValue().toString().split(" ");
+
+          for (String ref : refs) {
+            if (ref.isEmpty())
+              continue;
+
+            fluoReferees.computeIfAbsent(ref, k -> new HashSet<>()).add(docid);
+          }
+        }
+      }
+    }
+    return fluoReferees;
+  }
+
+  public static void dump(FluoClient fc) {
+    try (Snapshot snap = fc.newSnapshot()) {
+      RowIterator scanner = snap.get(new ScannerConfiguration());
+      while (scanner.hasNext()) {
+        Entry<Bytes, ColumnIterator> row = scanner.next();
+        ColumnIterator colIter = row.getValue();
+
+        while (colIter.hasNext()) {
+          Entry<Column, Bytes> entry = colIter.next();
+
+          System.out.println("row:[" + row.getKey() + "]  col:[" + entry.getKey() + "]  val:["
+              + entry.getValue() + "]");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
new file mode 100644
index 0000000..2d45ff3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.nio.charset.StandardCharsets;
+
+import com.google.gson.Gson;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+/**
+ * {@link SimpleSerializer} that encodes objects as UTF-8 JSON text using Gson.
+ */
+public class GsonSerializer implements SimpleSerializer {
+
+  // Assigned once and never replaced, so it is declared final.
+  private final Gson gson = new Gson();
+
+  /**
+   * Does nothing; this serializer reads no settings from {@code appConfig}.
+   */
+  @Override
+  public void init(SimpleConfiguration appConfig) {
+
+  }
+
+  /**
+   * Renders {@code obj} as JSON and returns the UTF-8 bytes of that text.
+   */
+  @Override
+  public <T> byte[] serialize(T obj) {
+    return gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Decodes {@code serObj} as UTF-8 JSON text and binds it to an instance of {@code clazz}.
+   */
+  @Override
+  public <T> T deserialize(byte[] serObj, Class<T> clazz) {
+    return gson.fromJson(new String(serObj, StandardCharsets.UTF_8), clazz);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
new file mode 100644
index 0000000..b07caea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.recipes.core.export.ExportQueue.Options;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link ExportQueue} options written into a configuration can be read back per
+ * queue id, including the defaults for settings that were not explicitly set.
+ */
+public class OptionsTest {
+  @Test
+  public void testExportQueueOptions() {
+    FluoConfiguration conf = new FluoConfiguration();
+
+    // Q1 leaves buckets-per-tablet and buffer size unset; Q2 overrides both.
+    ExportQueue.configure(conf, new Options("Q1", "ET", "KT", "VT", 100));
+    ExportQueue.configure(conf, new Options("Q2", "ET2", "KT2", "VT2", 200).setBucketsPerTablet(20)
+        .setBufferSize(1000000));
+
+    Options opts1 = new Options("Q1", conf.getAppConfiguration());
+
+    // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure
+    // messages report the right value as "expected".
+    Assert.assertEquals("ET", opts1.exporterType);
+    Assert.assertEquals("KT", opts1.keyType);
+    Assert.assertEquals("VT", opts1.valueType);
+    Assert.assertEquals(100, opts1.numBuckets);
+    Assert.assertEquals(Options.DEFAULT_BUCKETS_PER_TABLET, opts1.bucketsPerTablet.intValue());
+    Assert.assertEquals(Options.DEFAULT_BUFFER_SIZE, opts1.bufferSize.intValue());
+
+    Options opts2 = new Options("Q2", conf.getAppConfiguration());
+
+    Assert.assertEquals("ET2", opts2.exporterType);
+    Assert.assertEquals("KT2", opts2.keyType);
+    Assert.assertEquals("VT2", opts2.valueType);
+    Assert.assertEquals(200, opts2.numBuckets);
+    Assert.assertEquals(20, opts2.bucketsPerTablet.intValue());
+    Assert.assertEquals(1000000, opts2.bufferSize.intValue());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
new file mode 100644
index 0000000..f4d1c76
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+/**
+ * Package-private holder pairing a {@code long} value with a deleted flag.
+ *
+ * NOTE(review): {@code seq} is presumably a sequence number; its users are outside this file, so
+ * confirm against the callers.
+ */
+class RefInfo {
+  // value passed to the constructor
+  long seq;
+  // flag passed to the constructor
+  boolean deleted;
+
+  /**
+   * Stores both arguments in the fields of the same name.
+   */
+  public RefInfo(long seq, boolean deleted) {
+    this.seq = seq;
+    this.deleted = deleted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
new file mode 100644
index 0000000..efa94dd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Set;
+
+/**
+ * Carries two sets of reference strings: the ones that were added and the ones that were deleted.
+ */
+public class RefUpdates {
+  private Set<String> addedRefs;
+  private Set<String> deletedRefs;
+
+  /**
+   * Creates an instance whose two sets are both left {@code null}.
+   */
+  public RefUpdates() {}
+
+  /**
+   * @param addedRefs set handed back unchanged by {@link #getAddedRefs()}
+   * @param deletedRefs set handed back unchanged by {@link #getDeletedRefs()}
+   */
+  public RefUpdates(Set<String> addedRefs, Set<String> deletedRefs) {
+    this.addedRefs = addedRefs;
+    this.deletedRefs = deletedRefs;
+  }
+
+  /**
+   * @return the added set exactly as stored (no copy is made)
+   */
+  public Set<String> getAddedRefs() {
+    return this.addedRefs;
+  }
+
+  /**
+   * @return the deleted set exactly as stored (no copy is made)
+   */
+  public Set<String> getDeletedRefs() {
+    return this.deletedRefs;
+  }
+
+  /**
+   * @return {@code added:<addedRefs> deleted:<deletedRefs>}
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("added:");
+    text.append(addedRefs);
+    text.append(" deleted:");
+    text.append(deletedRefs);
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
new file mode 100644
index 0000000..e5f7d55
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.apache.fluo.recipes.core.types.StringEncoder;
+import org.apache.fluo.recipes.core.types.TypeLayer;
+import org.apache.fluo.recipes.core.types.TypedSnapshot;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test configures a small buffer size and verifies that multiple passes are made to process
+ * updates.
+ */
+public class BigUpdateIT {
+  private static final TypeLayer tl = new TypeLayer(new StringEncoder());
+
+  private MiniFluo miniFluo;
+
+  private CollisionFreeMap<String, Long> wcMap;
+
+  static final String MAP_ID = "bu";
+
+  /**
+   * Combiner that adds up every pending {@code Long} update for a key.
+   */
+  public static class LongCombiner implements Combiner<String, Long> {
+
+    /**
+     * @param key map key being combined (not consulted)
+     * @param updates values to add together
+     * @return an {@code Optional} holding the sum, which is 0 when {@code updates} is empty
+     */
+    @Override
+    public Optional<Long> combine(String key, Iterator<Long> updates) {
+      long total = 0;
+      while (updates.hasNext()) {
+        total += updates.next();
+      }
+      return Optional.of(total);
+    }
+  }
+
+  static final Column DSCOL = new Column("debug", "sum");
+
+  private static AtomicInteger globalUpdates = new AtomicInteger(0);
+
+  /**
+   * Observer that copies each updated value into a {@code side:<key>} row and checks that the old
+   * value reported with every update matches what was last set in that row.
+   */
+  public static class MyObserver extends UpdateObserver<String, Long> {
+
+    @Override
+    public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
+      TypedTransactionBase typedTx = tl.wrap(tx);
+
+      // side row -> old value reported by the update; only filled when an old value is present
+      Map<String, Long> reportedOld = new HashMap<>();
+
+      while (updates.hasNext()) {
+        Update<String, Long> current = updates.next();
+        String sideRow = "side:" + current.getKey();
+
+        if (current.getOldValue().isPresent()) {
+          reportedOld.put(sideRow, current.getOldValue().get());
+        }
+
+        typedTx.mutate().row(sideRow).col(DSCOL).set(current.getNewValue().get());
+      }
+
+      // fetch the values last set in those side rows to verify they equal the reported old values
+      Map<String, Long> storedOld =
+          Maps.transformValues(typedTx.get().rowsString(reportedOld.keySet())
+              .columns(ImmutableSet.of(DSCOL)).toStringMap(), cols -> cols.get(DSCOL).toLong());
+
+      MapDifference<String, Long> mismatch = Maps.difference(reportedOld, storedOld);
+      Assert.assertTrue(mismatch.toString(), mismatch.areEqual());
+
+      // one more observer invocation, counted for the test's multi-pass assertions
+      globalUpdates.incrementAndGet();
+    }
+  }
+
+  /**
+   * Wipes the mini data directory, configures the collision free map with a 1 KiB buffer and two
+   * buckets, starts MiniFluo and resets the observer invocation counter.
+   */
+  @Before
+  public void setUpFluo() throws Exception {
+    File miniDir = new File("target/mini");
+    FileUtils.deleteQuietly(miniDir);
+
+    FluoConfiguration config = new FluoConfiguration();
+    config.setApplicationName("eqt");
+    config.setWorkerThreads(20);
+    config.setMiniDataDir("target/mini");
+
+    SimpleSerializer.setSetserlializer(config, TestSerializer.class);
+
+    CollisionFreeMap.configure(
+        config,
+        new CollisionFreeMap.Options(MAP_ID, LongCombiner.class, MyObserver.class, String.class,
+            Long.class, 2).setBufferSize(1 << 10));
+
+    miniFluo = FluoFactory.newMiniFluo(config);
+    wcMap = CollisionFreeMap.getInstance(MAP_ID, config.getAppConfiguration());
+    globalUpdates.set(0);
+  }
+
+  /**
+   * Closes MiniFluo if it was created; a {@code null} field is tolerated.
+   */
+  @After
+  public void tearDownFluo() throws Exception {
+    if (miniFluo == null) {
+      return;
+    }
+    miniFluo.close();
+  }
+
+  /**
+   * Runs three rounds of updates (1, then 2, then 10 batches of 1000 keys). After each round it
+   * checks the summed side-row values and that the observer was invoked at least four more times
+   * than at the end of the previous round.
+   */
+  @Test
+  public void testBigUpdates() {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      updateMany(fc);
+      miniFluo.waitForObservers();
+      int seenUpdates = verifyRound(fc, 1, 0);
+
+      updateMany(fc);
+      updateMany(fc);
+      miniFluo.waitForObservers();
+      seenUpdates = verifyRound(fc, 3, seenUpdates);
+
+      for (int i = 0; i < 10; i++) {
+        updateMany(fc);
+      }
+      miniFluo.waitForObservers();
+      verifyRound(fc, 13, seenUpdates);
+    }
+  }
+
+  /**
+   * Checks the side rows against {@code expectedVal} and asserts that the observer ran at least
+   * four times since the previous round.
+   *
+   * @param fc client used to open the snapshot that is checked
+   * @param expectedVal value every side row must hold
+   * @param previousTotal cumulative observer invocation count at the end of the previous round
+   * @return the cumulative observer invocation count now, to be passed to the next round
+   */
+  private int verifyRound(FluoClient fc, long expectedVal, int previousTotal) {
+    try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
+      checkUpdates(snap, expectedVal, 1000);
+      int total = globalUpdates.get();
+      // Compare against the running total, not against an earlier delta, so the per-round count
+      // is correct for every round.
+      int sinceLastRound = total - previousTotal;
+      // there are two buckets, expect update processing at least twice per bucket
+      Assert.assertTrue("expected at least 4 observer invocations this round but saw "
+          + sinceLastRound, sinceLastRound >= 4);
+      return total;
+    }
+  }
+
+  /**
+   * Scans the rows prefixed with {@code side:} and asserts that they are named
+   * {@code side:000000}, {@code side:000001}, ... in scan order, that every column seen is
+   * {@code debug:sum} with the text form of {@code expectedVal} as its value, and that exactly
+   * {@code expectedRows} rows exist.
+   */
+  private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
+    ScannerConfiguration sideScan = new ScannerConfiguration().setSpan(Span.prefix("side:"));
+    RowIterator rows = snap.get(sideScan);
+
+    int seenRows = 0;
+
+    while (rows.hasNext()) {
+      Entry<Bytes, ColumnIterator> rowEntry = rows.next();
+
+      String expectedRowName = String.format("side:%06d", seenRows);
+      seenRows++;
+      Assert.assertEquals(expectedRowName, rowEntry.getKey().toString());
+
+      for (ColumnIterator cells = rowEntry.getValue(); cells.hasNext();) {
+        Entry<Column, Bytes> cell = cells.next();
+        Assert.assertEquals(new Column("debug", "sum"), cell.getKey());
+
+        String failureLabel = "row : " + rowEntry.getKey();
+        Assert.assertEquals(failureLabel, "" + expectedVal, cell.getValue().toString());
+      }
+    }
+
+    Assert.assertEquals(expectedRows, seenRows);
+  }
+
+  /**
+   * Commits one transaction that queues an update of 1 for each of the keys {@code 000000}
+   * through {@code 000999}.
+   */
+  private void updateMany(FluoClient fc) {
+    Map<String, Long> increments = new HashMap<>();
+    for (int key = 0; key < 1000; key++) {
+      increments.put(String.format("%06d", key), 1L);
+    }
+
+    try (Transaction tx = fc.newTransaction()) {
+      wcMap.update(tx, increments);
+      tx.commit();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
new file mode 100644
index 0000000..f7dbc89
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CollisionFreeMapIT {
+
+  private MiniFluo miniFluo;
+
+  private CollisionFreeMap<String, Long> wcMap;
+
+  static final String MAP_ID = "wcm";
+
+  /**
+   * Wipes the mini data directory, registers {@link DocumentObserver}, configures the word count
+   * map with 17 buckets and starts MiniFluo.
+   */
+  @Before
+  public void setUpFluo() throws Exception {
+    File miniDir = new File("target/mini");
+    FileUtils.deleteQuietly(miniDir);
+
+    FluoConfiguration config = new FluoConfiguration();
+    config.setApplicationName("eqt");
+    config.setWorkerThreads(20);
+    config.setMiniDataDir("target/mini");
+
+    String observerClass = DocumentObserver.class.getName();
+    config.addObserver(new ObserverConfiguration(observerClass));
+
+    SimpleSerializer.setSetserlializer(config, TestSerializer.class);
+
+    CollisionFreeMap.configure(
+        config,
+        new CollisionFreeMap.Options(MAP_ID, WordCountCombiner.class, WordCountObserver.class,
+            String.class, Long.class, 17));
+
+    miniFluo = FluoFactory.newMiniFluo(config);
+    wcMap = CollisionFreeMap.getInstance(MAP_ID, config.getAppConfiguration());
+  }
+
+  /**
+   * Closes MiniFluo if it was created; a {@code null} field is tolerated.
+   */
+  @After
+  public void tearDownFluo() throws Exception {
+    if (miniFluo == null) {
+      return;
+    }
+    miniFluo.close();
+  }
+
+  /**
+   * Reads the word counts stored under rows prefixed with {@code iwc:}. Each row key is split on
+   * {@code :}; the second token is parsed as the count and the third is taken as the word. Fails
+   * the test if the same word shows up in more than one row.
+   *
+   * @return word to count, as found in the index rows
+   */
+  private Map<String, Long> getComputedWordCounts(FluoClient fc) {
+    Map<String, Long> indexedCounts = new HashMap<>();
+
+    try (Snapshot snapshot = fc.newSnapshot()) {
+      ScannerConfiguration indexScan = new ScannerConfiguration().setSpan(Span.prefix("iwc:"));
+      for (RowIterator rows = snapshot.get(indexScan); rows.hasNext();) {
+        Entry<Bytes, ColumnIterator> indexRow = rows.next();
+
+        String[] parts = indexRow.getKey().toString().split(":");
+        String word = parts[2];
+        Long count = Long.valueOf(parts[1]);
+
+        boolean alreadySeen = indexedCounts.containsKey(word);
+        Assert.assertFalse("Word seen twice in index " + word, alreadySeen);
+
+        indexedCounts.put(word, count);
+      }
+    }
+
+    return indexedCounts;
+  }
+
+  /**
+   * Recomputes word counts directly from the {@code content:current} column of every row prefixed
+   * with {@code d:}, splitting each value on runs of whitespace.
+   *
+   * @return word to number of occurrences across all scanned values
+   */
+  private Map<String, Long> computeWordCounts(FluoClient fc) {
+    Map<String, Long> tally = new HashMap<>();
+
+    try (Snapshot snapshot = fc.newSnapshot()) {
+      ScannerConfiguration docScan =
+          new ScannerConfiguration().setSpan(Span.prefix("d:")).fetchColumn(Bytes.of("content"),
+              Bytes.of("current"));
+      RowIterator rows = snapshot.get(docScan);
+
+      while (rows.hasNext()) {
+        Entry<Bytes, ColumnIterator> docRow = rows.next();
+
+        for (ColumnIterator cells = docRow.getValue(); cells.hasNext();) {
+          Entry<Column, Bytes> cell = cells.next();
+          String text = cell.getValue().toString();
+
+          for (String token : text.split("\\s+")) {
+            // split can yield an empty leading token; it is not a word
+            if (!token.isEmpty()) {
+              tally.merge(token, 1L, Long::sum);
+            }
+          }
+        }
+      }
+    }
+
+    return tally;
+  }
+
+  /**
+   * Exercises {@code wcMap.get} both before and after observers have run, and checks the index
+   * rows read by {@code getComputedWordCounts} once observers have finished.
+   */
+  @Test
+  public void testGet() {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      // three separate transactions: cat 2+1+1 = 4, dog 5+1+1 = 7, fish 2
+      try (Transaction tx = fc.newTransaction()) {
+        wcMap.update(tx, ImmutableMap.of("cat", 2L, "dog", 5L));
+        tx.commit();
+      }
+
+      try (Transaction tx = fc.newTransaction()) {
+        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L));
+        tx.commit();
+      }
+
+      try (Transaction tx = fc.newTransaction()) {
+        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L, "fish", 2L));
+        tx.commit();
+      }
+
+      // try reading possibly before observer combines... will either see outstanding updates or a
+      // current value
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+        Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog"));
+        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+      }
+
+      miniFluo.waitForObservers();
+
+      // in this case there should be no updates, only a current value
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+        Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog"));
+        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+      }
+
+      Map<String, Long> expectedCounts = new HashMap<>();
+      expectedCounts.put("cat", 4L);
+      expectedCounts.put("dog", 7L);
+      expectedCounts.put("fish", 2L);
+
+      // the index rows must match the same totals
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      // -7 cancels dog's total of 7; the assertions below expect get to return null for it
+      try (Transaction tx = fc.newTransaction()) {
+        wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", -7L));
+        tx.commit();
+      }
+
+      // there may be outstanding update and a current value for the key in this case
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat"));
+        Assert.assertNull(wcMap.get(snap, "dog"));
+        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+      }
+
+      miniFluo.waitForObservers();
+
+      // same expectations once the observers have processed the update
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat"));
+        Assert.assertNull(wcMap.get(snap, "dog"));
+        Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+      }
+
+      // dog is expected to be gone from the index as well
+      expectedCounts.put("cat", 5L);
+      expectedCounts.remove("dog");
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+    }
+  }
+
+  /**
+   * Loads and then repeatedly rewrites four documents, checking after every round that the index
+   * rows read by {@code getComputedWordCounts} hold the expected per-word counts.
+   */
+  @Test
+  public void testBasic() {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      // initial load of four documents; "cat" appears once in each
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0001", "dog cat"));
+        loader.execute(new DocumentLoader("0002", "cat hamster"));
+        loader.execute(new DocumentLoader("0003", "milk bread cat food"));
+        loader.execute(new DocumentLoader("0004", "zoo big cat"));
+      }
+
+      miniFluo.waitForObservers();
+
+      try (Snapshot snap = fc.newSnapshot()) {
+        Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+        Assert.assertEquals((Long) 1L, wcMap.get(snap, "milk"));
+      }
+
+      Map<String, Long> expectedCounts = new HashMap<>();
+      expectedCounts.put("dog", 1L);
+      expectedCounts.put("cat", 4L);
+      expectedCounts.put("hamster", 1L);
+      expectedCounts.put("milk", 1L);
+      expectedCounts.put("bread", 1L);
+      expectedCounts.put("food", 1L);
+      expectedCounts.put("zoo", 1L);
+      expectedCounts.put("big", 1L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      // document 0001 replaces "cat" with "feline"
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0001", "dog feline"));
+      }
+
+      miniFluo.waitForObservers();
+
+      expectedCounts.put("cat", 3L);
+      expectedCounts.put("feline", 1L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        // swap contents of two documents... should not change doc counts
+        loader.execute(new DocumentLoader("0003", "zoo big cat"));
+        loader.execute(new DocumentLoader("0004", "milk bread cat food"));
+      }
+
+      miniFluo.waitForObservers();
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      // both documents now hold "zoo big cat"; milk, bread and food no longer occur anywhere
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0003", "zoo big cat"));
+        loader.execute(new DocumentLoader("0004", "zoo big cat"));
+      }
+
+      miniFluo.waitForObservers();
+
+      expectedCounts.put("zoo", 2L);
+      expectedCounts.put("big", 2L);
+      expectedCounts.remove("milk");
+      expectedCounts.remove("bread");
+      expectedCounts.remove("food");
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      // repeated words inside a single document are each counted
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0002", "cat cat hamster hamster"));
+      }
+
+      miniFluo.waitForObservers();
+
+      expectedCounts.put("cat", 4L);
+      expectedCounts.put("hamster", 2L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+      // document 0002 drops both cats and one hamster, and gains a dog
+      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+        loader.execute(new DocumentLoader("0002", "dog hamster"));
+      }
+
+      miniFluo.waitForObservers();
+
+      expectedCounts.put("cat", 2L);
+      expectedCounts.put("hamster", 1L);
+      expectedCounts.put("dog", 2L);
+
+      Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+    }
+  }
+
+  /**
+   * @return a document id: a random integer in [0, 5000) zero-padded to four digits
+   */
+  private static String randDocId(Random rand) {
+    int id = rand.nextInt(5000);
+    return String.format("%04d", id);
+  }
+
+  /**
+   * Builds a random document of space-separated five-digit words.
+   *
+   * The loop bound is drawn once from [0, 18), so the document holds between 0 and 15 words and
+   * may be empty.
+   *
+   * @param rand source of randomness for both the word count and the words
+   * @return the words joined by single spaces, or an empty string when no words were drawn
+   */
+  private static String randomDocument(Random rand) {
+    // Draw the bound a single time. Calling rand.nextInt(18) in the loop condition would pick a
+    // new bound before every word, which skews documents towards being very short.
+    int bound = rand.nextInt(18);
+
+    StringBuilder sb = new StringBuilder();
+
+    String sep = "";
+    for (int i = 2; i < bound; i++) {
+      sb.append(sep);
+      sep = " ";
+      sb.append(String.format("%05d", rand.nextInt(50000)));
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Prints the differences between two word count maps to standard output: first every key of
+   * {@code m1} whose value in {@code m2} is missing or different, then every key of {@code m2}
+   * that has no value in {@code m1}.
+   */
+  public void diff(Map<String, Long> m1, Map<String, Long> m2) {
+    for (Entry<String, Long> left : m1.entrySet()) {
+      String word = left.getKey();
+      Long v1 = left.getValue();
+      Long v2 = m2.get(word);
+
+      boolean matches = v2 != null && v1.equals(v2);
+      if (!matches) {
+        System.out.println(word + " " + v1 + " != " + v2);
+      }
+    }
+
+    for (Entry<String, Long> right : m2.entrySet()) {
+      String word = right.getKey();
+
+      if (m1.get(word) == null) {
+        System.out.println(word + " null != " + right.getValue());
+      }
+    }
+  }
+
+  /**
+   * Loads a batch of 1000 and then a batch of 100 randomly generated documents, and after each
+   * batch checks that the indexed word counts equal counts recomputed from the documents.
+   */
+  @Test
+  public void testStress() throws Exception {
+    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+      Random rand = new Random();
+
+      for (int batchSize : new int[] {1000, 100}) {
+        try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+          int loaded = 0;
+          while (loaded < batchSize) {
+            loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand)));
+            loaded++;
+          }
+        }
+
+        miniFluo.waitForObservers();
+        assertWordCountsEqual(fc);
+      }
+    }
+  }
+
+  /**
+   * Fails the test when the word counts recomputed from the documents differ from the counts read
+   * from the index rows. The individual differences are printed to standard output first.
+   */
+  private void assertWordCountsEqual(FluoClient fc) {
+    Map<String, Long> expected = computeWordCounts(fc);
+    Map<String, Long> actual = getComputedWordCounts(fc);
+    if (!expected.equals(actual)) {
+      diff(expected, actual);
+      // Give the failure a message so the report explains itself and points at the printed diff.
+      Assert.fail("Indexed word counts differ from counts recomputed from documents;"
+          + " see standard output for the differing words");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
new file mode 100644
index 0000000..54f5ee1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import org.apache.fluo.recipes.core.types.TypedLoader;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+/**
+ * Loader that writes a document's text to the {@code content:new} column of the row
+ * {@code d:<docid>}.
+ */
+public class DocumentLoader extends TypedLoader {
+
+  String docid;
+  String doc;
+
+  /**
+   * @param docid id appended to the {@code d:} row prefix
+   * @param doc text stored as the column value
+   */
+  DocumentLoader(String docid, String doc) {
+    this.docid = docid;
+    this.doc = doc;
+  }
+
+  @Override
+  public void load(TypedTransactionBase tx, Context context) throws Exception {
+    String row = "d:" + docid;
+    tx.mutate().row(row).fam("content").qual("new").set(doc);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
new file mode 100644
index 0000000..2c79f45
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.types.TypedObserver;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+/**
+ * Test observer on the {@code content:new} column. For each notification it compares the word
+ * counts of the new text with those of the text stored in {@code content:current}, sends the
+ * per-word differences to a {@link CollisionFreeMap}, and then stores the new text as current.
+ */
+public class DocumentObserver extends TypedObserver {
+
+  CollisionFreeMap<String, Long> wcm;
+
+  @Override
+  public void init(Context context) throws Exception {
+    wcm = CollisionFreeMap.getInstance(CollisionFreeMapIT.MAP_ID, context.getAppConfiguration());
+  }
+
+  @Override
+  public ObservedColumn getObservedColumn() {
+    Column observed = new Column("content", "new");
+    return new ObservedColumn(observed, NotificationType.STRONG);
+  }
+
+  /**
+   * Counts how often each word occurs in {@code doc}. Words are the tokens obtained by splitting
+   * on a single space; empty tokens are skipped.
+   */
+  static Map<String, Long> getWordCounts(String doc) {
+    Map<String, Long> counts = new HashMap<>();
+    for (String token : doc.split(" ")) {
+      if (!token.isEmpty()) {
+        counts.merge(token, 1L, Long::sum);
+      }
+    }
+    return counts;
+  }
+
+  @Override
+  public void process(TypedTransactionBase tx, Bytes row, Column col) {
+    String newContent = tx.get().row(row).col(col).toString();
+    // an absent current value is read as the empty string
+    String currentContent = tx.get().row(row).fam("content").qual("current").toString("");
+
+    Map<String, Long> deltas =
+        calculateChanges(getWordCounts(newContent), getWordCounts(currentContent));
+    wcm.update(tx, deltas);
+
+    tx.mutate().row(row).fam("content").qual("current").set(newContent);
+  }
+
+  /**
+   * Builds the per-word count deltas needed to go from {@code currCounts} to {@code newCounts}.
+   * Words whose count is unchanged do not appear in the result.
+   */
+  private static Map<String, Long> calculateChanges(Map<String, Long> newCounts,
+      Map<String, Long> currCounts) {
+    // guava Maps class; left side holds the current counts, right side the new counts
+    MapDifference<String, Long> diffs = Maps.difference(currCounts, newCounts);
+    Map<String, Long> changes = new HashMap<>();
+
+    // words on both sides with different counts: new count minus current count
+    diffs.entriesDiffering().forEach(
+        (word, vDiff) -> changes.put(word, vDiff.rightValue() - vDiff.leftValue()));
+
+    // words that only exist in the new text keep their full count
+    changes.putAll(diffs.entriesOnlyOnRight());
+
+    // words that disappeared contribute their negated count
+    diffs.entriesOnlyOnLeft().forEach((word, count) -> changes.put(word, count * -1));
+
+    return changes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java
new file mode 100644
index 0000000..6587f6e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@link Options} written into a {@link FluoConfiguration} by
+ * {@code CollisionFreeMap.configure} can be read back by map id from the application
+ * configuration.
+ */
+public class OptionsTest {
+  /**
+   * Configures two maps, one relying on defaults and one overriding buckets-per-tablet and buffer
+   * size, then verifies each set of options is read back unchanged. Assertions pass the expected
+   * value first, as JUnit requires, so failure messages report the values the right way round.
+   */
+  @Test
+  public void testExportQueueOptions() {
+    FluoConfiguration conf = new FluoConfiguration();
+
+    CollisionFreeMap.configure(conf, new Options("Q1", "CT", "KT", "VT", 100));
+    CollisionFreeMap.configure(conf, new Options("Q2", "CT2", "KT2", "VT2", 200)
+        .setBucketsPerTablet(20).setBufferSize(1000000));
+
+    // Q1 did not set the optional values, so the defaults are expected
+    Options opts1 = new Options("Q1", conf.getAppConfiguration());
+
+    Assert.assertEquals("CT", opts1.combinerType);
+    Assert.assertEquals("KT", opts1.keyType);
+    Assert.assertEquals("VT", opts1.valueType);
+    Assert.assertEquals(100, opts1.numBuckets);
+    Assert.assertEquals(Options.DEFAULT_BUCKETS_PER_TABLET, opts1.bucketsPerTablet.intValue());
+    Assert.assertEquals(Options.DEFAULT_BUFFER_SIZE, opts1.bufferSize.intValue());
+
+    // Q2 overrode both optional values
+    Options opts2 = new Options("Q2", conf.getAppConfiguration());
+
+    Assert.assertEquals("CT2", opts2.combinerType);
+    Assert.assertEquals("KT2", opts2.keyType);
+    Assert.assertEquals("VT2", opts2.valueType);
+    Assert.assertEquals(200, opts2.numBuckets);
+    Assert.assertEquals(20, opts2.bucketsPerTablet.intValue());
+    Assert.assertEquals(1000000, opts2.bufferSize.intValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
new file mode 100644
index 0000000..a359598
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.recipes.core.common.Pirtos;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks the splits reported by {@code CollisionFreeMap.getTableOptimizations} for individual
+ * maps and for all configured maps together.
+ */
+public class SplitsTest {
+  /** Returns a sorted copy of {@code in}; the argument itself is not modified. */
+  private static List<Bytes> sort(List<Bytes> in) {
+    List<Bytes> sorted = new ArrayList<>(in);
+    Collections.sort(sorted);
+    return sorted;
+  }
+
+  /** Presents the given row strings as a list of {@link Bytes}. */
+  private static List<Bytes> rows(String... rowIds) {
+    return Lists.transform(Arrays.asList(rowIds), Bytes::of);
+  }
+
+  @Test
+  public void testSplits() {
+    FluoConfiguration fluoConfig = new FluoConfiguration();
+
+    // map "foo": 3 buckets, 1 bucket per tablet
+    Options fooOpts = new Options("foo", WordCountCombiner.class, String.class, Long.class, 3);
+    fooOpts.setBucketsPerTablet(1);
+    CollisionFreeMap.configure(fluoConfig, fooOpts);
+
+    Pirtos fooPirtos =
+        CollisionFreeMap.getTableOptimizations("foo", fluoConfig.getAppConfiguration());
+    List<Bytes> fooExpected =
+        rows("foo:d:1", "foo:d:2", "foo:d:~", "foo:u:1", "foo:u:2", "foo:u:~");
+    Assert.assertEquals(fooExpected, sort(fooPirtos.getSplits()));
+
+    // map "bar": 6 buckets, 2 buckets per tablet
+    Options barOpts = new Options("bar", WordCountCombiner.class, String.class, Long.class, 6);
+    barOpts.setBucketsPerTablet(2);
+    CollisionFreeMap.configure(fluoConfig, barOpts);
+
+    Pirtos barPirtos =
+        CollisionFreeMap.getTableOptimizations("bar", fluoConfig.getAppConfiguration());
+    List<Bytes> barExpected =
+        rows("bar:d:2", "bar:d:4", "bar:d:~", "bar:u:2", "bar:u:4", "bar:u:~");
+    Assert.assertEquals(barExpected, sort(barPirtos.getSplits()));
+
+    // without a map id, the splits of every configured map are expected
+    Pirtos allPirtos = CollisionFreeMap.getTableOptimizations(fluoConfig.getAppConfiguration());
+
+    List<Bytes> allExpected = new ArrayList<>(barExpected);
+    allExpected.addAll(fooExpected);
+    Assert.assertEquals(allExpected, sort(allPirtos.getSplits()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
new file mode 100644
index 0000000..b59705a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+/**
+ * Minimal serializer for tests: values are written as the UTF-8 bytes of their
+ * {@code toString()} form. Only {@link Long} and {@link String} can be deserialized.
+ */
+public class TestSerializer implements SimpleSerializer {
+
+  // Explicit charset so the encoding does not depend on the platform default.
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  /** Returns the UTF-8 bytes of {@code obj.toString()}. */
+  @Override
+  public <T> byte[] serialize(T obj) {
+    return obj.toString().getBytes(UTF_8);
+  }
+
+  /**
+   * Decodes {@code serObj} as UTF-8 text and converts it to {@code clazz}.
+   *
+   * @throws IllegalArgumentException if {@code clazz} is neither {@code Long} nor {@code String}
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> T deserialize(byte[] serObj, Class<T> clazz) {
+    if (clazz.equals(Long.class)) {
+      return (T) Long.valueOf(new String(serObj, UTF_8));
+    }
+
+    if (clazz.equals(String.class)) {
+      return (T) new String(serObj, UTF_8);
+    }
+
+    throw new IllegalArgumentException("Unsupported type: " + clazz.getName());
+  }
+
+  /** No configuration is needed. */
+  @Override
+  public void init(SimpleConfiguration appConfig) {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
new file mode 100644
index 0000000..f757c10
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+/**
+ * Combiner that adds up all updates for a word. A total of zero is reported as an empty
+ * {@link Optional}; any other total is returned as the combined value.
+ */
+public class WordCountCombiner implements Combiner<String, Long> {
+  @Override
+  public Optional<Long> combine(String key, Iterator<Long> updates) {
+    long total = 0;
+    for (Iterator<Long> it = updates; it.hasNext();) {
+      total += it.next();
+    }
+
+    return total == 0 ? Optional.<Long>empty() : Optional.of(total);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
new file mode 100644
index 0000000..221083c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * Update observer that keeps rows of the form {@code iwc:<count padded to 9 digits>:<word>}. For
+ * every update, the row built from the old count (if any) is deleted and a row built from the new
+ * count (if any) is written with an empty column and empty value.
+ */
+public class WordCountObserver extends UpdateObserver<String, Long> {
+
+  /** Builds the row for the given count and word. */
+  private static Bytes indexRow(Long count, String word) {
+    return Bytes.of(String.format("iwc:%09d:%s", count, word));
+  }
+
+  @Override
+  public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
+
+    while (updates.hasNext()) {
+      Update<String, Long> update = updates.next();
+
+      Optional<Long> previous = update.getOldValue();
+      Optional<Long> current = update.getNewValue();
+
+      // remove the row for the old count before writing the one for the new count
+      previous.ifPresent(count -> tx.delete(indexRow(count, update.getKey()),
+          new Column(Bytes.EMPTY, Bytes.EMPTY)));
+
+      current.ifPresent(count -> tx.set(indexRow(count, update.getKey()),
+          new Column(Bytes.EMPTY, Bytes.EMPTY), Bytes.EMPTY));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
new file mode 100644
index 0000000..7c09b6e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.core.types.StringEncoder;
+import org.apache.fluo.recipes.core.types.TypeLayer;
+import org.apache.fluo.recipes.core.types.TypedTransaction;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+/**
+ * Unit tests for {@link RecordingTransaction}. Each test wraps an EasyMock mock of
+ * {@link Transaction} and checks either the contents of the recorded transaction log or that a
+ * call on the wrapper reaches the wrapped transaction.
+ */
+public class RecordingTransactionTest {
+
+  private Transaction tx;
+  private RecordingTransaction rtx;
+  private TypeLayer tl = new TypeLayer(new StringEncoder());
+
+  /** Creates a fresh mock transaction and wraps it before every test. */
+  @Before
+  public void setUp() {
+    tx = mock(Transaction.class);
+    rtx = RecordingTransaction.wrap(tx);
+  }
+
+  /**
+   * Issues two sets, a delete and a get, then checks the log entries in call order and the
+   * per-operation maps obtained from the log.
+   */
+  @Test
+  public void testTx() {
+    rtx.set(Bytes.of("r1"), new Column("cf1"), Bytes.of("v1"));
+    rtx.set(Bytes.of("r2"), new Column("cf2", "cq2"), Bytes.of("v2"));
+    rtx.delete(Bytes.of("r3"), new Column("cf3"));
+    expect(tx.get(Bytes.of("r4"), new Column("cf4"))).andReturn(Bytes.of("v4"));
+    replay(tx);
+    rtx.get(Bytes.of("r4"), new Column("cf4"));
+
+    List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+    Assert.assertEquals(4, entries.size());
+    Assert.assertEquals("LogEntry{op=SET, row=r1, col=cf1  , value=v1}", entries.get(0).toString());
+    Assert.assertEquals("LogEntry{op=SET, row=r2, col=cf2 cq2 , value=v2}", entries.get(1)
+        .toString());
+    Assert
+        .assertEquals("LogEntry{op=DELETE, row=r3, col=cf3  , value=}", entries.get(2).toString());
+    Assert.assertEquals("LogEntry{op=GET, row=r4, col=cf4  , value=v4}", entries.get(3).toString());
+    Assert.assertEquals("{r4 cf4  =v4}", rtx.getTxLog().getOperationMap(LogEntry.Operation.GET)
+        .toString());
+    Assert.assertEquals("{r2 cf2 cq2 =v2, r1 cf1  =v1}",
+        rtx.getTxLog().getOperationMap(LogEntry.Operation.SET).toString());
+    Assert.assertEquals("{r3 cf3  =}", rtx.getTxLog().getOperationMap(LogEntry.Operation.DELETE)
+        .toString());
+  }
+
+  /** Sets made through a typed wrapper around the recording transaction are logged as well. */
+  @Test
+  public void testTypedTx() {
+    TypedTransaction ttx = tl.wrap(rtx);
+    ttx.mutate().row("r5").fam("cf5").qual("cq5").set("1");
+    ttx.mutate().row("r6").fam("cf6").qual("cq6").set("1");
+    List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+    Assert.assertEquals(2, entries.size());
+    Assert.assertEquals("LogEntry{op=SET, row=r5, col=cf5 cq5 , value=1}", entries.get(0)
+        .toString());
+    Assert.assertEquals("LogEntry{op=SET, row=r6, col=cf6 cq6 , value=1}", entries.get(1)
+        .toString());
+  }
+
+  /** With a filter on column family {@code cfa}, the set on family {@code cfb} is not logged. */
+  @Test
+  public void testFilter() {
+    rtx = RecordingTransaction.wrap(tx, le -> le.getColumn().getFamily().toString().equals("cfa"));
+    TypedTransaction ttx = tl.wrap(rtx);
+    ttx.mutate().row("r1").fam("cfa").qual("cq1").set("1");
+    ttx.mutate().row("r2").fam("cfb").qual("cq2").set("2");
+    ttx.mutate().row("r3").fam("cfa").qual("cq3").set("3");
+    List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+    Assert.assertEquals(2, entries.size());
+    Assert.assertEquals("LogEntry{op=SET, row=r1, col=cfa cq1 , value=1}", entries.get(0)
+        .toString());
+    Assert.assertEquals("LogEntry{op=SET, row=r3, col=cfa cq3 , value=3}", entries.get(1)
+        .toString());
+  }
+
+  /** {@code close()} on the wrapper must call {@code close()} on the wrapped transaction. */
+  @Test
+  public void testClose() {
+    // record the expected call on the mock, then switch it to replay mode
+    tx.close();
+    replay(tx);
+    rtx.close();
+    verify(tx);
+  }
+
+  /** {@code commit()} on the wrapper must call {@code commit()} on the wrapped transaction. */
+  @Test
+  public void testCommit() {
+    tx.commit();
+    replay(tx);
+    rtx.commit();
+    verify(tx);
+  }
+
+  /** A delete on the wrapper must reach the wrapped transaction with the same arguments. */
+  @Test
+  public void testDelete() {
+    tx.delete(Bytes.of("r"), Column.EMPTY);
+    replay(tx);
+    rtx.delete(Bytes.of("r"), Column.EMPTY);
+    verify(tx);
+  }
+
+  /** A single-cell get returns the value supplied by the wrapped transaction. */
+  @Test
+  public void testGet() {
+    expect(tx.get(Bytes.of("r"), Column.EMPTY)).andReturn(Bytes.of("v"));
+    replay(tx);
+    Assert.assertEquals(Bytes.of("v"), rtx.get(Bytes.of("r"), Column.EMPTY));
+    verify(tx);
+  }
+
+  /** A get of one row with a column set returns the map supplied by the wrapped transaction. */
+  @Test
+  public void testGetColumns() {
+    expect(tx.get(Bytes.of("r"), Collections.emptySet())).andReturn(Collections.emptyMap());
+    replay(tx);
+    Assert.assertEquals(Collections.emptyMap(), rtx.get(Bytes.of("r"), Collections.emptySet()));
+    verify(tx);
+  }
+
+  /** A get of several rows and columns returns the map supplied by the wrapped transaction. */
+  @Test
+  public void testGetRows() {
+    expect(tx.get(Collections.emptyList(), Collections.emptySet())).andReturn(
+        Collections.emptyMap());
+    replay(tx);
+    Assert.assertEquals(Collections.emptyMap(),
+        rtx.get(Collections.emptyList(), Collections.emptySet()));
+    verify(tx);
+  }
+
+  /** When the wrapped transaction returns {@code null} for a scan, so does the wrapper. */
+  @Test
+  public void testGetScanNull() {
+    ScannerConfiguration scanConfig = new ScannerConfiguration();
+    expect(tx.get(scanConfig)).andReturn(null);
+    replay(tx);
+    Assert.assertNull(rtx.get(scanConfig));
+    verify(tx);
+  }
+
+  /**
+   * Scans a stub iterator holding one row with one column and checks that the log stays empty
+   * until the column entry is actually read, at which point a single GET entry is recorded.
+   */
+  @Test
+  public void testGetScanIter() {
+    ScannerConfiguration scanConfig = new ScannerConfiguration();
+    // stub row iterator: exactly one row (r7) containing exactly one column (cf7:cq7 = v7)
+    expect(tx.get(scanConfig)).andReturn(new RowIterator() {
+
+      private boolean hasNextRow = true;
+
+      @Override
+      public boolean hasNext() {
+        return hasNextRow;
+      }
+
+      @Override
+      public Map.Entry<Bytes, ColumnIterator> next() {
+        hasNextRow = false;
+        return new AbstractMap.SimpleEntry<>(Bytes.of("r7"), new ColumnIterator() {
+
+          private boolean hasNextCol = true;
+
+          @Override
+          public boolean hasNext() {
+            return hasNextCol;
+          }
+
+          @Override
+          public Map.Entry<Column, Bytes> next() {
+            hasNextCol = false;
+            return new AbstractMap.SimpleEntry<>(new Column("cf7", "cq7"), Bytes.of("v7"));
+          }
+        });
+      }
+    });
+    replay(tx);
+    RowIterator rowIter = rtx.get(scanConfig);
+    Assert.assertNotNull(rowIter);
+    // obtaining the iterator must not log anything yet
+    Assert.assertTrue(rtx.getTxLog().isEmpty());
+    Assert.assertTrue(rowIter.hasNext());
+    Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
+    Assert.assertFalse(rowIter.hasNext());
+    Assert.assertEquals(Bytes.of("r7"), rowEntry.getKey());
+    ColumnIterator colIter = rowEntry.getValue();
+    Assert.assertTrue(colIter.hasNext());
+    // advancing to the row and peeking at its columns still logs nothing
+    Assert.assertTrue(rtx.getTxLog().isEmpty());
+    Map.Entry<Column, Bytes> colEntry = colIter.next();
+    // reading the column entry is what produces the log entry
+    Assert.assertFalse(rtx.getTxLog().isEmpty());
+    Assert.assertFalse(colIter.hasNext());
+    Assert.assertEquals(new Column("cf7", "cq7"), colEntry.getKey());
+    Assert.assertEquals(Bytes.of("v7"), colEntry.getValue());
+    List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+    Assert.assertEquals(1, entries.size());
+    Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", entries.get(0)
+        .toString());
+    verify(tx);
+  }
+
+  /** The start timestamp reported by the wrapper is the one from the wrapped transaction. */
+  @Test
+  public void testGetTimestamp() {
+    expect(tx.getStartTimestamp()).andReturn(5L);
+    replay(tx);
+    Assert.assertEquals(5L, rtx.getStartTimestamp());
+    verify(tx);
+  }
+}