You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:50 UTC

[02/10] incubator-fluo-recipes git commit: Updated package names in core module

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshot.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshot.java
new file mode 100644
index 0000000..64fa7c2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshot.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.Snapshot;
+
+public class MockSnapshot extends MockSnapshotBase implements Snapshot {
+
+  MockSnapshot(String... entries) {
+    super(entries);
+  }
+
+  @Override
+  public void close() {
+    // no resources need to be closed
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
new file mode 100644
index 0000000..d31b36c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.core.impl.TxStringUtil;
+
+public class MockSnapshotBase implements SnapshotBase {
+
+  final Map<Bytes, Map<Column, Bytes>> getData;
+
+  /**
+   * Initializes {@link #getData} using {@link #toRCVM(String...)}
+   */
+  MockSnapshotBase(String... entries) {
+    getData = toRCVM(entries);
+  }
+
+  @Override
+  public Bytes get(Bytes row, Column column) {
+    Map<Column, Bytes> cols = getData.get(row);
+    if (cols != null) {
+      return cols.get(column);
+    }
+
+    return null;
+  }
+
+  @Override
+  public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+    Map<Column, Bytes> ret = new HashMap<>();
+    Map<Column, Bytes> cols = getData.get(row);
+    if (cols != null) {
+      for (Column column : columns) {
+        Bytes val = cols.get(column);
+        if (val != null) {
+          ret.put(column, val);
+        }
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+
+    Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
+
+    for (Bytes row : rows) {
+      Map<Column, Bytes> colMap = get(row, columns);
+      if (colMap != null && colMap.size() > 0) {
+        ret.put(row, colMap);
+      }
+    }
+
+    return ret;
+  }
+
+  @Override
+  public RowIterator get(ScannerConfiguration config) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * toRCVM stands for "To Row Column Value Map". This is a convenience function that takes strings
+   * of the format {@code <row>,<col fam>:<col qual>[:col vis],
+   * <value>} and generates a row, column, value map.
+   */
+  public static Map<Bytes, Map<Column, Bytes>> toRCVM(String... entries) {
+    Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
+
+    for (String entry : entries) {
+      String[] rcv = entry.split(",");
+      if (rcv.length != 3 && !(rcv.length == 2 && entry.trim().endsWith(","))) {
+        throw new IllegalArgumentException(
+            "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+      }
+
+      Bytes row = Bytes.of(rcv[0]);
+      String[] colFields = rcv[1].split(":");
+
+      Column col;
+      if (colFields.length == 3) {
+        col = new Column(colFields[0], colFields[1], colFields[2]);
+      } else if (colFields.length == 2) {
+        col = new Column(colFields[0], colFields[1]);
+      } else {
+        throw new IllegalArgumentException(
+            "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+      }
+
+      Bytes val;
+      if (rcv.length == 2) {
+        val = Bytes.EMPTY;
+      } else {
+        val = Bytes.of(rcv[2]);
+      }
+
+      Map<Column, Bytes> cols = ret.get(row);
+      if (cols == null) {
+        cols = new HashMap<>();
+        ret.put(row, cols);
+      }
+
+      cols.put(col, val);
+    }
+    return ret;
+  }
+
+  /**
+   * toRCM stands for "To Row Column Map". This is a convenience function that takes strings of the
+   * format {@code <row>,<col fam>:<col qual>[:col vis]} and generates a row, column map.
+   */
+  public static Map<Bytes, Set<Column>> toRCM(String... entries) {
+    Map<Bytes, Set<Column>> ret = new HashMap<>();
+
+    for (String entry : entries) {
+      String[] rcv = entry.split(",");
+      if (rcv.length != 2) {
+        throw new IllegalArgumentException(
+            "expected <row>,<col fam>:<col qual>[:col vis] but saw : " + entry);
+      }
+
+      Bytes row = Bytes.of(rcv[0]);
+      String[] colFields = rcv[1].split(":");
+
+      Column col;
+      if (colFields.length == 3) {
+        col = new Column(colFields[0], colFields[1], colFields[2]);
+      } else if (colFields.length == 2) {
+        col = new Column(colFields[0], colFields[1]);
+      } else {
+        throw new IllegalArgumentException(
+            "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+      }
+
+      Set<Column> cols = ret.get(row);
+      if (cols == null) {
+        cols = new HashSet<>();
+        ret.put(row, cols);
+      }
+
+      cols.add(col);
+    }
+    return ret;
+  }
+
+  @Override
+  public long getStartTimestamp() {
+    throw new UnsupportedOperationException();
+  }
+
+
+  @Override
+  public String gets(String row, Column column) {
+    return TxStringUtil.gets(this, row, column);
+  }
+
+  @Override
+  public Map<Column, String> gets(String row, Set<Column> columns) {
+    return TxStringUtil.gets(this, row, columns);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+    return TxStringUtil.gets(this, rows, columns);
+  }
+
+  @Override
+  public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+    return TxStringUtil.gets(this, rowColumns);
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransaction.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransaction.java
new file mode 100644
index 0000000..750e0ee
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransaction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+public class MockTransaction extends MockTransactionBase implements Transaction {
+
+  MockTransaction(String... entries) {
+    super(entries);
+  }
+
+  @Override
+  public void commit() throws CommitException {
+    // does nothing
+  }
+
+  @Override
+  public void close() {
+    // no resources to close
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransactionBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransactionBase.java
new file mode 100644
index 0000000..05ab87e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransactionBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+
+/**
+ * A very simple implementation of {@link TransactionBase} used for testing. All reads are serviced
+ * from {@link #getData}. Updates are stored in {@link #setData}, {@link #deletes}, or
+ * {@link #weakNotifications} depending on the update type.
+ */
+public class MockTransactionBase extends MockSnapshotBase implements TransactionBase {
+
+  final Map<Bytes, Map<Column, Bytes>> setData = new HashMap<>();
+  final Map<Bytes, Set<Column>> deletes = new HashMap<>();
+  final Map<Bytes, Set<Column>> weakNotifications = new HashMap<>();
+
+  MockTransactionBase(String... entries) {
+    super(entries);
+  }
+
+  @Override
+  public void setWeakNotification(Bytes row, Column col) {
+    Set<Column> cols = weakNotifications.get(row);
+    if (cols == null) {
+      cols = new HashSet<>();
+      weakNotifications.put(row, cols);
+    }
+
+    cols.add(col);
+  }
+
+  @Override
+  public void set(Bytes row, Column col, Bytes value) {
+    Map<Column, Bytes> cols = setData.get(row);
+    if (cols == null) {
+      cols = new HashMap<>();
+      setData.put(row, cols);
+    }
+
+    cols.put(col, value);
+  }
+
+  @Override
+  public void delete(Bytes row, Column col) {
+    Set<Column> cols = deletes.get(row);
+    if (cols == null) {
+      cols = new HashSet<>();
+      deletes.put(row, cols);
+    }
+
+    cols.add(col);
+  }
+
+  @Override
+  public void setWeakNotification(String row, Column col) {
+    setWeakNotification(Bytes.of(row), col);
+  }
+
+  @Override
+  public void set(String row, Column col, String value) throws AlreadySetException {
+    set(Bytes.of(row), col, Bytes.of(value));
+  }
+
+  @Override
+  public void delete(String row, Column col) {
+    delete(Bytes.of(row), col);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java
new file mode 100644
index 0000000..6b38e12
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.types.TypedSnapshotBase.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TypeLayerTest {
+
+  @Test
+  public void testColumns() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt =
+        new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+            "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    Map<Column, Value> results =
+        ttx.get().row("r2")
+            .columns(ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7")));
+
+    Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
+    Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
+    Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
+    Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
+
+    Assert.assertEquals(1, results.size());
+
+    results =
+        ttx.get()
+            .row("r2")
+            .columns(
+                ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2",
+                    "8")));
+
+    Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
+    Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
+    Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
+    Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
+    Assert.assertEquals(13, (int) results.get(new Column("cf2", "8")).toInteger());
+    Assert.assertEquals(13, results.get(new Column("cf2", "8")).toInteger(0));
+
+    Assert.assertEquals(2, results.size());
+
+    // test var args
+    Map<Column, Value> results2 =
+        ttx.get().row("r2")
+            .columns(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2", "8"));
+    Assert.assertEquals(results, results2);
+  }
+
+  @Test
+  public void testVis() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt = new MockTransactionBase("r1,cf1:cq1:A,v1", "r1,cf1:cq2:A&B,v2");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    Assert.assertNull(ttx.get().row("r1").fam("cf1").qual("cq1").toString());
+    Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A").toString());
+    Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A".getBytes())
+        .toString());
+    Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A"))
+        .toString());
+    Assert.assertEquals("v1",
+        ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A".getBytes())).toString());
+
+    Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString());
+    Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
+        .toString());
+    Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
+        .toString());
+    Assert.assertNull("v1",
+        ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
+            .toString());
+
+    Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString("v3"));
+    Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
+        .toString("v3"));
+    Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
+        .toString("v3"));
+    Assert.assertEquals(
+        "v3",
+        ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
+            .toString("v3"));
+
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").set(3);
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).set(4);
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).set(5);
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).set(7);
+
+    Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1:A&B,3", "r1,cf1:cq1:A&C,4",
+        "r1,cf1:cq1:A&D,5", "r1,cf1:cq1:A&F,7"), tt.setData);
+    tt.setData.clear();
+
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").delete();
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).delete();
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).delete();
+    ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).delete();
+
+    Assert.assertEquals(MockTransactionBase.toRCM("r1,cf1:cq1:A&B", "r1,cf1:cq1:A&C",
+        "r1,cf1:cq1:A&D", "r1,cf1:cq1:A&F"), tt.deletes);
+    tt.deletes.clear();
+    Assert.assertEquals(0, tt.setData.size());
+    Assert.assertEquals(0, tt.weakNotifications.size());
+
+  }
+
+  @Test
+  public void testBuildColumn() {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0".getBytes()).qual("q0".getBytes())
+        .vis());
+    Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0").qual("q0").vis());
+    Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5).qual(7).vis());
+    Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5l).qual(7l).vis());
+    Assert.assertEquals(new Column("5", "7"), tl.bc().fam(Bytes.of("5")).qual(Bytes.of("7")).vis());
+    Assert.assertEquals(new Column("5", "7"),
+        tl.bc().fam(ByteBuffer.wrap("5".getBytes())).qual(ByteBuffer.wrap("7".getBytes())).vis());
+
+    Assert.assertEquals(new Column("f0", "q0", "A&B"),
+        tl.bc().fam("f0".getBytes()).qual("q0".getBytes()).vis("A&B"));
+    Assert.assertEquals(new Column("f0", "q0", "A&C"),
+        tl.bc().fam("f0").qual("q0").vis("A&C".getBytes()));
+    Assert.assertEquals(new Column("5", "7", "A&D"), tl.bc().fam(5).qual(7).vis(Bytes.of("A&D")));
+    Assert.assertEquals(new Column("5", "7", "A&D"),
+        tl.bc().fam(5).qual(7).vis(ByteBuffer.wrap("A&D".getBytes())));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockSnapshot ms =
+        new MockSnapshot("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+            "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20",
+            "r3,cf3:cq3,28.195", "r4,cf4:cq4,true");
+
+    TypedSnapshot tts = tl.wrap(ms);
+
+    Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString());
+    Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString("b"));
+    Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString());
+    Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString("b"));
+    Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString());
+    Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString("b"));
+    Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString());
+    Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString("b"));
+
+    // try converting to different types
+    Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString());
+    Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString("b"));
+    Assert.assertEquals((Integer) 13, tts.get().row("r2").fam("cf2").qual(8).toInteger());
+    Assert.assertEquals(13, tts.get().row("r2").fam("cf2").qual(8).toInteger(14));
+    Assert.assertEquals((Long) 13l, tts.get().row("r2").fam("cf2").qual(8).toLong());
+    Assert.assertEquals(13l, tts.get().row("r2").fam("cf2").qual(8).toLong(14l));
+    Assert.assertEquals("13", new String(tts.get().row("r2").fam("cf2").qual(8).toBytes()));
+    Assert.assertEquals("13",
+        new String(tts.get().row("r2").fam("cf2").qual(8).toBytes("14".getBytes())));
+    Assert
+        .assertEquals("13", new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes()));
+    Assert.assertEquals("13",
+        new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes("14".getBytes())));
+    Assert.assertEquals("13",
+        Bytes.of(tts.get().row("r2").col(new Column("cf2", "8")).toByteBuffer()).toString());
+    Assert.assertEquals(
+        "13",
+        Bytes.of(
+            tts.get().row("r2").col(new Column("cf2", "8"))
+                .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
+
+    // test non-existent
+    Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toInteger());
+    Assert.assertEquals(14, tts.get().row("r2").fam("cf3").qual(8).toInteger(14));
+    Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toLong());
+    Assert.assertEquals(14l, tts.get().row("r2").fam("cf3").qual(8).toLong(14l));
+    Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toString());
+    Assert.assertEquals("14", tts.get().row("r2").fam("cf3").qual(8).toString("14"));
+    Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toBytes());
+    Assert.assertEquals("14",
+        new String(tts.get().row("r2").fam("cf3").qual(8).toBytes("14".getBytes())));
+    Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toBytes());
+    Assert.assertEquals("14",
+        new String(tts.get().row("r2").col(new Column("cf3", "8")).toBytes("14".getBytes())));
+    Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toByteBuffer());
+    Assert.assertEquals(
+        "14",
+        Bytes.of(
+            tts.get().row("r2").col(new Column("cf3", "8"))
+                .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
+
+    // test float & double
+    Assert.assertEquals((Float) 28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat());
+    Assert.assertEquals(28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat(39.383f), 0.0);
+    Assert.assertEquals((Double) 28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble());
+    Assert.assertEquals(28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble(39.383d), 0.0);
+
+    // test boolean
+    Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
+    Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
+    Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
+    Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
+
+    // try different types for row
+    Assert.assertEquals("20", tts.get().row(13).fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row(13l).fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13".getBytes()).fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row(ByteBuffer.wrap("13".getBytes())).fam("9").qual("17")
+        .toString());
+
+    // try different types for cf
+    Assert.assertEquals("20", tts.get().row("13").fam(9).qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam(9l).qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9".getBytes()).qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam(ByteBuffer.wrap("9".getBytes())).qual("17")
+        .toString());
+
+    // try different types for cq
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17l).toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17).toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17".getBytes()).toString());
+    Assert.assertEquals("20", tts.get().row("13").fam("9").qual(ByteBuffer.wrap("17".getBytes()))
+        .toString());
+
+    ms.close();
+    tts.close();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt =
+        new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+            "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    // test increments data
+    ttx.mutate().row("13").fam("9").qual("17").increment(1);
+    ttx.mutate().row("13").fam("9").qual(18).increment(2);
+    ttx.mutate().row("13").fam("9").qual(19l).increment(3);
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4);
+    ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5); // increment non existent
+    ttx.mutate().row("13").col(new Column("9", "22")).increment(6); // increment non existent
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7); // increment
+                                                                                         // non
+                                                                                         // existent
+
+    Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
+        "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
+    tt.setData.clear();
+
+    // test increments long
+    ttx.mutate().row("13").fam("9").qual("17").increment(1l);
+    ttx.mutate().row("13").fam("9").qual(18).increment(2l);
+    ttx.mutate().row("13").fam("9").qual(19l).increment(3l);
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4l);
+    ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5l); // increment non existent
+    ttx.mutate().row("13").col(new Column("9", "22")).increment(6l); // increment non existent
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7l); // increment
+                                                                                          // non
+                                                                                          // existent
+
+    Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
+        "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
+    tt.setData.clear();
+
+    // test setting data
+    ttx.mutate().row("13").fam("9").qual("16").set();
+    ttx.mutate().row("13").fam("9").qual("17").set(3);
+    ttx.mutate().row("13").fam("9").qual(18).set(4l);
+    ttx.mutate().row("13").fam("9").qual(19l).set("5");
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).set("6".getBytes());
+    ttx.mutate().row("13").col(new Column("9", "21")).set("7".getBytes());
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes()))
+        .set(ByteBuffer.wrap("8".getBytes()));
+    ttx.mutate().row("13").fam("9").qual("23").set(2.54f);
+    ttx.mutate().row("13").fam("9").qual("24").set(-6.135d);
+    ttx.mutate().row("13").fam("9").qual("25").set(false);
+
+    Assert.assertEquals(MockTransactionBase.toRCVM("13,9:16,", "13,9:17,3", "13,9:18,4",
+        "13,9:19,5", "13,9:20,6", "13,9:21,7", "13,9:22,8", "13,9:23,2.54", "13,9:24,-6.135",
+        "13,9:25,false"), tt.setData);
+    tt.setData.clear();
+
+    // test deleting data
+    ttx.mutate().row("13").fam("9").qual("17").delete();
+    ttx.mutate().row("13").fam("9").qual(18).delete();
+    ttx.mutate().row("13").fam("9").qual(19l).delete();
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).delete();
+    ttx.mutate().row("13").col(new Column("9", "21")).delete();
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).delete();
+
+    Assert
+        .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
+            "13,9:21", "13,9:22"), tt.deletes);
+    tt.deletes.clear();
+    Assert.assertEquals(0, tt.setData.size());
+    Assert.assertEquals(0, tt.weakNotifications.size());
+
+    // test weak notifications
+    ttx.mutate().row("13").fam("9").qual("17").weaklyNotify();
+    ttx.mutate().row("13").fam("9").qual(18).weaklyNotify();
+    ttx.mutate().row("13").fam("9").qual(19l).weaklyNotify();
+    ttx.mutate().row("13").fam("9").qual("20".getBytes()).weaklyNotify();
+    ttx.mutate().row("13").col(new Column("9", "21")).weaklyNotify();
+    ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).weaklyNotify();
+
+    Assert
+        .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
+            "13,9:21", "13,9:22"), tt.weakNotifications);
+    tt.weakNotifications.clear();
+    Assert.assertEquals(0, tt.setData.size());
+    Assert.assertEquals(0, tt.deletes.size());
+  }
+
+  @Test
+  public void testMultiRow() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt =
+        new MockTransactionBase("11,cf1:cq1,1", "11,cf1:cq2,2", "12,cf1:cq1,3", "12,cf1:cq2,4",
+            "13,cf1:cq1,5", "13,cf1:cq2,6");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    Bytes br1 = Bytes.of("11");
+    Bytes br2 = Bytes.of("12");
+    Bytes br3 = Bytes.of("13");
+
+    Column c1 = new Column("cf1", "cq1");
+    Column c2 = new Column("cf1", "cq2");
+
+    Map<Bytes, Map<Column, Value>> map1 =
+        ttx.get().rows(Arrays.asList(br1, br2)).columns(c1).toBytesMap();
+
+    Assert.assertEquals(map1, ttx.get().rows(br1, br2).columns(c1).toBytesMap());
+
+    Assert.assertEquals("1", map1.get(br1).get(c1).toString());
+    Assert.assertEquals("1", map1.get(br1).get(c1).toString("5"));
+    Assert.assertEquals((Long) (1l), map1.get(br1).get(c1).toLong());
+    Assert.assertEquals(1l, map1.get(br1).get(c1).toLong(5));
+    Assert.assertEquals((Integer) (1), map1.get(br1).get(c1).toInteger());
+    Assert.assertEquals(1, map1.get(br1).get(c1).toInteger(5));
+
+    Assert.assertEquals("5", map1.get(br3).get(c1).toString("5"));
+    Assert.assertNull(map1.get(br3).get(c1).toString());
+    Assert.assertEquals(5l, map1.get(br3).get(c1).toLong(5l));
+    Assert.assertNull(map1.get(br3).get(c1).toLong());
+    Assert.assertEquals(5, map1.get(br1).get(c2).toInteger(5));
+    Assert.assertNull(map1.get(br1).get(c2).toInteger());
+
+    Assert.assertEquals(2, map1.size());
+    Assert.assertEquals(1, map1.get(br1).size());
+    Assert.assertEquals(1, map1.get(br2).size());
+    Assert.assertEquals("3", map1.get(br2).get(c1).toString());
+
+    Map<String, Map<Column, Value>> map2 =
+        ttx.get().rowsString(Arrays.asList("11", "13")).columns(c1).toStringMap();
+
+    Assert.assertEquals(map2, ttx.get().rowsString("11", "13").columns(c1).toStringMap());
+
+    Assert.assertEquals(2, map2.size());
+    Assert.assertEquals(1, map2.get("11").size());
+    Assert.assertEquals(1, map2.get("13").size());
+    Assert.assertEquals((Long) (1l), map2.get("11").get(c1).toLong());
+    Assert.assertEquals(5l, map2.get("13").get(c1).toLong(6));
+
+    Map<Long, Map<Column, Value>> map3 =
+        ttx.get().rowsLong(Arrays.asList(11l, 13l)).columns(c1).toLongMap();
+
+    Assert.assertEquals(map3, ttx.get().rowsLong(11l, 13l).columns(c1).toLongMap());
+
+    Assert.assertEquals(2, map3.size());
+    Assert.assertEquals(1, map3.get(11l).size());
+    Assert.assertEquals(1, map3.get(13l).size());
+    Assert.assertEquals((Long) (1l), map3.get(11l).get(c1).toLong());
+    Assert.assertEquals(5l, map3.get(13l).get(c1).toLong(6));
+
+    Map<Integer, Map<Column, Value>> map4 =
+        ttx.get().rowsInteger(Arrays.asList(11, 13)).columns(c1).toIntegerMap();
+
+    Assert.assertEquals(map4, ttx.get().rowsInteger(11, 13).columns(c1).toIntegerMap());
+
+    Assert.assertEquals(2, map4.size());
+    Assert.assertEquals(1, map4.get(11).size());
+    Assert.assertEquals(1, map4.get(13).size());
+    Assert.assertEquals((Long) (1l), map4.get(11).get(c1).toLong());
+    Assert.assertEquals(5l, map4.get(13).get(c1).toLong(6));
+
+    Map<Integer, Map<Column, Value>> map5 =
+        ttx.get().rowsBytes(Arrays.asList("11".getBytes(), "13".getBytes())).columns(c1)
+            .toIntegerMap();
+
+    Assert.assertEquals(map5, ttx.get().rowsBytes("11".getBytes(), "13".getBytes()).columns(c1)
+        .toIntegerMap());
+
+    Assert.assertEquals(2, map5.size());
+    Assert.assertEquals(1, map5.get(11).size());
+    Assert.assertEquals(1, map5.get(13).size());
+    Assert.assertEquals((Long) (1l), map5.get(11).get(c1).toLong());
+    Assert.assertEquals(5l, map5.get(13).get(c1).toLong(6));
+
+    Map<Integer, Map<Column, Value>> map6 =
+        ttx.get()
+            .rowsByteBuffers(
+                Arrays.asList(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes())))
+            .columns(c1).toIntegerMap();
+
+    Assert.assertEquals(
+        map6,
+        ttx.get()
+            .rowsByteBuffers(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes()))
+            .columns(c1).toIntegerMap());
+
+    Assert.assertEquals(2, map6.size());
+    Assert.assertEquals(1, map6.get(11).size());
+    Assert.assertEquals(1, map6.get(13).size());
+    Assert.assertEquals((Long) (1l), map6.get(11).get(c1).toLong());
+    Assert.assertEquals(5l, map6.get(13).get(c1).toLong(6));
+
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    TypeLayer tl = new TypeLayer(new StringEncoder());
+
+    MockTransactionBase tt =
+        new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+            "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+    TypedTransactionBase ttx = tl.wrap(tt);
+
+    Assert.assertEquals(Bytes.of("12"), ttx.get(Bytes.of("r2"), new Column("cf2", "7")));
+    Assert.assertNull(ttx.get(Bytes.of("r2"), new Column("cf2", "9")));
+
+    Map<Column, Bytes> map =
+        ttx.get(Bytes.of("r2"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
+    Assert.assertEquals(2, map.size());
+    Assert.assertEquals("12", map.get(new Column("cf2", "7")).toString());
+    Assert.assertEquals("13", map.get(new Column("cf2", "8")).toString());
+
+    map = ttx.get(Bytes.of("r6"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
+    Assert.assertEquals(0, map.size());
+
+    ttx.set(Bytes.of("r6"), new Column("cf2", "7"), Bytes.of("3"));
+    Assert.assertEquals(MockTransactionBase.toRCVM("r6,cf2:7,3"), tt.setData);
+    tt.setData.clear();
+
+    Map<Bytes, Map<Column, Bytes>> map2 =
+        ttx.get(ImmutableSet.of(Bytes.of("r1"), Bytes.of("r2")),
+            ImmutableSet.of(new Column("cf1", "cq1"), new Column("cf2", "8")));
+    Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1,v1", "r2,cf2:8,13"), map2);
+
+    ttx.delete(Bytes.of("r6"), new Column("cf2", "7"));
+    Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:7"), tt.deletes);
+    tt.deletes.clear();
+
+    ttx.setWeakNotification(Bytes.of("r6"), new Column("cf2", "8"));
+    Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:8"), tt.weakNotifications);
+    tt.weakNotifications.clear();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/data/RowHasherTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/data/RowHasherTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/data/RowHasherTest.java
deleted file mode 100644
index 3d52c51..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/data/RowHasherTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.data;
-
-import org.apache.fluo.api.data.Bytes;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class RowHasherTest {
-
-  @Test
-  public void testBadPrefixes() {
-    String[] badPrefixes =
-        {"q:she6:test1", "q:she6:test1", "p:Mhe6:test1", "p;she6:test1", "p:she6;test1",
-            "p;she6;test1", "p:+he6:test1", "p:s?e6:test1", "p:sh{6:test1", "p:sh6:"};
-
-    RowHasher rh = new RowHasher("p");
-    for (String badPrefix : badPrefixes) {
-      try {
-        rh.removeHash(Bytes.of(badPrefix));
-        Assert.fail();
-      } catch (IllegalArgumentException e) {
-      }
-    }
-  }
-
-  @Test
-  public void testBasic() {
-    RowHasher rh = new RowHasher("p");
-    Assert.assertTrue(rh.removeHash(rh.addHash("abc")).toString().equals("abc"));
-    rh = new RowHasher("p2");
-    Assert.assertTrue(rh.removeHash(rh.addHash("abc")).toString().equals("abc"));
-
-    Assert.assertTrue(rh.addHash("abc").toString().startsWith("p2:"));
-
-    // test to ensure hash is stable over time
-    Assert.assertEquals("p2:she6:test1", rh.addHash("test1").toString());
-    Assert.assertEquals("p2:hgt0:0123456789abcdefghijklmnopqrstuvwxyz",
-        rh.addHash("0123456789abcdefghijklmnopqrstuvwxyz").toString());
-    Assert.assertEquals("p2:fluo:86ce3b094982c6a", rh.addHash("86ce3b094982c6a").toString());
-  }
-
-  @Test
-  public void testBalancerRegex() {
-    RowHasher rh = new RowHasher("p");
-    String regex = rh.getTableOptimizations(3).getTabletGroupingRegex();
-    Assert.assertEquals("(\\Qp:\\E).*", regex);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentLoader.java
deleted file mode 100644
index 8fe2b19..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentLoader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.fluo.recipes.types.TypedLoader;
-import org.apache.fluo.recipes.types.TypedTransactionBase;
-
-public class DocumentLoader extends TypedLoader {
-
-  String docid;
-  String refs[];
-
-  DocumentLoader(String docid, String... refs) {
-    this.docid = docid;
-    this.refs = refs;
-  }
-
-  @Override
-  public void load(TypedTransactionBase tx, Context context) throws Exception {
-    tx.mutate().row("d:" + docid).fam("content").qual("new").set(StringUtils.join(refs, " "));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentObserver.java
deleted file mode 100644
index a31461c..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentObserver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.export.ExportTestBase.RefExporter;
-import org.apache.fluo.recipes.types.TypedObserver;
-import org.apache.fluo.recipes.types.TypedTransactionBase;
-
-public class DocumentObserver extends TypedObserver {
-
-  ExportQueue<String, RefUpdates> refExportQueue;
-
-  @Override
-  public void init(Context context) throws Exception {
-    refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, context.getAppConfiguration());
-  }
-
-  @Override
-  public ObservedColumn getObservedColumn() {
-    return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
-  }
-
-  @Override
-  public void process(TypedTransactionBase tx, Bytes row, Column col) {
-    String newContent = tx.get().row(row).col(col).toString();
-    Set<String> newRefs = new HashSet<>(Arrays.asList(newContent.split(" ")));
-    Set<String> currentRefs =
-        new HashSet<>(Arrays.asList(tx.get().row(row).fam("content").qual("current").toString("")
-            .split(" ")));
-
-    Set<String> addedRefs = new HashSet<>(newRefs);
-    addedRefs.removeAll(currentRefs);
-
-    Set<String> deletedRefs = new HashSet<>(currentRefs);
-    deletedRefs.removeAll(newRefs);
-
-    String key = row.toString().substring(2);
-    RefUpdates val = new RefUpdates(addedRefs, deletedRefs);
-
-    refExportQueue.add(tx, key, val);
-
-    tx.mutate().row(row).fam("content").qual("current").set(newContent);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportBufferIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportBufferIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportBufferIT.java
deleted file mode 100644
index 53d838d..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportBufferIT.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ExportBufferIT extends ExportTestBase {
-
-  @Override
-  protected int getNumBuckets() {
-    return 2;
-  }
-
-  @Override
-  protected Integer getBufferSize() {
-    return 1024;
-  }
-
-  @Test
-  public void testSmallExportBuffer() {
-    // try setting the export buffer size small. Make sure everything is exported.
-
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      ExportQueue<String, RefUpdates> refExportQueue =
-          ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
-      try (Transaction tx = fc.newTransaction()) {
-        for (int i = 0; i < 1000; i++) {
-          refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 10, i + 20), ns(new int[0])));
-        }
-
-        tx.commit();
-      }
-    }
-
-    miniFluo.waitForObservers();
-
-    Map<String, Set<String>> erefs = getExportedReferees();
-    Map<String, Set<String>> expected = new HashMap<>();
-
-    for (int i = 0; i < 1000; i++) {
-      expected.computeIfAbsent(nk(i + 10), s -> new HashSet<>()).add(nk(i));
-      expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
-    }
-
-    assertEquals(expected, erefs);
-    int prevNumExportCalls = getNumExportCalls();
-    Assert.assertTrue(prevNumExportCalls > 10); // with small buffer there should be lots of exports
-                                                // calls
-
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      ExportQueue<String, RefUpdates> refExportQueue =
-          ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
-      try (Transaction tx = fc.newTransaction()) {
-        for (int i = 0; i < 1000; i++) {
-          refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 12), ns(i + 10)));
-        }
-
-        tx.commit();
-      }
-    }
-
-    miniFluo.waitForObservers();
-
-    erefs = getExportedReferees();
-    expected = new HashMap<>();
-
-    for (int i = 0; i < 1000; i++) {
-      expected.computeIfAbsent(nk(i + 12), s -> new HashSet<>()).add(nk(i));
-      expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
-    }
-
-    assertEquals(expected, erefs);
-    prevNumExportCalls = getNumExportCalls() - prevNumExportCalls;
-    Assert.assertTrue(prevNumExportCalls > 10);
-  }
-
-  public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual) {
-    if (!expected.equals(actual)) {
-      System.out.println("*** diff ***");
-      diff(expected, actual);
-      Assert.fail();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportQueueIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportQueueIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportQueueIT.java
deleted file mode 100644
index baa979a..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportQueueIT.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.LoaderExecutor;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ExportQueueIT extends ExportTestBase {
-
-  @Test
-  public void testExport() {
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0999", "0005", "0002"));
-        loader.execute(new DocumentLoader("0002", "0999", "0042"));
-        loader.execute(new DocumentLoader("0005", "0999", "0042"));
-        loader.execute(new DocumentLoader("0042", "0999"));
-      }
-
-      miniFluo.waitForObservers();
-
-      Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
-      Assert.assertEquals(ns("0999"), getExportedReferees("0002"));
-      Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
-      Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0999", "0005", "0042"));
-      }
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0999", "0005"));
-      }
-
-      miniFluo.waitForObservers();
-
-      Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
-      Assert.assertEquals(ns(new String[0]), getExportedReferees("0002"));
-      Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
-      Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0042", "0999", "0002", "0005"));
-        loader.execute(new DocumentLoader("0005", "0002"));
-      }
-
-      try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-        loader.execute(new DocumentLoader("0005", "0003"));
-      }
-
-      miniFluo.waitForObservers();
-
-      Assert.assertEquals(ns("0002", "0042"), getExportedReferees("0999"));
-      Assert.assertEquals(ns("0042"), getExportedReferees("0002"));
-      Assert.assertEquals(ns("0005"), getExportedReferees("0003"));
-      Assert.assertEquals(ns("0999", "0042"), getExportedReferees("0005"));
-      Assert.assertEquals(ns("0002"), getExportedReferees("0042"));
-
-    }
-  }
-
-  @Test
-  public void exportStressTest() {
-    FluoConfiguration config = new FluoConfiguration(miniFluo.getClientConfiguration());
-    config.setLoaderQueueSize(100);
-    config.setLoaderThreads(20);
-
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-
-      loadRandom(fc, 1000, 500);
-
-      miniFluo.waitForObservers();
-
-      diff(getFluoReferees(fc), getExportedReferees());
-
-      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
-      loadRandom(fc, 1000, 500);
-
-      miniFluo.waitForObservers();
-
-      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
-      loadRandom(fc, 1000, 10000);
-
-      miniFluo.waitForObservers();
-
-      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
-      loadRandom(fc, 1000, 10000);
-
-      miniFluo.waitForObservers();
-
-      assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportTestBase.java
deleted file mode 100644
index 2584df4..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportTestBase.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-
-import com.google.common.collect.Iterators;
-import org.apache.commons.io.FileUtils;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.LoaderExecutor;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-public class ExportTestBase {
-
-  private static Map<String, Map<String, RefInfo>> globalExports = new HashMap<>();
-  private static int exportCalls = 0;
-
-  protected static Set<String> getExportedReferees(String node) {
-    synchronized (globalExports) {
-      Set<String> ret = new HashSet<>();
-
-      Map<String, RefInfo> referees = globalExports.get(node);
-
-      if (referees == null) {
-        return ret;
-      }
-
-      referees.forEach((k, v) -> {
-        if (!v.deleted)
-          ret.add(k);
-      });
-
-      return ret;
-    }
-  }
-
-  protected static Map<String, Set<String>> getExportedReferees() {
-    synchronized (globalExports) {
-
-      Map<String, Set<String>> ret = new HashMap<>();
-
-      for (String k : globalExports.keySet()) {
-        Set<String> referees = getExportedReferees(k);
-        if (referees.size() > 0) {
-          ret.put(k, referees);
-        }
-      }
-
-      return ret;
-    }
-  }
-
-  protected static int getNumExportCalls() {
-    synchronized (globalExports) {
-      return exportCalls;
-    }
-  }
-
-  public static class RefExporter extends Exporter<String, RefUpdates> {
-
-    public static final String QUEUE_ID = "req";
-
-    private void updateExports(String key, long seq, String addedRef, boolean deleted) {
-      Map<String, RefInfo> referees = globalExports.computeIfAbsent(addedRef, k -> new HashMap<>());
-      referees.compute(key, (k, v) -> (v == null || v.seq < seq) ? new RefInfo(seq, deleted) : v);
-    }
-
-    @Override
-    protected void processExports(Iterator<SequencedExport<String, RefUpdates>> exportIterator) {
-      ArrayList<SequencedExport<String, RefUpdates>> exportList = new ArrayList<>();
-      Iterators.addAll(exportList, exportIterator);
-
-      synchronized (globalExports) {
-        exportCalls++;
-
-        for (SequencedExport<String, RefUpdates> se : exportList) {
-          for (String addedRef : se.getValue().getAddedRefs()) {
-            updateExports(se.getKey(), se.getSequence(), addedRef, false);
-          }
-
-          for (String deletedRef : se.getValue().getDeletedRefs()) {
-            updateExports(se.getKey(), se.getSequence(), deletedRef, true);
-          }
-        }
-      }
-    }
-  }
-
-  protected MiniFluo miniFluo;
-
-  protected int getNumBuckets() {
-    return 13;
-  }
-
-  protected Integer getBufferSize() {
-    return null;
-  }
-
-  @Before
-  public void setUpFluo() throws Exception {
-    FileUtils.deleteQuietly(new File("target/mini"));
-
-    FluoConfiguration props = new FluoConfiguration();
-    props.setApplicationName("eqt");
-    props.setWorkerThreads(20);
-    props.setMiniDataDir("target/mini");
-
-    ObserverConfiguration doc = new ObserverConfiguration(DocumentObserver.class.getName());
-    props.addObserver(doc);
-
-    SimpleSerializer.setSetserlializer(props, GsonSerializer.class);
-
-    ExportQueue.Options exportQueueOpts =
-        new ExportQueue.Options(RefExporter.QUEUE_ID, RefExporter.class, String.class,
-            RefUpdates.class, getNumBuckets());
-
-    if (getBufferSize() != null) {
-      exportQueueOpts.setBufferSize(getBufferSize());
-    }
-
-    ExportQueue.configure(props, exportQueueOpts);
-
-    miniFluo = FluoFactory.newMiniFluo(props);
-
-    globalExports.clear();
-    exportCalls = 0;
-  }
-
-  @After
-  public void tearDownFluo() throws Exception {
-    if (miniFluo != null) {
-      miniFluo.close();
-    }
-  }
-
-  protected static Set<String> ns(String... sa) {
-    return new HashSet<>(Arrays.asList(sa));
-  }
-
-  protected static String nk(int i) {
-    return String.format("%06d", i);
-  }
-
-  protected static Set<String> ns(int... ia) {
-    HashSet<String> ret = new HashSet<>();
-    for (int i : ia) {
-      ret.add(nk(i));
-    }
-    return ret;
-  }
-
-  public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual,
-      FluoClient fc) {
-    if (!expected.equals(actual)) {
-      System.out.println("*** diff ***");
-      diff(expected, actual);
-      System.out.println("*** fluo dump ***");
-      dump(fc);
-      System.out.println("*** map dump ***");
-
-      Assert.fail();
-    }
-  }
-
-  protected void loadRandom(FluoClient fc, int num, int maxDocId) {
-    try (LoaderExecutor loader = fc.newLoaderExecutor()) {
-      Random rand = new Random();
-
-      for (int i = 0; i < num; i++) {
-        String docid = String.format("%05d", rand.nextInt(maxDocId));
-        String[] refs = new String[rand.nextInt(20) + 1];
-        for (int j = 0; j < refs.length; j++) {
-          refs[j] = String.format("%05d", rand.nextInt(maxDocId));
-        }
-
-        loader.execute(new DocumentLoader(docid, refs));
-      }
-    }
-  }
-
-  protected void diff(Map<String, Set<String>> fr, Map<String, Set<String>> er) {
-    HashSet<String> allKeys = new HashSet<>(fr.keySet());
-    allKeys.addAll(er.keySet());
-
-    for (String k : allKeys) {
-      Set<String> s1 = fr.getOrDefault(k, Collections.emptySet());
-      Set<String> s2 = er.getOrDefault(k, Collections.emptySet());
-
-      HashSet<String> sub1 = new HashSet<>(s1);
-      sub1.removeAll(s2);
-
-      HashSet<String> sub2 = new HashSet<>(s2);
-      sub2.removeAll(s1);
-
-      if (sub1.size() > 0 || sub2.size() > 0) {
-        System.out.println(k + " " + sub1 + " " + sub2);
-      }
-
-    }
-  }
-
-  protected Map<String, Set<String>> getFluoReferees(FluoClient fc) {
-    Map<String, Set<String>> fluoReferees = new HashMap<>();
-
-    try (Snapshot snap = fc.newSnapshot()) {
-      ScannerConfiguration scannerConfig = new ScannerConfiguration();
-      scannerConfig.fetchColumn(Bytes.of("content"), Bytes.of("current"));
-      scannerConfig.setSpan(Span.prefix("d:"));
-      RowIterator scanner = snap.get(scannerConfig);
-      while (scanner.hasNext()) {
-        Entry<Bytes, ColumnIterator> row = scanner.next();
-        ColumnIterator colIter = row.getValue();
-
-        String docid = row.getKey().toString().substring(2);
-
-        while (colIter.hasNext()) {
-          Entry<Column, Bytes> entry = colIter.next();
-
-          String[] refs = entry.getValue().toString().split(" ");
-
-          for (String ref : refs) {
-            if (ref.isEmpty())
-              continue;
-
-            fluoReferees.computeIfAbsent(ref, k -> new HashSet<>()).add(docid);
-          }
-        }
-      }
-    }
-    return fluoReferees;
-  }
-
-  public static void dump(FluoClient fc) {
-    try (Snapshot snap = fc.newSnapshot()) {
-      RowIterator scanner = snap.get(new ScannerConfiguration());
-      while (scanner.hasNext()) {
-        Entry<Bytes, ColumnIterator> row = scanner.next();
-        ColumnIterator colIter = row.getValue();
-
-        while (colIter.hasNext()) {
-          Entry<Column, Bytes> entry = colIter.next();
-
-          System.out.println("row:[" + row.getKey() + "]  col:[" + entry.getKey() + "]  val:["
-              + entry.getValue() + "]");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/GsonSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/GsonSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/GsonSerializer.java
deleted file mode 100644
index 61bb833..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/GsonSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.nio.charset.StandardCharsets;
-
-import com.google.gson.Gson;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-
-public class GsonSerializer implements SimpleSerializer {
-
-  private Gson gson = new Gson();
-
-  @Override
-  public void init(SimpleConfiguration appConfig) {
-
-  }
-
-  @Override
-  public <T> byte[] serialize(T obj) {
-    return gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
-  }
-
-  @Override
-  public <T> T deserialize(byte[] serObj, Class<T> clazz) {
-    return gson.fromJson(new String(serObj, StandardCharsets.UTF_8), clazz);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/OptionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/OptionsTest.java
deleted file mode 100644
index 16e853b..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/OptionsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.recipes.export.ExportQueue.Options;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class OptionsTest {
-  @Test
-  public void testExportQueueOptions() {
-    FluoConfiguration conf = new FluoConfiguration();
-
-    ExportQueue.configure(conf, new Options("Q1", "ET", "KT", "VT", 100));
-    ExportQueue.configure(conf, new Options("Q2", "ET2", "KT2", "VT2", 200).setBucketsPerTablet(20)
-        .setBufferSize(1000000));
-
-    Options opts1 = new Options("Q1", conf.getAppConfiguration());
-
-    Assert.assertEquals(opts1.exporterType, "ET");
-    Assert.assertEquals(opts1.keyType, "KT");
-    Assert.assertEquals(opts1.valueType, "VT");
-    Assert.assertEquals(opts1.numBuckets, 100);
-    Assert.assertEquals(opts1.bucketsPerTablet.intValue(), Options.DEFAULT_BUCKETS_PER_TABLET);
-    Assert.assertEquals(opts1.bufferSize.intValue(), Options.DEFAULT_BUFFER_SIZE);
-
-    Options opts2 = new Options("Q2", conf.getAppConfiguration());
-
-    Assert.assertEquals(opts2.exporterType, "ET2");
-    Assert.assertEquals(opts2.keyType, "KT2");
-    Assert.assertEquals(opts2.valueType, "VT2");
-    Assert.assertEquals(opts2.numBuckets, 200);
-    Assert.assertEquals(opts2.bucketsPerTablet.intValue(), 20);
-    Assert.assertEquals(opts2.bufferSize.intValue(), 1000000);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/RefInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/RefInfo.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/RefInfo.java
deleted file mode 100644
index abc518c..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/RefInfo.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-class RefInfo {
-  long seq;
-  boolean deleted;
-
-  public RefInfo(long seq, boolean deleted) {
-    this.seq = seq;
-    this.deleted = deleted;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/RefUpdates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/RefUpdates.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/RefUpdates.java
deleted file mode 100644
index 31c61b7..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/RefUpdates.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Set;
-
-public class RefUpdates {
-  private Set<String> addedRefs;
-  private Set<String> deletedRefs;
-
-  public RefUpdates() {}
-
-  public RefUpdates(Set<String> addedRefs, Set<String> deletedRefs) {
-    this.addedRefs = addedRefs;
-    this.deletedRefs = deletedRefs;
-  }
-
-  public Set<String> getAddedRefs() {
-    return addedRefs;
-  }
-
-  public Set<String> getDeletedRefs() {
-    return deletedRefs;
-  }
-
-  @Override
-  public String toString() {
-    return "added:" + addedRefs + " deleted:" + deletedRefs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/map/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/map/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/map/BigUpdateIT.java
deleted file mode 100644
index d6409a6..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/map/BigUpdateIT.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.map;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-import org.apache.fluo.recipes.types.StringEncoder;
-import org.apache.fluo.recipes.types.TypeLayer;
-import org.apache.fluo.recipes.types.TypedSnapshot;
-import org.apache.fluo.recipes.types.TypedTransactionBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * This test configures a small buffer size and verifies that multiple passes are made to process
- * updates.
- */
-public class BigUpdateIT {
-  private static final TypeLayer tl = new TypeLayer(new StringEncoder());
-
-  private MiniFluo miniFluo;
-
-  private CollisionFreeMap<String, Long> wcMap;
-
-  static final String MAP_ID = "bu";
-
-  public static class LongCombiner implements Combiner<String, Long> {
-
-    @Override
-    public Optional<Long> combine(String key, Iterator<Long> updates) {
-      long[] count = new long[] {0};
-      updates.forEachRemaining(l -> count[0] += l);
-      return Optional.of(count[0]);
-    }
-  }
-
-  static final Column DSCOL = new Column("debug", "sum");
-
-  private static AtomicInteger globalUpdates = new AtomicInteger(0);
-
-  public static class MyObserver extends UpdateObserver<String, Long> {
-
-    @Override
-    public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
-      TypedTransactionBase ttx = tl.wrap(tx);
-
-      Map<String, Long> expectedOld = new HashMap<>();
-
-
-      while (updates.hasNext()) {
-        Update<String, Long> update = updates.next();
-
-        if (update.getOldValue().isPresent()) {
-          expectedOld.put("side:" + update.getKey(), update.getOldValue().get());
-        }
-
-        ttx.mutate().row("side:" + update.getKey()).col(DSCOL).set(update.getNewValue().get());
-      }
-
-      // get last values set to verify same as passed in old value
-      Map<String, Long> actualOld =
-          Maps.transformValues(
-              ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL))
-                  .toStringMap(), m -> m.get(DSCOL).toLong());
-
-      MapDifference<String, Long> diff = Maps.difference(expectedOld, actualOld);
-
-      Assert.assertTrue(diff.toString(), diff.areEqual());
-
-      globalUpdates.incrementAndGet();
-    }
-  }
-
-  @Before
-  public void setUpFluo() throws Exception {
-    FileUtils.deleteQuietly(new File("target/mini"));
-
-    FluoConfiguration props = new FluoConfiguration();
-    props.setApplicationName("eqt");
-    props.setWorkerThreads(20);
-    props.setMiniDataDir("target/mini");
-
-    SimpleSerializer.setSetserlializer(props, TestSerializer.class);
-
-    CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, LongCombiner.class,
-        MyObserver.class, String.class, Long.class, 2).setBufferSize(1 << 10));
-
-    miniFluo = FluoFactory.newMiniFluo(props);
-
-    wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration());
-
-    globalUpdates.set(0);
-  }
-
-  @After
-  public void tearDownFluo() throws Exception {
-    if (miniFluo != null) {
-      miniFluo.close();
-    }
-  }
-
-  @Test
-  public void testBigUpdates() {
-    try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-      updateMany(fc);
-
-      miniFluo.waitForObservers();
-
-      int numUpdates = 0;
-
-      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
-        checkUpdates(snap, 1, 1000);
-        numUpdates = globalUpdates.get();
-        // there are two buckets, expect update processing at least twice per bucket
-        Assert.assertTrue(numUpdates >= 4);
-      }
-
-      updateMany(fc);
-      updateMany(fc);
-
-      miniFluo.waitForObservers();
-
-      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
-        checkUpdates(snap, 3, 1000);
-        numUpdates = globalUpdates.get() - numUpdates;
-        Assert.assertTrue(numUpdates >= 4);
-      }
-
-      for (int i = 0; i < 10; i++) {
-        updateMany(fc);
-      }
-
-      miniFluo.waitForObservers();
-
-      try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
-        checkUpdates(snap, 13, 1000);
-        numUpdates = globalUpdates.get() - numUpdates;
-        Assert.assertTrue(numUpdates >= 4);
-      }
-    }
-  }
-
-  private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
-    RowIterator iter = snap.get(new ScannerConfiguration().setSpan(Span.prefix("side:")));
-
-    int row = 0;
-
-    while (iter.hasNext()) {
-      Entry<Bytes, ColumnIterator> entry = iter.next();
-
-      Assert.assertEquals(String.format("side:%06d", row++), entry.getKey().toString());
-
-      ColumnIterator colITer = entry.getValue();
-      while (colITer.hasNext()) {
-        Entry<Column, Bytes> entry2 = colITer.next();
-        Assert.assertEquals(new Column("debug", "sum"), entry2.getKey());
-        Assert.assertEquals("row : " + entry.getKey(), "" + expectedVal, entry2.getValue()
-            .toString());
-      }
-    }
-
-    Assert.assertEquals(expectedRows, row);
-  }
-
-  private void updateMany(FluoClient fc) {
-    try (Transaction tx = fc.newTransaction()) {
-      Map<String, Long> updates = new HashMap<>();
-      for (int i = 0; i < 1000; i++) {
-        updates.put(String.format("%06d", i), 1L);
-      }
-
-      wcMap.update(tx, updates);
-      tx.commit();
-    }
-  }
-}