You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:51 UTC
[03/10] incubator-fluo-recipes git commit: Updated package names in
core module
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
new file mode 100644
index 0000000..5a1f5fe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.common;
+
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.recipes.core.export.ExportQueue;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestGrouping {
+ @Test
+ public void testTabletGrouping() {
+ FluoConfiguration conf = new FluoConfiguration();
+
+ CollisionFreeMap.configure(conf, new Options("m1", "ct", "kt", "vt", 119));
+ CollisionFreeMap.configure(conf, new Options("m2", "ct", "kt", "vt", 3));
+
+ ExportQueue.configure(conf, new ExportQueue.Options("eq1", "et", "kt", "vt", 7));
+ ExportQueue.configure(conf, new ExportQueue.Options("eq2", "et", "kt", "vt", 3));
+
+ Pirtos pirtos = CollisionFreeMap.getTableOptimizations(conf.getAppConfiguration());
+ pirtos.merge(ExportQueue.getTableOptimizations(conf.getAppConfiguration()));
+
+ Pattern pattern = Pattern.compile(pirtos.getTabletGroupingRegex());
+
+ Assert.assertEquals("m1:u:", group(pattern, "m1:u:f0c"));
+ Assert.assertEquals("m1:d:", group(pattern, "m1:d:f0c"));
+ Assert.assertEquals("m2:u:", group(pattern, "m2:u:abc"));
+ Assert.assertEquals("m2:d:", group(pattern, "m2:d:590"));
+ Assert.assertEquals("none", group(pattern, "m3:d:590"));
+
+ Assert.assertEquals("eq1:", group(pattern, "eq1:f0c"));
+ Assert.assertEquals("eq2:", group(pattern, "eq2:f0c"));
+ Assert.assertEquals("none", group(pattern, "eq3:f0c"));
+
+ // validate the assumptions this test is making
+ Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq1#")));
+ Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq2#")));
+ Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq1:~")));
+ Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq2:~")));
+ Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m1:u:~")));
+ Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m1:d:~")));
+ Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m2:u:~")));
+ Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m2:d:~")));
+
+ Set<String> expectedGroups =
+ ImmutableSet.of("m1:u:", "m1:d:", "m2:u:", "m2:d:", "eq1:", "eq2:");
+
+ // ensure all splits group as expected
+ for (Bytes split : pirtos.getSplits()) {
+ String g = group(pattern, split.toString());
+
+ if (expectedGroups.contains(g)) {
+ Assert.assertTrue(split.toString().startsWith(g));
+ } else {
+ Assert.assertEquals("none", g);
+ Assert.assertTrue(split.toString().equals("eq1#") || split.toString().equals("eq2#"));
+ }
+
+ }
+
+ }
+
+ private String group(Pattern pattern, String endRow) {
+ Matcher m = pattern.matcher(endRow);
+ if (m.matches() && m.groupCount() == 1) {
+ return m.group(1);
+ }
+ return "none";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TransientRegistryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TransientRegistryTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TransientRegistryTest.java
new file mode 100644
index 0000000..a1be5c9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TransientRegistryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.common;
+
+import java.util.HashSet;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TransientRegistryTest {
+ @Test
+ public void testBasic() {
+ FluoConfiguration fluoConfig = new FluoConfiguration();
+
+ HashSet<RowRange> expected = new HashSet<>();
+
+ TransientRegistry tr = new TransientRegistry(fluoConfig.getAppConfiguration());
+
+ RowRange rr1 = new RowRange(Bytes.of("pr1:g"), Bytes.of("pr1:q"));
+ tr.addTransientRange("foo", rr1);
+ expected.add(rr1);
+
+ tr = new TransientRegistry(fluoConfig.getAppConfiguration());
+ Assert.assertEquals(expected, new HashSet<>(tr.getTransientRanges()));
+
+ RowRange rr2 = new RowRange(Bytes.of("pr2:j"), Bytes.of("pr2:m"));
+ tr.addTransientRange("bar", rr2);
+ expected.add(rr2);
+
+ tr = new TransientRegistry(fluoConfig.getAppConfiguration());
+ Assert.assertEquals(expected, new HashSet<>(tr.getTransientRanges()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java
new file mode 100644
index 0000000..9250d73
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.data;
+
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RowHasherTest {
+
+ @Test
+ public void testBadPrefixes() {
+ String[] badPrefixes =
+ {"q:she6:test1", "q:she6:test1", "p:Mhe6:test1", "p;she6:test1", "p:she6;test1",
+ "p;she6;test1", "p:+he6:test1", "p:s?e6:test1", "p:sh{6:test1", "p:sh6:"};
+
+ RowHasher rh = new RowHasher("p");
+ for (String badPrefix : badPrefixes) {
+ try {
+ rh.removeHash(Bytes.of(badPrefix));
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+ }
+ }
+ }
+
+ @Test
+ public void testBasic() {
+ RowHasher rh = new RowHasher("p");
+ Assert.assertTrue(rh.removeHash(rh.addHash("abc")).toString().equals("abc"));
+ rh = new RowHasher("p2");
+ Assert.assertTrue(rh.removeHash(rh.addHash("abc")).toString().equals("abc"));
+
+ Assert.assertTrue(rh.addHash("abc").toString().startsWith("p2:"));
+
+ // test to ensure hash is stable over time
+ Assert.assertEquals("p2:she6:test1", rh.addHash("test1").toString());
+ Assert.assertEquals("p2:hgt0:0123456789abcdefghijklmnopqrstuvwxyz",
+ rh.addHash("0123456789abcdefghijklmnopqrstuvwxyz").toString());
+ Assert.assertEquals("p2:fluo:86ce3b094982c6a", rh.addHash("86ce3b094982c6a").toString());
+ }
+
+ @Test
+ public void testBalancerRegex() {
+ RowHasher rh = new RowHasher("p");
+ String regex = rh.getTableOptimizations(3).getTabletGroupingRegex();
+ Assert.assertEquals("(\\Qp:\\E).*", regex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
new file mode 100644
index 0000000..5e7f224
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentLoader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.fluo.recipes.core.types.TypedLoader;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentLoader extends TypedLoader {
+
+ String docid;
+ String refs[];
+
+ DocumentLoader(String docid, String... refs) {
+ this.docid = docid;
+ this.refs = refs;
+ }
+
+ @Override
+ public void load(TypedTransactionBase tx, Context context) throws Exception {
+ tx.mutate().row("d:" + docid).fam("content").qual("new").set(StringUtils.join(refs, " "));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
new file mode 100644
index 0000000..c4c11d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/DocumentObserver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.export.ExportTestBase.RefExporter;
+import org.apache.fluo.recipes.core.types.TypedObserver;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentObserver extends TypedObserver {
+
+ ExportQueue<String, RefUpdates> refExportQueue;
+
+ @Override
+ public void init(Context context) throws Exception {
+ refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, context.getAppConfiguration());
+ }
+
+ @Override
+ public ObservedColumn getObservedColumn() {
+ return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
+ }
+
+ @Override
+ public void process(TypedTransactionBase tx, Bytes row, Column col) {
+ String newContent = tx.get().row(row).col(col).toString();
+ Set<String> newRefs = new HashSet<>(Arrays.asList(newContent.split(" ")));
+ Set<String> currentRefs =
+ new HashSet<>(Arrays.asList(tx.get().row(row).fam("content").qual("current").toString("")
+ .split(" ")));
+
+ Set<String> addedRefs = new HashSet<>(newRefs);
+ addedRefs.removeAll(currentRefs);
+
+ Set<String> deletedRefs = new HashSet<>(currentRefs);
+ deletedRefs.removeAll(newRefs);
+
+ String key = row.toString().substring(2);
+ RefUpdates val = new RefUpdates(addedRefs, deletedRefs);
+
+ refExportQueue.add(tx, key, val);
+
+ tx.mutate().row(row).fam("content").qual("current").set(newContent);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
new file mode 100644
index 0000000..d54982b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportBufferIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExportBufferIT extends ExportTestBase {
+
+ @Override
+ protected int getNumBuckets() {
+ return 2;
+ }
+
+ @Override
+ protected Integer getBufferSize() {
+ return 1024;
+ }
+
+ @Test
+ public void testSmallExportBuffer() {
+ // try setting the export buffer size small. Make sure everything is exported.
+
+ try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+ ExportQueue<String, RefUpdates> refExportQueue =
+ ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
+ try (Transaction tx = fc.newTransaction()) {
+ for (int i = 0; i < 1000; i++) {
+ refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 10, i + 20), ns(new int[0])));
+ }
+
+ tx.commit();
+ }
+ }
+
+ miniFluo.waitForObservers();
+
+ Map<String, Set<String>> erefs = getExportedReferees();
+ Map<String, Set<String>> expected = new HashMap<>();
+
+ for (int i = 0; i < 1000; i++) {
+ expected.computeIfAbsent(nk(i + 10), s -> new HashSet<>()).add(nk(i));
+ expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
+ }
+
+ assertEquals(expected, erefs);
+ int prevNumExportCalls = getNumExportCalls();
+ Assert.assertTrue(prevNumExportCalls > 10); // with small buffer there should be lots of exports
+ // calls
+
+ try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+ ExportQueue<String, RefUpdates> refExportQueue =
+ ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
+ try (Transaction tx = fc.newTransaction()) {
+ for (int i = 0; i < 1000; i++) {
+ refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 12), ns(i + 10)));
+ }
+
+ tx.commit();
+ }
+ }
+
+ miniFluo.waitForObservers();
+
+ erefs = getExportedReferees();
+ expected = new HashMap<>();
+
+ for (int i = 0; i < 1000; i++) {
+ expected.computeIfAbsent(nk(i + 12), s -> new HashSet<>()).add(nk(i));
+ expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
+ }
+
+ assertEquals(expected, erefs);
+ prevNumExportCalls = getNumExportCalls() - prevNumExportCalls;
+ Assert.assertTrue(prevNumExportCalls > 10);
+ }
+
+ public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual) {
+ if (!expected.equals(actual)) {
+ System.out.println("*** diff ***");
+ diff(expected, actual);
+ Assert.fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
new file mode 100644
index 0000000..b4e167c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportQueueIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExportQueueIT extends ExportTestBase {
+
+ @Test
+ public void testExport() {
+ try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0999", "0005", "0002"));
+ loader.execute(new DocumentLoader("0002", "0999", "0042"));
+ loader.execute(new DocumentLoader("0005", "0999", "0042"));
+ loader.execute(new DocumentLoader("0042", "0999"));
+ }
+
+ miniFluo.waitForObservers();
+
+ Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
+ Assert.assertEquals(ns("0999"), getExportedReferees("0002"));
+ Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
+ Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0999", "0005", "0042"));
+ }
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0999", "0005"));
+ }
+
+ miniFluo.waitForObservers();
+
+ Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
+ Assert.assertEquals(ns(new String[0]), getExportedReferees("0002"));
+ Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
+ Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0042", "0999", "0002", "0005"));
+ loader.execute(new DocumentLoader("0005", "0002"));
+ }
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0005", "0003"));
+ }
+
+ miniFluo.waitForObservers();
+
+ Assert.assertEquals(ns("0002", "0042"), getExportedReferees("0999"));
+ Assert.assertEquals(ns("0042"), getExportedReferees("0002"));
+ Assert.assertEquals(ns("0005"), getExportedReferees("0003"));
+ Assert.assertEquals(ns("0999", "0042"), getExportedReferees("0005"));
+ Assert.assertEquals(ns("0002"), getExportedReferees("0042"));
+
+ }
+ }
+
+ @Test
+ public void exportStressTest() {
+ FluoConfiguration config = new FluoConfiguration(miniFluo.getClientConfiguration());
+ config.setLoaderQueueSize(100);
+ config.setLoaderThreads(20);
+
+ try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+
+ loadRandom(fc, 1000, 500);
+
+ miniFluo.waitForObservers();
+
+ diff(getFluoReferees(fc), getExportedReferees());
+
+ assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+ loadRandom(fc, 1000, 500);
+
+ miniFluo.waitForObservers();
+
+ assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+ loadRandom(fc, 1000, 10000);
+
+ miniFluo.waitForObservers();
+
+ assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+
+ loadRandom(fc, 1000, 10000);
+
+ miniFluo.waitForObservers();
+
+ assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
new file mode 100644
index 0000000..c1cf3ce
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+public class ExportTestBase {
+
+ private static Map<String, Map<String, RefInfo>> globalExports = new HashMap<>();
+ private static int exportCalls = 0;
+
+ protected static Set<String> getExportedReferees(String node) {
+ synchronized (globalExports) {
+ Set<String> ret = new HashSet<>();
+
+ Map<String, RefInfo> referees = globalExports.get(node);
+
+ if (referees == null) {
+ return ret;
+ }
+
+ referees.forEach((k, v) -> {
+ if (!v.deleted)
+ ret.add(k);
+ });
+
+ return ret;
+ }
+ }
+
+ protected static Map<String, Set<String>> getExportedReferees() {
+ synchronized (globalExports) {
+
+ Map<String, Set<String>> ret = new HashMap<>();
+
+ for (String k : globalExports.keySet()) {
+ Set<String> referees = getExportedReferees(k);
+ if (referees.size() > 0) {
+ ret.put(k, referees);
+ }
+ }
+
+ return ret;
+ }
+ }
+
+ protected static int getNumExportCalls() {
+ synchronized (globalExports) {
+ return exportCalls;
+ }
+ }
+
+ public static class RefExporter extends Exporter<String, RefUpdates> {
+
+ public static final String QUEUE_ID = "req";
+
+ private void updateExports(String key, long seq, String addedRef, boolean deleted) {
+ Map<String, RefInfo> referees = globalExports.computeIfAbsent(addedRef, k -> new HashMap<>());
+ referees.compute(key, (k, v) -> (v == null || v.seq < seq) ? new RefInfo(seq, deleted) : v);
+ }
+
+ @Override
+ protected void processExports(Iterator<SequencedExport<String, RefUpdates>> exportIterator) {
+ ArrayList<SequencedExport<String, RefUpdates>> exportList = new ArrayList<>();
+ Iterators.addAll(exportList, exportIterator);
+
+ synchronized (globalExports) {
+ exportCalls++;
+
+ for (SequencedExport<String, RefUpdates> se : exportList) {
+ for (String addedRef : se.getValue().getAddedRefs()) {
+ updateExports(se.getKey(), se.getSequence(), addedRef, false);
+ }
+
+ for (String deletedRef : se.getValue().getDeletedRefs()) {
+ updateExports(se.getKey(), se.getSequence(), deletedRef, true);
+ }
+ }
+ }
+ }
+ }
+
+ protected MiniFluo miniFluo;
+
+ protected int getNumBuckets() {
+ return 13;
+ }
+
+ protected Integer getBufferSize() {
+ return null;
+ }
+
+ @Before
+ public void setUpFluo() throws Exception {
+ FileUtils.deleteQuietly(new File("target/mini"));
+
+ FluoConfiguration props = new FluoConfiguration();
+ props.setApplicationName("eqt");
+ props.setWorkerThreads(20);
+ props.setMiniDataDir("target/mini");
+
+ ObserverConfiguration doc = new ObserverConfiguration(DocumentObserver.class.getName());
+ props.addObserver(doc);
+
+ SimpleSerializer.setSetserlializer(props, GsonSerializer.class);
+
+ ExportQueue.Options exportQueueOpts =
+ new ExportQueue.Options(RefExporter.QUEUE_ID, RefExporter.class, String.class,
+ RefUpdates.class, getNumBuckets());
+
+ if (getBufferSize() != null) {
+ exportQueueOpts.setBufferSize(getBufferSize());
+ }
+
+ ExportQueue.configure(props, exportQueueOpts);
+
+ miniFluo = FluoFactory.newMiniFluo(props);
+
+ globalExports.clear();
+ exportCalls = 0;
+ }
+
+ @After
+ public void tearDownFluo() throws Exception {
+ if (miniFluo != null) {
+ miniFluo.close();
+ }
+ }
+
+ protected static Set<String> ns(String... sa) {
+ return new HashSet<>(Arrays.asList(sa));
+ }
+
+ protected static String nk(int i) {
+ return String.format("%06d", i);
+ }
+
+ protected static Set<String> ns(int... ia) {
+ HashSet<String> ret = new HashSet<>();
+ for (int i : ia) {
+ ret.add(nk(i));
+ }
+ return ret;
+ }
+
+ public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual,
+ FluoClient fc) {
+ if (!expected.equals(actual)) {
+ System.out.println("*** diff ***");
+ diff(expected, actual);
+ System.out.println("*** fluo dump ***");
+ dump(fc);
+ System.out.println("*** map dump ***");
+
+ Assert.fail();
+ }
+ }
+
+ protected void loadRandom(FluoClient fc, int num, int maxDocId) {
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ Random rand = new Random();
+
+ for (int i = 0; i < num; i++) {
+ String docid = String.format("%05d", rand.nextInt(maxDocId));
+ String[] refs = new String[rand.nextInt(20) + 1];
+ for (int j = 0; j < refs.length; j++) {
+ refs[j] = String.format("%05d", rand.nextInt(maxDocId));
+ }
+
+ loader.execute(new DocumentLoader(docid, refs));
+ }
+ }
+ }
+
+ protected void diff(Map<String, Set<String>> fr, Map<String, Set<String>> er) {
+ HashSet<String> allKeys = new HashSet<>(fr.keySet());
+ allKeys.addAll(er.keySet());
+
+ for (String k : allKeys) {
+ Set<String> s1 = fr.getOrDefault(k, Collections.emptySet());
+ Set<String> s2 = er.getOrDefault(k, Collections.emptySet());
+
+ HashSet<String> sub1 = new HashSet<>(s1);
+ sub1.removeAll(s2);
+
+ HashSet<String> sub2 = new HashSet<>(s2);
+ sub2.removeAll(s1);
+
+ if (sub1.size() > 0 || sub2.size() > 0) {
+ System.out.println(k + " " + sub1 + " " + sub2);
+ }
+
+ }
+ }
+
+ protected Map<String, Set<String>> getFluoReferees(FluoClient fc) {
+ Map<String, Set<String>> fluoReferees = new HashMap<>();
+
+ try (Snapshot snap = fc.newSnapshot()) {
+ ScannerConfiguration scannerConfig = new ScannerConfiguration();
+ scannerConfig.fetchColumn(Bytes.of("content"), Bytes.of("current"));
+ scannerConfig.setSpan(Span.prefix("d:"));
+ RowIterator scanner = snap.get(scannerConfig);
+ while (scanner.hasNext()) {
+ Entry<Bytes, ColumnIterator> row = scanner.next();
+ ColumnIterator colIter = row.getValue();
+
+ String docid = row.getKey().toString().substring(2);
+
+ while (colIter.hasNext()) {
+ Entry<Column, Bytes> entry = colIter.next();
+
+ String[] refs = entry.getValue().toString().split(" ");
+
+ for (String ref : refs) {
+ if (ref.isEmpty())
+ continue;
+
+ fluoReferees.computeIfAbsent(ref, k -> new HashSet<>()).add(docid);
+ }
+ }
+ }
+ }
+ return fluoReferees;
+ }
+
+ public static void dump(FluoClient fc) {
+ try (Snapshot snap = fc.newSnapshot()) {
+ RowIterator scanner = snap.get(new ScannerConfiguration());
+ while (scanner.hasNext()) {
+ Entry<Bytes, ColumnIterator> row = scanner.next();
+ ColumnIterator colIter = row.getValue();
+
+ while (colIter.hasNext()) {
+ Entry<Column, Bytes> entry = colIter.next();
+
+ System.out.println("row:[" + row.getKey() + "] col:[" + entry.getKey() + "] val:["
+ + entry.getValue() + "]");
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
new file mode 100644
index 0000000..2d45ff3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/GsonSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.nio.charset.StandardCharsets;
+
+import com.google.gson.Gson;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+public class GsonSerializer implements SimpleSerializer {
+
+ private Gson gson = new Gson();
+
+ @Override
+ public void init(SimpleConfiguration appConfig) {
+
+ }
+
+ @Override
+ public <T> byte[] serialize(T obj) {
+ return gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public <T> T deserialize(byte[] serObj, Class<T> clazz) {
+ return gson.fromJson(new String(serObj, StandardCharsets.UTF_8), clazz);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
new file mode 100644
index 0000000..b07caea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.recipes.core.export.ExportQueue.Options;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OptionsTest {
+ @Test
+ public void testExportQueueOptions() {
+ FluoConfiguration conf = new FluoConfiguration();
+
+ ExportQueue.configure(conf, new Options("Q1", "ET", "KT", "VT", 100));
+ ExportQueue.configure(conf, new Options("Q2", "ET2", "KT2", "VT2", 200).setBucketsPerTablet(20)
+ .setBufferSize(1000000));
+
+ Options opts1 = new Options("Q1", conf.getAppConfiguration());
+
+ Assert.assertEquals(opts1.exporterType, "ET");
+ Assert.assertEquals(opts1.keyType, "KT");
+ Assert.assertEquals(opts1.valueType, "VT");
+ Assert.assertEquals(opts1.numBuckets, 100);
+ Assert.assertEquals(opts1.bucketsPerTablet.intValue(), Options.DEFAULT_BUCKETS_PER_TABLET);
+ Assert.assertEquals(opts1.bufferSize.intValue(), Options.DEFAULT_BUFFER_SIZE);
+
+ Options opts2 = new Options("Q2", conf.getAppConfiguration());
+
+ Assert.assertEquals(opts2.exporterType, "ET2");
+ Assert.assertEquals(opts2.keyType, "KT2");
+ Assert.assertEquals(opts2.valueType, "VT2");
+ Assert.assertEquals(opts2.numBuckets, 200);
+ Assert.assertEquals(opts2.bucketsPerTablet.intValue(), 20);
+ Assert.assertEquals(opts2.bufferSize.intValue(), 1000000);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
new file mode 100644
index 0000000..f4d1c76
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefInfo.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+class RefInfo {
+ long seq;
+ boolean deleted;
+
+ public RefInfo(long seq, boolean deleted) {
+ this.seq = seq;
+ this.deleted = deleted;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
new file mode 100644
index 0000000..efa94dd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/RefUpdates.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.export;
+
+import java.util.Set;
+
+public class RefUpdates {
+ private Set<String> addedRefs;
+ private Set<String> deletedRefs;
+
+ public RefUpdates() {}
+
+ public RefUpdates(Set<String> addedRefs, Set<String> deletedRefs) {
+ this.addedRefs = addedRefs;
+ this.deletedRefs = deletedRefs;
+ }
+
+ public Set<String> getAddedRefs() {
+ return addedRefs;
+ }
+
+ public Set<String> getDeletedRefs() {
+ return deletedRefs;
+ }
+
+ @Override
+ public String toString() {
+ return "added:" + addedRefs + " deleted:" + deletedRefs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
new file mode 100644
index 0000000..e5f7d55
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.apache.fluo.recipes.core.types.StringEncoder;
+import org.apache.fluo.recipes.core.types.TypeLayer;
+import org.apache.fluo.recipes.core.types.TypedSnapshot;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test configures a small buffer size and verifies that multiple passes are made to process
+ * updates.
+ */
+public class BigUpdateIT {
+ private static final TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ private MiniFluo miniFluo;
+
+ private CollisionFreeMap<String, Long> wcMap;
+
+ static final String MAP_ID = "bu";
+
+ public static class LongCombiner implements Combiner<String, Long> {
+
+ @Override
+ public Optional<Long> combine(String key, Iterator<Long> updates) {
+ long[] count = new long[] {0};
+ updates.forEachRemaining(l -> count[0] += l);
+ return Optional.of(count[0]);
+ }
+ }
+
+ static final Column DSCOL = new Column("debug", "sum");
+
+ private static AtomicInteger globalUpdates = new AtomicInteger(0);
+
+ public static class MyObserver extends UpdateObserver<String, Long> {
+
+ @Override
+ public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
+ TypedTransactionBase ttx = tl.wrap(tx);
+
+ Map<String, Long> expectedOld = new HashMap<>();
+
+
+ while (updates.hasNext()) {
+ Update<String, Long> update = updates.next();
+
+ if (update.getOldValue().isPresent()) {
+ expectedOld.put("side:" + update.getKey(), update.getOldValue().get());
+ }
+
+ ttx.mutate().row("side:" + update.getKey()).col(DSCOL).set(update.getNewValue().get());
+ }
+
+ // get last values set to verify same as passed in old value
+ Map<String, Long> actualOld =
+ Maps.transformValues(
+ ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL))
+ .toStringMap(), m -> m.get(DSCOL).toLong());
+
+ MapDifference<String, Long> diff = Maps.difference(expectedOld, actualOld);
+
+ Assert.assertTrue(diff.toString(), diff.areEqual());
+
+ globalUpdates.incrementAndGet();
+ }
+ }
+
+ @Before
+ public void setUpFluo() throws Exception {
+ FileUtils.deleteQuietly(new File("target/mini"));
+
+ FluoConfiguration props = new FluoConfiguration();
+ props.setApplicationName("eqt");
+ props.setWorkerThreads(20);
+ props.setMiniDataDir("target/mini");
+
+ SimpleSerializer.setSetserlializer(props, TestSerializer.class);
+
+ CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, LongCombiner.class,
+ MyObserver.class, String.class, Long.class, 2).setBufferSize(1 << 10));
+
+ miniFluo = FluoFactory.newMiniFluo(props);
+
+ wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration());
+
+ globalUpdates.set(0);
+ }
+
+ @After
+ public void tearDownFluo() throws Exception {
+ if (miniFluo != null) {
+ miniFluo.close();
+ }
+ }
+
+ @Test
+ public void testBigUpdates() {
+ try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+ updateMany(fc);
+
+ miniFluo.waitForObservers();
+
+ int numUpdates = 0;
+
+ try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
+ checkUpdates(snap, 1, 1000);
+ numUpdates = globalUpdates.get();
+ // there are two buckets, expect update processing at least twice per bucket
+ Assert.assertTrue(numUpdates >= 4);
+ }
+
+ updateMany(fc);
+ updateMany(fc);
+
+ miniFluo.waitForObservers();
+
+ try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
+ checkUpdates(snap, 3, 1000);
+ numUpdates = globalUpdates.get() - numUpdates;
+ Assert.assertTrue(numUpdates >= 4);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ updateMany(fc);
+ }
+
+ miniFluo.waitForObservers();
+
+ try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
+ checkUpdates(snap, 13, 1000);
+ numUpdates = globalUpdates.get() - numUpdates;
+ Assert.assertTrue(numUpdates >= 4);
+ }
+ }
+ }
+
+ private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
+ RowIterator iter = snap.get(new ScannerConfiguration().setSpan(Span.prefix("side:")));
+
+ int row = 0;
+
+ while (iter.hasNext()) {
+ Entry<Bytes, ColumnIterator> entry = iter.next();
+
+ Assert.assertEquals(String.format("side:%06d", row++), entry.getKey().toString());
+
+ ColumnIterator colITer = entry.getValue();
+ while (colITer.hasNext()) {
+ Entry<Column, Bytes> entry2 = colITer.next();
+ Assert.assertEquals(new Column("debug", "sum"), entry2.getKey());
+ Assert.assertEquals("row : " + entry.getKey(), "" + expectedVal, entry2.getValue()
+ .toString());
+ }
+ }
+
+ Assert.assertEquals(expectedRows, row);
+ }
+
+ private void updateMany(FluoClient fc) {
+ try (Transaction tx = fc.newTransaction()) {
+ Map<String, Long> updates = new HashMap<>();
+ for (int i = 0; i < 1000; i++) {
+ updates.put(String.format("%06d", i), 1L);
+ }
+
+ wcMap.update(tx, updates);
+ tx.commit();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
new file mode 100644
index 0000000..f7dbc89
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CollisionFreeMapIT {
+
+ private MiniFluo miniFluo;
+
+ private CollisionFreeMap<String, Long> wcMap;
+
+ static final String MAP_ID = "wcm";
+
+ @Before
+ public void setUpFluo() throws Exception {
+ FileUtils.deleteQuietly(new File("target/mini"));
+
+ FluoConfiguration props = new FluoConfiguration();
+ props.setApplicationName("eqt");
+ props.setWorkerThreads(20);
+ props.setMiniDataDir("target/mini");
+
+ props.addObserver(new ObserverConfiguration(DocumentObserver.class.getName()));
+
+ SimpleSerializer.setSetserlializer(props, TestSerializer.class);
+
+ CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, WordCountCombiner.class,
+ WordCountObserver.class, String.class, Long.class, 17));
+
+ miniFluo = FluoFactory.newMiniFluo(props);
+
+ wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration());
+ }
+
+ @After
+ public void tearDownFluo() throws Exception {
+ if (miniFluo != null) {
+ miniFluo.close();
+ }
+ }
+
+ private Map<String, Long> getComputedWordCounts(FluoClient fc) {
+ Map<String, Long> counts = new HashMap<>();
+
+ try (Snapshot snap = fc.newSnapshot()) {
+ RowIterator scanner = snap.get(new ScannerConfiguration().setSpan(Span.prefix("iwc:")));
+ while (scanner.hasNext()) {
+ Entry<Bytes, ColumnIterator> row = scanner.next();
+
+ String[] tokens = row.getKey().toString().split(":");
+ String word = tokens[2];
+ Long count = Long.valueOf(tokens[1]);
+
+ Assert.assertFalse("Word seen twice in index " + word, counts.containsKey(word));
+
+ counts.put(word, count);
+ }
+ }
+
+ return counts;
+ }
+
+ private Map<String, Long> computeWordCounts(FluoClient fc) {
+ Map<String, Long> counts = new HashMap<>();
+
+ try (Snapshot snap = fc.newSnapshot()) {
+ RowIterator scanner =
+ snap.get(new ScannerConfiguration().setSpan(Span.prefix("d:")).fetchColumn(
+ Bytes.of("content"), Bytes.of("current")));
+ while (scanner.hasNext()) {
+ Entry<Bytes, ColumnIterator> row = scanner.next();
+
+ ColumnIterator colIter = row.getValue();
+
+ while (colIter.hasNext()) {
+ Entry<Column, Bytes> entry = colIter.next();
+
+ String[] words = entry.getValue().toString().split("\\s+");
+ for (String word : words) {
+ if (word.isEmpty()) {
+ continue;
+ }
+
+ counts.merge(word, 1L, Long::sum);
+ }
+ }
+ }
+ }
+
+ return counts;
+ }
+
+ @Test
+ public void testGet() {
+ try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+ try (Transaction tx = fc.newTransaction()) {
+ wcMap.update(tx, ImmutableMap.of("cat", 2L, "dog", 5L));
+ tx.commit();
+ }
+
+ try (Transaction tx = fc.newTransaction()) {
+ wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L));
+ tx.commit();
+ }
+
+ try (Transaction tx = fc.newTransaction()) {
+ wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L, "fish", 2L));
+ tx.commit();
+ }
+
+ // try reading possibly before observer combines... will either see outstanding updates or a
+ // current value
+ try (Snapshot snap = fc.newSnapshot()) {
+ Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+ Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog"));
+ Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+ }
+
+ miniFluo.waitForObservers();
+
+ // in this case there should be no updates, only a current value
+ try (Snapshot snap = fc.newSnapshot()) {
+ Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+ Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog"));
+ Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+ }
+
+ Map<String, Long> expectedCounts = new HashMap<>();
+ expectedCounts.put("cat", 4L);
+ expectedCounts.put("dog", 7L);
+ expectedCounts.put("fish", 2L);
+
+ Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+ try (Transaction tx = fc.newTransaction()) {
+ wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", -7L));
+ tx.commit();
+ }
+
+ // there may be outstanding update and a current value for the key in this case
+ try (Snapshot snap = fc.newSnapshot()) {
+ Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat"));
+ Assert.assertNull(wcMap.get(snap, "dog"));
+ Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+ }
+
+ miniFluo.waitForObservers();
+
+ try (Snapshot snap = fc.newSnapshot()) {
+ Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat"));
+ Assert.assertNull(wcMap.get(snap, "dog"));
+ Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish"));
+ }
+
+ expectedCounts.put("cat", 5L);
+ expectedCounts.remove("dog");
+
+ Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+ }
+ }
+
+ @Test
+ public void testBasic() {
+ try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0001", "dog cat"));
+ loader.execute(new DocumentLoader("0002", "cat hamster"));
+ loader.execute(new DocumentLoader("0003", "milk bread cat food"));
+ loader.execute(new DocumentLoader("0004", "zoo big cat"));
+ }
+
+ miniFluo.waitForObservers();
+
+ try (Snapshot snap = fc.newSnapshot()) {
+ Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat"));
+ Assert.assertEquals((Long) 1L, wcMap.get(snap, "milk"));
+ }
+
+ Map<String, Long> expectedCounts = new HashMap<>();
+ expectedCounts.put("dog", 1L);
+ expectedCounts.put("cat", 4L);
+ expectedCounts.put("hamster", 1L);
+ expectedCounts.put("milk", 1L);
+ expectedCounts.put("bread", 1L);
+ expectedCounts.put("food", 1L);
+ expectedCounts.put("zoo", 1L);
+ expectedCounts.put("big", 1L);
+
+ Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0001", "dog feline"));
+ }
+
+ miniFluo.waitForObservers();
+
+ expectedCounts.put("cat", 3L);
+ expectedCounts.put("feline", 1L);
+
+ Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ // swap contents of two documents... should not change doc counts
+ loader.execute(new DocumentLoader("0003", "zoo big cat"));
+ loader.execute(new DocumentLoader("0004", "milk bread cat food"));
+ }
+
+ miniFluo.waitForObservers();
+ Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0003", "zoo big cat"));
+ loader.execute(new DocumentLoader("0004", "zoo big cat"));
+ }
+
+ miniFluo.waitForObservers();
+
+ expectedCounts.put("zoo", 2L);
+ expectedCounts.put("big", 2L);
+ expectedCounts.remove("milk");
+ expectedCounts.remove("bread");
+ expectedCounts.remove("food");
+
+ Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0002", "cat cat hamster hamster"));
+ }
+
+ miniFluo.waitForObservers();
+
+ expectedCounts.put("cat", 4L);
+ expectedCounts.put("hamster", 2L);
+
+ Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ loader.execute(new DocumentLoader("0002", "dog hamster"));
+ }
+
+ miniFluo.waitForObservers();
+
+ expectedCounts.put("cat", 2L);
+ expectedCounts.put("hamster", 1L);
+ expectedCounts.put("dog", 2L);
+
+ Assert.assertEquals(expectedCounts, getComputedWordCounts(fc));
+ }
+ }
+
+ private static String randDocId(Random rand) {
+ return String.format("%04d", rand.nextInt(5000));
+ }
+
+ private static String randomDocument(Random rand) {
+ StringBuilder sb = new StringBuilder();
+
+ String sep = "";
+ for (int i = 2; i < rand.nextInt(18); i++) {
+ sb.append(sep);
+ sep = " ";
+ sb.append(String.format("%05d", rand.nextInt(50000)));
+ }
+
+ return sb.toString();
+ }
+
+ public void diff(Map<String, Long> m1, Map<String, Long> m2) {
+ for (String word : m1.keySet()) {
+ Long v1 = m1.get(word);
+ Long v2 = m2.get(word);
+
+ if (v2 == null || !v1.equals(v2)) {
+ System.out.println(word + " " + v1 + " != " + v2);
+ }
+ }
+
+ for (String word : m2.keySet()) {
+ Long v1 = m1.get(word);
+ Long v2 = m2.get(word);
+
+ if (v1 == null) {
+ System.out.println(word + " null != " + v2);
+ }
+ }
+ }
+
+ @Test
+ public void testStress() throws Exception {
+ try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+ Random rand = new Random();
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ for (int i = 0; i < 1000; i++) {
+ loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand)));
+ }
+ }
+
+ miniFluo.waitForObservers();
+ assertWordCountsEqual(fc);
+
+ try (LoaderExecutor loader = fc.newLoaderExecutor()) {
+ for (int i = 0; i < 100; i++) {
+ loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand)));
+ }
+ }
+
+ miniFluo.waitForObservers();
+ assertWordCountsEqual(fc);
+ }
+ }
+
+ private void assertWordCountsEqual(FluoClient fc) {
+ Map<String, Long> expected = computeWordCounts(fc);
+ Map<String, Long> actual = getComputedWordCounts(fc);
+ if (!expected.equals(actual)) {
+ diff(expected, actual);
+ Assert.fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
new file mode 100644
index 0000000..54f5ee1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import org.apache.fluo.recipes.core.types.TypedLoader;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentLoader extends TypedLoader {
+
+ String docid;
+ String doc;
+
+ DocumentLoader(String docid, String doc) {
+ this.docid = docid;
+ this.doc = doc;
+ }
+
+ @Override
+ public void load(TypedTransactionBase tx, Context context) throws Exception {
+ tx.mutate().row("d:" + docid).fam("content").qual("new").set(doc);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
new file mode 100644
index 0000000..2c79f45
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.types.TypedObserver;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+
+public class DocumentObserver extends TypedObserver {
+
+ CollisionFreeMap<String, Long> wcm;
+
+ @Override
+ public void init(Context context) throws Exception {
+ wcm = CollisionFreeMap.getInstance(CollisionFreeMapIT.MAP_ID, context.getAppConfiguration());
+ }
+
+ @Override
+ public ObservedColumn getObservedColumn() {
+ return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
+ }
+
+ static Map<String, Long> getWordCounts(String doc) {
+ Map<String, Long> wordCounts = new HashMap<>();
+ String[] words = doc.split(" ");
+ for (String word : words) {
+ if (word.isEmpty()) {
+ continue;
+ }
+ wordCounts.merge(word, 1L, Long::sum);
+ }
+
+ return wordCounts;
+ }
+
+ @Override
+ public void process(TypedTransactionBase tx, Bytes row, Column col) {
+ String newContent = tx.get().row(row).col(col).toString();
+ String currentContent = tx.get().row(row).fam("content").qual("current").toString("");
+
+ Map<String, Long> newWordCounts = getWordCounts(newContent);
+ Map<String, Long> currentWordCounts = getWordCounts(currentContent);
+
+ Map<String, Long> changes = calculateChanges(newWordCounts, currentWordCounts);
+
+ wcm.update(tx, changes);
+
+ tx.mutate().row(row).fam("content").qual("current").set(newContent);
+ }
+
+ private static Map<String, Long> calculateChanges(Map<String, Long> newCounts,
+ Map<String, Long> currCounts) {
+ Map<String, Long> changes = new HashMap<>();
+
+ // guava Maps class
+ MapDifference<String, Long> diffs = Maps.difference(currCounts, newCounts);
+
+ // compute the diffs for words that changed
+ changes.putAll(Maps.transformValues(diffs.entriesDiffering(), vDiff -> vDiff.rightValue()
+ - vDiff.leftValue()));
+
+ // add all new words
+ changes.putAll(diffs.entriesOnlyOnRight());
+
+ // subtract all words no longer present
+ changes.putAll(Maps.transformValues(diffs.entriesOnlyOnLeft(), l -> l * -1));
+
+ return changes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java
new file mode 100644
index 0000000..6587f6e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OptionsTest {
+ @Test
+ public void testExportQueueOptions() {
+ FluoConfiguration conf = new FluoConfiguration();
+
+ CollisionFreeMap.configure(conf, new Options("Q1", "CT", "KT", "VT", 100));
+ CollisionFreeMap.configure(conf, new Options("Q2", "CT2", "KT2", "VT2", 200)
+ .setBucketsPerTablet(20).setBufferSize(1000000));
+
+ Options opts1 = new Options("Q1", conf.getAppConfiguration());
+
+ Assert.assertEquals(opts1.combinerType, "CT");
+ Assert.assertEquals(opts1.keyType, "KT");
+ Assert.assertEquals(opts1.valueType, "VT");
+ Assert.assertEquals(opts1.numBuckets, 100);
+ Assert.assertEquals(opts1.bucketsPerTablet.intValue(), Options.DEFAULT_BUCKETS_PER_TABLET);
+ Assert.assertEquals(opts1.bufferSize.intValue(), Options.DEFAULT_BUFFER_SIZE);
+
+ Options opts2 = new Options("Q2", conf.getAppConfiguration());
+
+ Assert.assertEquals(opts2.combinerType, "CT2");
+ Assert.assertEquals(opts2.keyType, "KT2");
+ Assert.assertEquals(opts2.valueType, "VT2");
+ Assert.assertEquals(opts2.numBuckets, 200);
+ Assert.assertEquals(opts2.bucketsPerTablet.intValue(), 20);
+ Assert.assertEquals(opts2.bufferSize.intValue(), 1000000);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
new file mode 100644
index 0000000..a359598
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.recipes.core.common.Pirtos;
+import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SplitsTest {
+ private static List<Bytes> sort(List<Bytes> in) {
+ ArrayList<Bytes> out = new ArrayList<>(in);
+ Collections.sort(out);
+ return out;
+ }
+
+ @Test
+ public void testSplits() {
+
+ Options opts = new Options("foo", WordCountCombiner.class, String.class, Long.class, 3);
+ opts.setBucketsPerTablet(1);
+ FluoConfiguration fluoConfig = new FluoConfiguration();
+ CollisionFreeMap.configure(fluoConfig, opts);
+
+ Pirtos pirtos1 =
+ CollisionFreeMap.getTableOptimizations("foo", fluoConfig.getAppConfiguration());
+ List<Bytes> expected1 =
+ Lists.transform(
+ Arrays.asList("foo:d:1", "foo:d:2", "foo:d:~", "foo:u:1", "foo:u:2", "foo:u:~"),
+ Bytes::of);
+
+ Assert.assertEquals(expected1, sort(pirtos1.getSplits()));
+
+ Options opts2 = new Options("bar", WordCountCombiner.class, String.class, Long.class, 6);
+ opts2.setBucketsPerTablet(2);
+ CollisionFreeMap.configure(fluoConfig, opts2);
+
+ Pirtos pirtos2 =
+ CollisionFreeMap.getTableOptimizations("bar", fluoConfig.getAppConfiguration());
+ List<Bytes> expected2 =
+ Lists.transform(
+ Arrays.asList("bar:d:2", "bar:d:4", "bar:d:~", "bar:u:2", "bar:u:4", "bar:u:~"),
+ Bytes::of);
+ Assert.assertEquals(expected2, sort(pirtos2.getSplits()));
+
+ Pirtos pirtos3 = CollisionFreeMap.getTableOptimizations(fluoConfig.getAppConfiguration());
+
+ ArrayList<Bytes> expected3 = new ArrayList<>(expected2);
+ expected3.addAll(expected1);
+
+ Assert.assertEquals(expected3, sort(pirtos3.getSplits()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
new file mode 100644
index 0000000..b59705a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+public class TestSerializer implements SimpleSerializer {
+
+ @Override
+ public <T> byte[] serialize(T obj) {
+ return obj.toString().getBytes();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T deserialize(byte[] serObj, Class<T> clazz) {
+ if (clazz.equals(Long.class)) {
+ return (T) Long.valueOf(new String(serObj));
+ }
+
+ if (clazz.equals(String.class)) {
+ return (T) new String(serObj);
+ }
+
+ throw new IllegalArgumentException();
+ }
+
+ @Override
+ public void init(SimpleConfiguration appConfig) {}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
new file mode 100644
index 0000000..f757c10
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+public class WordCountCombiner implements Combiner<String, Long> {
+ @Override
+ public Optional<Long> combine(String key, Iterator<Long> updates) {
+ long sum = 0;
+
+ while (updates.hasNext()) {
+ sum += updates.next();
+ }
+
+ if (sum == 0) {
+ return Optional.empty();
+ } else {
+ return Optional.of(sum);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
new file mode 100644
index 0000000..221083c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+public class WordCountObserver extends UpdateObserver<String, Long> {
+
+ @Override
+ public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
+
+ while (updates.hasNext()) {
+ Update<String, Long> update = updates.next();
+
+ Optional<Long> oldVal = update.getOldValue();
+ Optional<Long> newVal = update.getNewValue();
+
+ if (oldVal.isPresent()) {
+ String oldRow = String.format("iwc:%09d:%s", oldVal.get(), update.getKey());
+ tx.delete(Bytes.of(oldRow), new Column(Bytes.EMPTY, Bytes.EMPTY));
+ }
+
+ if (newVal.isPresent()) {
+ String newRow = String.format("iwc:%09d:%s", newVal.get(), update.getKey());
+ tx.set(Bytes.of(newRow), new Column(Bytes.EMPTY, Bytes.EMPTY), Bytes.EMPTY);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
new file mode 100644
index 0000000..7c09b6e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.core.types.StringEncoder;
+import org.apache.fluo.recipes.core.types.TypeLayer;
+import org.apache.fluo.recipes.core.types.TypedTransaction;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class RecordingTransactionTest {
+
+ private Transaction tx;
+ private RecordingTransaction rtx;
+ private TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ @Before
+ public void setUp() {
+ tx = mock(Transaction.class);
+ rtx = RecordingTransaction.wrap(tx);
+ }
+
+ @Test
+ public void testTx() {
+ rtx.set(Bytes.of("r1"), new Column("cf1"), Bytes.of("v1"));
+ rtx.set(Bytes.of("r2"), new Column("cf2", "cq2"), Bytes.of("v2"));
+ rtx.delete(Bytes.of("r3"), new Column("cf3"));
+ expect(tx.get(Bytes.of("r4"), new Column("cf4"))).andReturn(Bytes.of("v4"));
+ replay(tx);
+ rtx.get(Bytes.of("r4"), new Column("cf4"));
+
+ List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+ Assert.assertEquals(4, entries.size());
+ Assert.assertEquals("LogEntry{op=SET, row=r1, col=cf1 , value=v1}", entries.get(0).toString());
+ Assert.assertEquals("LogEntry{op=SET, row=r2, col=cf2 cq2 , value=v2}", entries.get(1)
+ .toString());
+ Assert
+ .assertEquals("LogEntry{op=DELETE, row=r3, col=cf3 , value=}", entries.get(2).toString());
+ Assert.assertEquals("LogEntry{op=GET, row=r4, col=cf4 , value=v4}", entries.get(3).toString());
+ Assert.assertEquals("{r4 cf4 =v4}", rtx.getTxLog().getOperationMap(LogEntry.Operation.GET)
+ .toString());
+ Assert.assertEquals("{r2 cf2 cq2 =v2, r1 cf1 =v1}",
+ rtx.getTxLog().getOperationMap(LogEntry.Operation.SET).toString());
+ Assert.assertEquals("{r3 cf3 =}", rtx.getTxLog().getOperationMap(LogEntry.Operation.DELETE)
+ .toString());
+ }
+
+ @Test
+ public void testTypedTx() {
+ TypedTransaction ttx = tl.wrap(rtx);
+ ttx.mutate().row("r5").fam("cf5").qual("cq5").set("1");
+ ttx.mutate().row("r6").fam("cf6").qual("cq6").set("1");
+ List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+ Assert.assertEquals(2, entries.size());
+ Assert.assertEquals("LogEntry{op=SET, row=r5, col=cf5 cq5 , value=1}", entries.get(0)
+ .toString());
+ Assert.assertEquals("LogEntry{op=SET, row=r6, col=cf6 cq6 , value=1}", entries.get(1)
+ .toString());
+ }
+
+ @Test
+ public void testFilter() {
+ rtx = RecordingTransaction.wrap(tx, le -> le.getColumn().getFamily().toString().equals("cfa"));
+ TypedTransaction ttx = tl.wrap(rtx);
+ ttx.mutate().row("r1").fam("cfa").qual("cq1").set("1");
+ ttx.mutate().row("r2").fam("cfb").qual("cq2").set("2");
+ ttx.mutate().row("r3").fam("cfa").qual("cq3").set("3");
+ List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+ Assert.assertEquals(2, entries.size());
+ Assert.assertEquals("LogEntry{op=SET, row=r1, col=cfa cq1 , value=1}", entries.get(0)
+ .toString());
+ Assert.assertEquals("LogEntry{op=SET, row=r3, col=cfa cq3 , value=3}", entries.get(1)
+ .toString());
+ }
+
+ @Test
+ public void testClose() {
+ tx.close();
+ replay(tx);
+ rtx.close();
+ verify(tx);
+ }
+
+ @Test
+ public void testCommit() {
+ tx.commit();
+ replay(tx);
+ rtx.commit();
+ verify(tx);
+ }
+
+ @Test
+ public void testDelete() {
+ tx.delete(Bytes.of("r"), Column.EMPTY);
+ replay(tx);
+ rtx.delete(Bytes.of("r"), Column.EMPTY);
+ verify(tx);
+ }
+
+ @Test
+ public void testGet() {
+ expect(tx.get(Bytes.of("r"), Column.EMPTY)).andReturn(Bytes.of("v"));
+ replay(tx);
+ Assert.assertEquals(Bytes.of("v"), rtx.get(Bytes.of("r"), Column.EMPTY));
+ verify(tx);
+ }
+
+ @Test
+ public void testGetColumns() {
+ expect(tx.get(Bytes.of("r"), Collections.emptySet())).andReturn(Collections.emptyMap());
+ replay(tx);
+ Assert.assertEquals(Collections.emptyMap(), rtx.get(Bytes.of("r"), Collections.emptySet()));
+ verify(tx);
+ }
+
+ @Test
+ public void testGetRows() {
+ expect(tx.get(Collections.emptyList(), Collections.emptySet())).andReturn(
+ Collections.emptyMap());
+ replay(tx);
+ Assert.assertEquals(Collections.emptyMap(),
+ rtx.get(Collections.emptyList(), Collections.emptySet()));
+ verify(tx);
+ }
+
+ @Test
+ public void testGetScanNull() {
+ ScannerConfiguration scanConfig = new ScannerConfiguration();
+ expect(tx.get(scanConfig)).andReturn(null);
+ replay(tx);
+ Assert.assertNull(rtx.get(scanConfig));
+ verify(tx);
+ }
+
+ @Test
+ public void testGetScanIter() {
+ ScannerConfiguration scanConfig = new ScannerConfiguration();
+ expect(tx.get(scanConfig)).andReturn(new RowIterator() {
+
+ private boolean hasNextRow = true;
+
+ @Override
+ public boolean hasNext() {
+ return hasNextRow;
+ }
+
+ @Override
+ public Map.Entry<Bytes, ColumnIterator> next() {
+ hasNextRow = false;
+ return new AbstractMap.SimpleEntry<>(Bytes.of("r7"), new ColumnIterator() {
+
+ private boolean hasNextCol = true;
+
+ @Override
+ public boolean hasNext() {
+ return hasNextCol;
+ }
+
+ @Override
+ public Map.Entry<Column, Bytes> next() {
+ hasNextCol = false;
+ return new AbstractMap.SimpleEntry<>(new Column("cf7", "cq7"), Bytes.of("v7"));
+ }
+ });
+ }
+ });
+ replay(tx);
+ RowIterator rowIter = rtx.get(scanConfig);
+ Assert.assertNotNull(rowIter);
+ Assert.assertTrue(rtx.getTxLog().isEmpty());
+ Assert.assertTrue(rowIter.hasNext());
+ Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
+ Assert.assertFalse(rowIter.hasNext());
+ Assert.assertEquals(Bytes.of("r7"), rowEntry.getKey());
+ ColumnIterator colIter = rowEntry.getValue();
+ Assert.assertTrue(colIter.hasNext());
+ Assert.assertTrue(rtx.getTxLog().isEmpty());
+ Map.Entry<Column, Bytes> colEntry = colIter.next();
+ Assert.assertFalse(rtx.getTxLog().isEmpty());
+ Assert.assertFalse(colIter.hasNext());
+ Assert.assertEquals(new Column("cf7", "cq7"), colEntry.getKey());
+ Assert.assertEquals(Bytes.of("v7"), colEntry.getValue());
+ List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+ Assert.assertEquals(1, entries.size());
+ Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", entries.get(0)
+ .toString());
+ verify(tx);
+ }
+
+ @Test
+ public void testGetTimestamp() {
+ expect(tx.getStartTimestamp()).andReturn(5L);
+ replay(tx);
+ Assert.assertEquals(5L, rtx.getStartTimestamp());
+ verify(tx);
+ }
+}