You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by mw...@apache.org on 2016/07/07 20:44:03 UTC
[2/3] incubator-fluo git commit: #696 - Updated integration tests to
use core API instead of type layer
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
index 6264375..554a762 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
@@ -16,14 +16,13 @@
package org.apache.fluo.integration.impl;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedSnapshotBase.Value;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.impl.TransactorNode;
import org.apache.fluo.core.oracle.Stamp;
@@ -33,7 +32,6 @@ import org.junit.Assert;
import org.junit.Test;
public class ParallelScannerIT extends ITBaseImpl {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
@Test
public void testRowColumn() {
@@ -76,21 +74,21 @@ public class ParallelScannerIT extends ITBaseImpl {
// parallel scan
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("bob9").fam("vote").qual("election1").set("N");
- tx1.mutate().row("bob9").fam("vote").qual("election2").set("Y");
+ tx1.set("bob9", new Column("vote", "election1"), "N");
+ tx1.set("bob9", new Column("vote", "election2"), "Y");
- tx1.mutate().row("joe3").fam("vote").qual("election1").set("nay");
- tx1.mutate().row("joe3").fam("vote").qual("election2").set("nay");
+ tx1.set("joe3", new Column("vote", "election1"), "nay");
+ tx1.set("joe3", new Column("vote", "election2"), "nay");
tx1.done();
final TestTransaction tx2 = new TestTransaction(env);
- tx2.mutate().row("sue4").fam("vote").qual("election1").set("+1");
- tx2.mutate().row("sue4").fam("vote").qual("election2").set("-1");
+ tx2.set("sue4", new Column("vote", "election1"), "+1");
+ tx2.set("sue4", new Column("vote", "election2"), "-1");
- tx2.mutate().row("eve2").fam("vote").qual("election1").set("no");
- tx2.mutate().row("eve2").fam("vote").qual("election2").set("no");
+ tx2.set("eve2", new Column("vote", "election1"), "no");
+ tx2.set("eve2", new Column("vote", "election2"), "no");
final CommitData cd2 = tx2.createCommitData();
Assert.assertTrue(tx2.preCommit(cd2));
@@ -116,17 +114,17 @@ public class ParallelScannerIT extends ITBaseImpl {
TestTransaction tx3 = new TestTransaction(env);
- Column e1Col = typeLayer.bc().fam("vote").qual("election1").vis();
+ Column e1Col = new Column("vote", "election1");
// normally when this test runs, some of the row/columns being read below will be locked for a
// bit
- Map<String, Map<Column, Value>> votes =
- tx3.get().rowsString("bob9", "joe3", "sue4", "eve2").columns(e1Col).toStringMap();
+ Map<String, Map<Column, String>> votes =
+ tx3.gets(Arrays.asList("bob9", "joe3", "sue4", "eve2"), Sets.newHashSet(e1Col));
- Assert.assertEquals("N", votes.get("bob9").get(e1Col).toString(""));
- Assert.assertEquals("nay", votes.get("joe3").get(e1Col).toString(""));
- Assert.assertEquals("+1", votes.get("sue4").get(e1Col).toString(""));
- Assert.assertEquals("no", votes.get("eve2").get(e1Col).toString(""));
+ Assert.assertEquals("N", votes.get("bob9").get(e1Col));
+ Assert.assertEquals("nay", votes.get("joe3").get(e1Col));
+ Assert.assertEquals("+1", votes.get("sue4").get(e1Col));
+ Assert.assertEquals("no", votes.get("eve2").get(e1Col));
Assert.assertEquals(4, votes.size());
}
@@ -140,16 +138,18 @@ public class ParallelScannerIT extends ITBaseImpl {
runParallelRecoveryTest(false);
}
- void runParallelRecoveryTest(boolean closeTransID) throws Exception {
+ private static final Column COL = new Column("7", "7");
+
+ private void runParallelRecoveryTest(boolean closeTransID) throws Exception {
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row(5).fam(7).qual(7).set(3);
- tx1.mutate().row(12).fam(7).qual(7).set(10);
- tx1.mutate().row(19).fam(7).qual(7).set(17);
- tx1.mutate().row(26).fam(7).qual(7).set(24);
- tx1.mutate().row(33).fam(7).qual(7).set(31);
- tx1.mutate().row(40).fam(7).qual(7).set(38);
- tx1.mutate().row(47).fam(7).qual(7).set(45);
+ tx1.set("5", COL, "3");
+ tx1.set("12", COL, "10");
+ tx1.set("19", COL, "17");
+ tx1.set("26", COL, "24");
+ tx1.set("33", COL, "31");
+ tx1.set("40", COL, "38");
+ tx1.set("47", COL, "45");
tx1.done();
@@ -157,18 +157,18 @@ public class ParallelScannerIT extends ITBaseImpl {
TestTransaction tx2 = new TestTransaction(env, tNode1);
- tx2.mutate().row(5).fam(7).qual(7).set(7);
- tx2.mutate().row(12).fam(7).qual(7).set(14);
- tx2.mutate().row(19).fam(7).qual(7).set(21);
+ tx2.set("5", COL, "7");
+ tx2.set("12", COL, "14");
+ tx2.set("19", COL, "21");
CommitData cd2 = tx2.createCommitData();
Assert.assertTrue(tx2.preCommit(cd2));
TestTransaction tx3 = new TestTransaction(env, tNode1);
- tx3.mutate().row(26).fam(7).qual(7).set(28);
- tx3.mutate().row(33).fam(7).qual(7).set(35);
- tx3.mutate().row(40).fam(7).qual(7).set(42);
+ tx3.set("26", COL, "28");
+ tx3.set("33", COL, "35");
+ tx3.set("40", COL, "42");
CommitData cd3 = tx3.createCommitData();
Assert.assertTrue(tx3.preCommit(cd3));
@@ -187,24 +187,23 @@ public class ParallelScannerIT extends ITBaseImpl {
}
}
- void check() throws Exception {
+ private void check() throws Exception {
TestTransaction tx = new TestTransaction(env);
- Column scol = typeLayer.bc().fam(7).qual(7).vis();
- Map<String, Map<Column, Value>> votes =
- tx.get().rowsString("5", "12", "19", "26", "33", "40", "47").columns(scol).toStringMap();
+ Map<String, Map<Column, String>> votes =
+ tx.gets(Arrays.asList("5", "12", "19", "26", "33", "40", "47"), Sets.newHashSet(COL));
// following should be rolled back
- Assert.assertEquals(3, votes.get("5").get(scol).toInteger(0));
- Assert.assertEquals(10, votes.get("12").get(scol).toInteger(0));
- Assert.assertEquals(17, votes.get("19").get(scol).toInteger(0));
+ Assert.assertEquals("3", votes.get("5").get(COL));
+ Assert.assertEquals("10", votes.get("12").get(COL));
+ Assert.assertEquals("17", votes.get("19").get(COL));
// following should be rolled forward
- Assert.assertEquals(28, votes.get("26").get(scol).toInteger(0));
- Assert.assertEquals(35, votes.get("33").get(scol).toInteger(0));
- Assert.assertEquals(42, votes.get("40").get(scol).toInteger(0));
+ Assert.assertEquals("28", votes.get("26").get(COL));
+ Assert.assertEquals("35", votes.get("33").get(COL));
+ Assert.assertEquals("42", votes.get("40").get(COL));
// unchanged and not locked
- Assert.assertEquals(45, votes.get("47").get(scol).toInteger(0));
+ Assert.assertEquals("45", votes.get("47").get(COL));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
index 1b99abc..4015339 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
@@ -20,15 +20,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedTransaction;
-import org.apache.fluo.api.types.TypedTransactionBase;
import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
import org.junit.Test;
@@ -38,43 +35,38 @@ import org.junit.Test;
*/
public class SelfNotificationIT extends ITBaseMini {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
-
- static final Column STAT_COUNT_COL = typeLayer.bc().fam("stat").qual("count").vis();
- static final Column EXPORT_CHECK_COL = typeLayer.bc().fam("export").qual("check").vis();
- static final Column EXPORT_COUNT_COL = typeLayer.bc().fam("export").qual("count").vis();
+ private static final Column STAT_COUNT_COL = new Column("stat", "count");
+ private static final Column EXPORT_CHECK_COL = new Column("export", "check");
+ private static final Column EXPORT_COUNT_COL = new Column("export", "count");
@Override
protected List<ObserverConfiguration> getObservers() {
return Collections.singletonList(new ObserverConfiguration(ExportingObserver.class.getName()));
}
- static List<Integer> exports = new ArrayList<>();
+ private static List<String> exports = new ArrayList<>();
public static class ExportingObserver extends AbstractObserver {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-
- TypedTransactionBase ttx = typeLayer.wrap(tx);
-
- Integer currentCount = ttx.get().row(row).col(STAT_COUNT_COL).toInteger();
- Integer exportCount = ttx.get().row(row).col(EXPORT_COUNT_COL).toInteger();
+ String r = row.toString();
+ String currentCount = tx.gets(r, STAT_COUNT_COL);
+ String exportCount = tx.gets(r, EXPORT_COUNT_COL);
if (exportCount != null) {
export(row, exportCount);
if (currentCount == null || exportCount.equals(currentCount)) {
- ttx.mutate().row(row).col(EXPORT_COUNT_COL).delete();
+ tx.delete(row, EXPORT_COUNT_COL);
} else {
- ttx.mutate().row(row).col(EXPORT_COUNT_COL).set(currentCount);
- ttx.mutate().row(row).col(EXPORT_CHECK_COL).set();
+ tx.set(r, EXPORT_COUNT_COL, currentCount);
+ tx.set(r, EXPORT_CHECK_COL, "");
}
-
}
}
- private void export(Bytes row, Integer exportCount) {
+ private void export(Bytes row, String exportCount) {
exports.add(exportCount);
}
@@ -87,36 +79,35 @@ public class SelfNotificationIT extends ITBaseMini {
@Test
public void test1() throws Exception {
- try (TypedTransaction tx1 = typeLayer.wrap(client.newTransaction())) {
- tx1.mutate().row("r1").col(STAT_COUNT_COL).set(3);
- tx1.mutate().row("r1").col(EXPORT_CHECK_COL).set();
- tx1.mutate().row("r1").col(EXPORT_COUNT_COL).set(3);
+ try (Transaction tx1 = client.newTransaction()) {
+ tx1.set("r1", STAT_COUNT_COL, "3");
+ tx1.set("r1", EXPORT_CHECK_COL, "");
+ tx1.set("r1", EXPORT_COUNT_COL, "3");
tx1.commit();
}
miniFluo.waitForObservers();
- Assert.assertEquals(Collections.singletonList(3), exports);
+ Assert.assertEquals(Collections.singletonList("3"), exports);
exports.clear();
miniFluo.waitForObservers();
Assert.assertEquals(0, exports.size());
- try (TypedTransaction tx2 = typeLayer.wrap(client.newTransaction())) {
- Assert.assertNull(tx2.get().row("r1").col(EXPORT_COUNT_COL).toInteger());
+ try (Transaction tx2 = client.newTransaction()) {
+ Assert.assertNull(tx2.gets("r1", EXPORT_COUNT_COL));
- tx2.mutate().row("r1").col(STAT_COUNT_COL).set(5);
- tx2.mutate().row("r1").col(EXPORT_CHECK_COL).set();
- tx2.mutate().row("r1").col(EXPORT_COUNT_COL).set(4);
+ tx2.set("r1", STAT_COUNT_COL, "5");
+ tx2.set("r1", EXPORT_CHECK_COL, "");
+ tx2.set("r1", EXPORT_COUNT_COL, "4");
tx2.commit();
}
miniFluo.waitForObservers();
- Assert.assertEquals(Arrays.asList(4, 5), exports);
+ Assert.assertEquals(Arrays.asList("4", "5"), exports);
exports.clear();
miniFluo.waitForObservers();
Assert.assertEquals(0, exports.size());
-
}
// TODO test self notification w/ weak notifications
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
index 1b19143..720bfd8 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
@@ -40,8 +40,6 @@ import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
@@ -55,7 +53,6 @@ import org.junit.Test;
*/
public class StochasticBankIT extends ITBaseImpl {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
private static AtomicInteger txCount = new AtomicInteger();
@Test
@@ -100,13 +97,13 @@ public class StochasticBankIT extends ITBaseImpl {
runVerifier(env, numAccounts, 1);
}
- private static Column balanceCol = typeLayer.bc().fam("data").qual("balance").vis();
+ private static Column balanceCol = new Column("data", "balance");
private static void populate(Environment env, int numAccounts) throws Exception {
TestTransaction tx = new TestTransaction(env);
for (int i = 0; i < numAccounts; i++) {
- tx.mutate().row(fmtAcct(i)).col(balanceCol).set(1000);
+ tx.set(fmtAcct(i), balanceCol, "1000");
}
tx.done();
@@ -155,12 +152,12 @@ public class StochasticBankIT extends ITBaseImpl {
while (true) {
try {
TestTransaction tx = new TestTransaction(env);
- int bal1 = tx.get().row(from).col(balanceCol).toInteger();
- int bal2 = tx.get().row(to).col(balanceCol).toInteger();
+ int bal1 = Integer.parseInt(tx.gets(from, balanceCol));
+ int bal2 = Integer.parseInt(tx.gets(to, balanceCol));
if (bal1 - amt >= 0) {
- tx.mutate().row(from).col(balanceCol).set(bal1 - amt);
- tx.mutate().row(to).col(balanceCol).set(bal2 + amt);
+ tx.set(from, balanceCol, (bal1 - amt) + "");
+ tx.set(to, balanceCol, (bal2 + amt) + "");
} else {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
index 9142f8e..ea5921c 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
@@ -19,6 +19,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.config.ScannerConfiguration;
@@ -28,31 +29,28 @@ import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedTransaction;
-import org.apache.fluo.api.types.TypedTransactionBase;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestTransaction;
+import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Test;
public class WeakNotificationIT extends ITBaseMini {
- private static TypeLayer tl = new TypeLayer(new StringEncoder());
+ private static final Column STAT_COUNT = new Column("stat", "count");
+ private static final Column STAT_CHECK = new Column("stat", "check");
public static class SimpleObserver extends AbstractObserver {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
- TypedTransactionBase ttx = tl.wrap(tx);
ScannerConfiguration sc = new ScannerConfiguration();
sc.setSpan(Span.exact(row, new Column(Bytes.of("stats"))));
- RowIterator rowIter = ttx.get(sc);
+ RowIterator rowIter = tx.get(sc);
int sum = 0;
@@ -61,19 +59,19 @@ public class WeakNotificationIT extends ITBaseMini {
while (colIter.hasNext()) {
Entry<Column, Bytes> colVal = colIter.next();
sum += Integer.parseInt(colVal.getValue().toString());
- ttx.delete(row, colVal.getKey());
+ tx.delete(row, colVal.getKey());
}
}
if (sum != 0) {
- sum += ttx.get().row(row).fam("stat").qual("count").toInteger(0);
- ttx.mutate().row(row).fam("stat").qual("count").set(sum);
+ sum += TestUtil.getOrDefault(tx, row.toString(), STAT_COUNT, 0);
+ tx.set(row.toString(), STAT_COUNT, sum + "");
}
}
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(tl.bc().fam("stat").qual("check").vis(), NotificationType.WEAK);
+ return new ObservedColumn(STAT_CHECK, NotificationType.WEAK);
}
}
@@ -87,34 +85,34 @@ public class WeakNotificationIT extends ITBaseMini {
Environment env = new Environment(config);
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("r1").fam("stat").qual("count").set(3);
+ tx1.set("r1", STAT_COUNT, "3");
tx1.done();
TestTransaction tx2 = new TestTransaction(env);
- tx2.mutate().row("r1").fam("stats").qual("af89").set(5);
- tx2.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx2.set("r1", new Column("stats", "af89"), "5");
+ tx2.setWeakNotification("r1", STAT_CHECK);
tx2.done();
TestTransaction tx3 = new TestTransaction(env);
- tx3.mutate().row("r1").fam("stats").qual("af99").set(7);
- tx3.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx3.set("r1", new Column("stats", "af99"), "7");
+ tx3.setWeakNotification("r1", STAT_CHECK);
tx3.done();
miniFluo.waitForObservers();
TestTransaction tx4 = new TestTransaction(env);
- Assert.assertEquals(15, tx4.get().row("r1").fam("stat").qual("count").toInteger(0));
+ Assert.assertEquals("15", tx4.gets("r1", STAT_COUNT));
// overlapping transactions that set a weak notification should commit w/ no problem
TestTransaction tx5 = new TestTransaction(env);
- tx5.mutate().row("r1").fam("stats").qual("bff7").set(11);
- tx5.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx5.set("r1", new Column("stats", "bff7"), "11");
+ tx5.setWeakNotification("r1", STAT_CHECK);
CommitData cd5 = tx5.createCommitData();
Assert.assertTrue(tx5.preCommit(cd5));
TestTransaction tx6 = new TestTransaction(env);
- tx6.mutate().row("r1").fam("stats").qual("bff0").set(13);
- tx6.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx6.set("r1", new Column("stats", "bff0"), "13");
+ tx6.setWeakNotification("r1", STAT_CHECK);
CommitData cd6 = tx6.createCommitData();
Assert.assertTrue(tx6.preCommit(cd6));
@@ -130,7 +128,7 @@ public class WeakNotificationIT extends ITBaseMini {
miniFluo.waitForObservers();
TestTransaction tx7 = new TestTransaction(env);
- Assert.assertEquals(39, tx7.get().row("r1").fam("stat").qual("count").toInteger(0));
+ Assert.assertEquals("39", tx7.gets("r1", STAT_COUNT));
env.close();
}
@@ -139,9 +137,9 @@ public class WeakNotificationIT extends ITBaseMini {
public void testNOOP() throws Exception {
// if an observer makes no updates in a transaction, it should still delete the weak
// notification
- try (TypedTransaction tx1 = tl.wrap(client.newTransaction())) {
- tx1.mutate().row("r1").fam("stat").qual("count").set(3);
- tx1.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ try (Transaction tx1 = client.newTransaction()) {
+ tx1.set("r1", STAT_COUNT, "3");
+ tx1.setWeakNotification("r1", STAT_CHECK);
tx1.commit();
}
@@ -151,9 +149,9 @@ public class WeakNotificationIT extends ITBaseMini {
@Test(expected = IllegalArgumentException.class)
public void testBadColumn() throws Exception {
- try (TypedTransaction tx1 = tl.wrap(client.newTransaction())) {
- tx1.mutate().row("r1").fam("stat").qual("count").set(3);
- tx1.mutate().row("r1").fam("stat").qual("foo").weaklyNotify();
+ try (Transaction tx1 = client.newTransaction()) {
+ tx1.set("r1", STAT_COUNT, "3");
+ tx1.setWeakNotification("r1", new Column("stat", "foo"));
tx1.commit();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
index 4ccfd3d..07ff45f 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
@@ -15,59 +15,63 @@
package org.apache.fluo.integration.impl;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import com.google.common.primitives.Ints;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedObserver;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
+import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Test;
public class WeakNotificationOverlapIT extends ITBaseImpl {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
+ private static final Column STAT_TOTAL = new Column("stat", "total");
+ private static final Column STAT_PROCESSED = new Column("stat", "processed");
+ private static final Column STAT_CHANGED = new Column("stat", "changed");
- public static class TotalObserver extends TypedObserver {
+ public static class TotalObserver extends AbstractObserver {
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(typeLayer.bc().fam("stat").qual("changed").vis(),
- NotificationType.WEAK);
+ return new ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
- Integer total = tx.get().row(row).fam("stat").qual("total").toInteger();
- if (total == null) {
+ public void process(TransactionBase tx, Bytes row, Column col) {
+ String r = row.toString();
+ String totalStr = tx.gets(r, STAT_TOTAL);
+ if (totalStr == null) {
return;
}
- int processed = tx.get().row(row).fam("stat").qual("processed").toInteger(0);
-
- tx.mutate().row(row).fam("stat").qual("processed").set(total);
- tx.mutate().row("all").fam("stat").qual("total").increment(total - processed);
+ Integer total = Integer.parseInt(totalStr);
+ int processed = TestUtil.getOrDefault(tx, r, STAT_PROCESSED, 0);
+ tx.set(r, new Column("stat", "processed"), total + "");
+ TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
}
}
+
+
@Override
protected List<ObserverConfiguration> getObservers() {
- return Arrays.asList(new ObserverConfiguration(TotalObserver.class.getName()));
+ return Collections.singletonList(new ObserverConfiguration(TotalObserver.class.getName()));
}
@Test
@@ -75,80 +79,78 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
// this test ensures that processing of weak notification deletes based on startTs and not
// commitTs
- Column ntfyCol = typeLayer.bc().fam("stat").qual("changed").vis();
-
TestTransaction ttx1 = new TestTransaction(env);
- ttx1.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx1.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx1, "1", STAT_TOTAL, 1);
+ ttx1.setWeakNotification("1", STAT_CHANGED);
ttx1.done();
- TestTransaction ttx2 = new TestTransaction(env, "1", ntfyCol);
+ TestTransaction ttx2 = new TestTransaction(env, "1", STAT_CHANGED);
TestTransaction ttx3 = new TestTransaction(env);
- ttx3.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx3.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx3, "1", STAT_TOTAL, 1);
+ ttx3.setWeakNotification("1", STAT_CHANGED);
ttx3.done();
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx2, Bytes.of("1"), ntfyCol);
+ new TotalObserver().process(ttx2, Bytes.of("1"), STAT_CHANGED);
// should not delete notification created by ttx3
ttx2.done();
TestTransaction snap1 = new TestTransaction(env);
- Assert.assertEquals(1, snap1.get().row("all").fam("stat").qual("total").toInteger(-1));
+ Assert.assertEquals("1", snap1.gets("all", STAT_TOTAL));
snap1.done();
Assert.assertEquals(1, countNotifications());
- TestTransaction ttx4 = new TestTransaction(env, "1", ntfyCol);
- new TotalObserver().process(ttx4, Bytes.of("1"), ntfyCol);
+ TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
+ new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
ttx4.done();
Assert.assertEquals(0, countNotifications());
TestTransaction snap2 = new TestTransaction(env);
- Assert.assertEquals(2, snap2.get().row("all").fam("stat").qual("total").toInteger(-1));
+ Assert.assertEquals("2", snap2.gets("all", STAT_TOTAL));
snap2.done();
// the following code is a repeat of the above with a slight diff. The following tx creates a
// notification, but deletes the data so there is no work for the
// observer. This tests the case where an observer deletes a notification w/o making any updates.
TestTransaction ttx5 = new TestTransaction(env);
- ttx5.mutate().row(1).fam("stat").qual("total").delete();
- ttx5.mutate().row(1).fam("stat").qual("processed").delete();
- ttx5.mutate().row(1).col(ntfyCol).weaklyNotify();
+ ttx5.delete("1", STAT_TOTAL);
+ ttx5.delete("1", STAT_PROCESSED);
+ ttx5.setWeakNotification("1", STAT_CHANGED);
ttx5.done();
Assert.assertEquals(1, countNotifications());
- TestTransaction ttx6 = new TestTransaction(env, "1", ntfyCol);
+ TestTransaction ttx6 = new TestTransaction(env, "1", STAT_CHANGED);
TestTransaction ttx7 = new TestTransaction(env);
- ttx7.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx7.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx7, "1", STAT_TOTAL, 1);
+ ttx7.setWeakNotification("1", STAT_CHANGED);
ttx7.done();
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx6, Bytes.of("1"), ntfyCol);
+ new TotalObserver().process(ttx6, Bytes.of("1"), STAT_CHANGED);
// should not delete notification created by ttx7
ttx6.done();
Assert.assertEquals(1, countNotifications());
TestTransaction snap3 = new TestTransaction(env);
- Assert.assertEquals(2, snap3.get().row("all").fam("stat").qual("total").toInteger(-1));
+ Assert.assertEquals("2", snap3.gets("all", STAT_TOTAL));
snap3.done();
- TestTransaction ttx8 = new TestTransaction(env, "1", ntfyCol);
- new TotalObserver().process(ttx8, Bytes.of("1"), ntfyCol);
+ TestTransaction ttx8 = new TestTransaction(env, "1", STAT_CHANGED);
+ new TotalObserver().process(ttx8, Bytes.of("1"), STAT_CHANGED);
ttx8.done();
Assert.assertEquals(0, countNotifications());
TestTransaction snap4 = new TestTransaction(env);
- Assert.assertEquals(3, snap4.get().row("all").fam("stat").qual("total").toInteger(-1));
+ Assert.assertEquals("3", snap4.gets("all", STAT_TOTAL));
snap4.done();
}
@@ -156,25 +158,23 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
public void testOverlap2() throws Exception {
// this test ensures that setting weak notification is based on commitTs and not startTs
- Column ntfyCol = typeLayer.bc().fam("stat").qual("changed").vis();
-
TestTransaction ttx1 = new TestTransaction(env);
- ttx1.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx1.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx1, "1", STAT_TOTAL, 1);
+ ttx1.setWeakNotification("1", STAT_CHANGED);
ttx1.done();
Assert.assertEquals(1, countNotifications());
TestTransaction ttx2 = new TestTransaction(env);
- ttx2.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx2.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx2, "1", STAT_TOTAL, 1);
+ ttx2.setWeakNotification("1", STAT_CHANGED);
CommitData cd2 = ttx2.createCommitData();
Assert.assertTrue(ttx2.preCommit(cd2));
// simulate an observer processing the notification created by ttx1 while ttx2 is in the middle
// of committing. Processing this observer should not delete
// the notification for ttx2. It should delete the notification for ttx1.
- TestTransaction ttx3 = new TestTransaction(env, "1", ntfyCol);
+ TestTransaction ttx3 = new TestTransaction(env, "1", STAT_CHANGED);
Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
Assert.assertTrue(ttx2.commitPrimaryColumn(cd2, commitTs));
@@ -183,21 +183,21 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx3, Bytes.of("1"), ntfyCol);
+ new TotalObserver().process(ttx3, Bytes.of("1"), STAT_CHANGED);
ttx3.done();
Assert.assertEquals(1, countNotifications());
- try (TypedSnapshot snapshot = typeLayer.wrap(client.newSnapshot())) {
- Assert.assertEquals(1, snapshot.get().row("all").fam("stat").qual("total").toInteger(-1));
+ try (Snapshot snapshot = client.newSnapshot()) {
+ Assert.assertEquals("1", snapshot.gets("all", STAT_TOTAL));
}
- TestTransaction ttx4 = new TestTransaction(env, "1", ntfyCol);
- new TotalObserver().process(ttx4, Bytes.of("1"), ntfyCol);
+ TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
+ new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
ttx4.done();
Assert.assertEquals(0, countNotifications());
- try (TypedSnapshot snapshot = typeLayer.wrap(client.newSnapshot())) {
- Assert.assertEquals(2, snapshot.get().row("all").fam("stat").qual("total").toInteger(-1));
+ try (Snapshot snapshot = client.newSnapshot()) {
+ Assert.assertEquals("2", snapshot.gets("all", STAT_TOTAL));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index c89a91a..e4204aa 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -18,6 +18,8 @@ package org.apache.fluo.integration.impl;
import java.util.Collections;
import java.util.List;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.config.ScannerConfiguration;
@@ -28,11 +30,6 @@ import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransaction;
-import org.apache.fluo.api.types.TypedTransactionBase;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.worker.NotificationFinder;
@@ -50,9 +47,10 @@ import org.junit.Test;
*/
public class WorkerIT extends ITBaseMini {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
+ private static final Column LAST_UPDATE = new Column("attr", "lastupdate");
+ private static final Column DEGREE = new Column("attr", "degree");
- private static Column observedColumn = typeLayer.bc().fam("attr").qual("lastupdate").vis();
+ private static Column observedColumn = LAST_UPDATE;
@Override
protected List<ObserverConfiguration> getObservers() {
@@ -65,17 +63,17 @@ public class WorkerIT extends ITBaseMini {
public void init(Context context) {}
@Override
- public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+ public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
- TypedTransactionBase ttx = typeLayer.wrap(tx);
+ String row = rowBytes.toString();
// get previously calculated degree
- String degree = ttx.get().row(row).fam("attr").qual("degree").toString();
+ String degree = tx.gets(row, DEGREE);
// calculate new degree
int count = 0;
RowIterator riter =
- ttx.get(new ScannerConfiguration().setSpan(Span.exact(row, new Column("link"))));
+ tx.get(new ScannerConfiguration().setSpan(Span.exact(row, new Column("link"))));
while (riter.hasNext()) {
ColumnIterator citer = riter.next().getValue();
while (citer.hasNext()) {
@@ -86,15 +84,15 @@ public class WorkerIT extends ITBaseMini {
String degree2 = "" + count;
if (degree == null || !degree.equals(degree2)) {
- ttx.mutate().row(row).fam("attr").qual("degree").set(degree2);
+ tx.set(row, DEGREE, degree2);
// put new entry in degree index
- ttx.mutate().row("IDEG" + degree2).fam("node").qual(row).set("");
+ tx.set("IDEG" + degree2, new Column("node", row), "");
}
if (degree != null) {
// delete old degree in index
- ttx.mutate().row("IDEG" + degree).fam("node").qual(row).delete();
+ tx.delete("IDEG" + degree, new Column("node", row));
}
}
@@ -119,12 +117,12 @@ public class WorkerIT extends ITBaseMini {
// verify observer updated degree index
TestTransaction tx3 = new TestTransaction(env);
- Assert.assertEquals(2, tx3.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertEquals("", tx3.get().row("IDEG2").fam("node").qual("N0003").toString());
+ Assert.assertEquals("2", tx3.gets("N0003", DEGREE));
+ Assert.assertEquals("", tx3.gets("IDEG2", new Column("node", "N0003")));
// add a link between two nodes in a graph
- tx3.mutate().row("N0003").fam("link").qual("N0010").set("");
- tx3.mutate().row("N0003").fam("attr").qual("lastupdate").set(System.currentTimeMillis());
+ tx3.set("N0003", new Column("link", "N0010"), "");
+ tx3.set("N0003", LAST_UPDATE, System.currentTimeMillis() + "");
tx3.done();
miniFluo.waitForObservers();
@@ -132,28 +130,28 @@ public class WorkerIT extends ITBaseMini {
// verify observer updated degree index. Should have deleted old index entry
// and added a new one
TestTransaction tx4 = new TestTransaction(env);
- Assert.assertEquals(3, tx4.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertNull("", tx4.get().row("IDEG2").fam("node").qual("N0003").toString());
- Assert.assertEquals("", tx4.get().row("IDEG3").fam("node").qual("N0003").toString());
+ Assert.assertEquals("3", tx4.gets("N0003", DEGREE));
+ Assert.assertNull("", tx4.gets("IDEG2", new Column("node", "N0003")));
+ Assert.assertEquals("", tx4.gets("IDEG3", new Column("node", "N0003")));
// test rollback
TestTransaction tx5 = new TestTransaction(env);
- tx5.mutate().row("N0003").fam("link").qual("N0030").set("");
- tx5.mutate().row("N0003").fam("attr").qual("lastupdate").set(System.currentTimeMillis());
+ tx5.set("N0003", new Column("link", "N0030"), "");
+ tx5.set("N0003", LAST_UPDATE, System.currentTimeMillis() + "");
tx5.done();
TestTransaction tx6 = new TestTransaction(env);
- tx6.mutate().row("N0003").fam("link").qual("N0050").set("");
- tx6.mutate().row("N0003").fam("attr").qual("lastupdate").set(System.currentTimeMillis());
+ tx6.set("N0003", new Column("link", "N0050"), "");
+ tx6.set("N0003", LAST_UPDATE, System.currentTimeMillis() + "");
CommitData cd = tx6.createCommitData();
- tx6.preCommit(cd, new RowColumn("N0003", new Column("attr", "lastupdate")));
+ tx6.preCommit(cd, new RowColumn("N0003", LAST_UPDATE));
miniFluo.waitForObservers();
TestTransaction tx7 = new TestTransaction(env);
- Assert.assertEquals(4, tx7.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertNull("", tx7.get().row("IDEG3").fam("node").qual("N0003").toString());
- Assert.assertEquals("", tx7.get().row("IDEG4").fam("node").qual("N0003").toString());
+ Assert.assertEquals("4", tx7.gets("N0003", DEGREE));
+ Assert.assertNull("", tx7.gets("IDEG3", new Column("node", "N0003")));
+ Assert.assertEquals("", tx7.gets("IDEG4", new Column("node", "N0003")));
env.close();
}
@@ -163,11 +161,10 @@ public class WorkerIT extends ITBaseMini {
*/
@Test
public void testDiffObserverConfig() throws Exception {
- Column old = observedColumn;
- observedColumn = typeLayer.bc().fam("attr2").qual("lastupdate").vis();
+ observedColumn = new Column("attr2", "lastupdate");
try {
try (Environment env = new Environment(config); Observers observers = new Observers(env)) {
- observers.getObserver(typeLayer.bc().fam("attr").qual("lastupdate").vis());
+ observers.getObserver(LAST_UPDATE);
}
Assert.fail();
@@ -176,14 +173,14 @@ public class WorkerIT extends ITBaseMini {
Assert.assertTrue(ise.getMessage().contains(
"Mismatch between configured column and class column"));
} finally {
- observedColumn = old;
+ observedColumn = LAST_UPDATE;
}
}
private void addLink(String from, String to) {
- try (TypedTransaction tx = typeLayer.wrap(client.newTransaction())) {
- tx.mutate().row(from).fam("link").qual(to).set("");
- tx.mutate().row(from).fam("attr").qual("lastupdate").set(System.currentTimeMillis());
+ try (Transaction tx = client.newTransaction()) {
+ tx.set(from, new Column("link", to), "");
+ tx.set(from, LAST_UPDATE, System.currentTimeMillis() + "");
tx.commit();
}
}
@@ -207,9 +204,9 @@ public class WorkerIT extends ITBaseMini {
miniFluo.waitForObservers();
- try (TypedSnapshot snap = typeLayer.wrap(client.newSnapshot())) {
- Assert.assertEquals(10, snap.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertEquals("", snap.get().row("IDEG10").fam("node").qual("N0003").toString());
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertEquals("10", snap.gets("N0003", DEGREE));
+ Assert.assertEquals("", snap.gets("IDEG10", new Column("node", "N0003")));
}
nf2.stop();
@@ -220,10 +217,10 @@ public class WorkerIT extends ITBaseMini {
miniFluo.waitForObservers();
- try (TypedSnapshot snap = typeLayer.wrap(client.newSnapshot())) {
- Assert.assertEquals(19, snap.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertEquals("", snap.get().row("IDEG19").fam("node").qual("N0003").toString());
- Assert.assertNull(snap.get().row("IDEG10").fam("node").qual("N0003").toString());
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertEquals("19", snap.gets("N0003", DEGREE));
+ Assert.assertEquals("", snap.gets("IDEG19", new Column("node", "N0003")));
+ Assert.assertNull(snap.gets("IDEG10", new Column("node", "N0003")));
}
nf1.stop();
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 755fcfe..b26f0ee 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -20,20 +20,18 @@ import java.util.Arrays;
import java.util.List;
import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedLoader;
-import org.apache.fluo.api.types.TypedObserver;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.integration.ITBaseMini;
+import org.apache.fluo.integration.TestUtil;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
@@ -43,16 +41,16 @@ import org.junit.Test;
public class LogIT extends ITBaseMini {
- private static TypeLayer tl = new TypeLayer(new StringEncoder());
+ private static final Column STAT_COUNT = new Column("stat", "count");
- static class SimpleLoader extends TypedLoader {
- @Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
- tx.mutate().row("r1").fam("a").qual("b").increment(1);
+ static class SimpleLoader implements Loader {
+
+ public void load(TransactionBase tx, Context context) throws Exception {
+ TestUtil.increment(tx, "r1", new Column("a", "b"), 1);
}
}
- static class TriggerLoader extends TypedLoader {
+ static class TriggerLoader implements Loader {
int r;
@@ -61,9 +59,9 @@ public class LogIT extends ITBaseMini {
}
@Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
- tx.mutate().row(r).fam("stat").qual("count").set(1);
- tx.mutate().row(r).fam("stat").qual("count").weaklyNotify();
+ public void load(TransactionBase tx, Context context) throws Exception {
+ tx.set(r + "", STAT_COUNT, "1");
+ tx.setWeakNotification(r + "", STAT_COUNT);
}
}
@@ -75,10 +73,10 @@ public class LogIT extends ITBaseMini {
private static Column bCol2 = new Column(Bytes.of(new byte[] {'c', 0x09, '2'}),
Bytes.of(new byte[] {'c', (byte) 0xe5, '2'}));
- static class BinaryLoader1 extends TypedLoader {
+ static class BinaryLoader1 implements Loader {
@Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
+ public void load(TransactionBase tx, Context context) throws Exception {
tx.delete(bRow1, bCol1);
tx.get(bRow2, bCol1);
@@ -92,7 +90,7 @@ public class LogIT extends ITBaseMini {
}
}
- public static class BinaryObserver extends TypedObserver {
+ public static class BinaryObserver extends AbstractObserver {
@Override
public ObservedColumn getObservedColumn() {
@@ -100,23 +98,23 @@ public class LogIT extends ITBaseMini {
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
+ public void process(TransactionBase tx, Bytes row, Column col) {
tx.get(bRow1, bCol2);
tx.get(bRow2, ImmutableSet.of(bCol1, bCol2));
tx.get(ImmutableSet.of(bRow1, bRow2), ImmutableSet.of(bCol1, bCol2));
}
}
- public static class TestObserver extends TypedObserver {
+ public static class TestObserver extends AbstractObserver {
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(tl.bc().fam("stat").qual("count").vis(), NotificationType.WEAK);
+ return new ObservedColumn(STAT_COUNT, NotificationType.WEAK);
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
- tx.mutate().row("all").col(col).increment(tx.get().row(row).col(col).toInteger());
+ public void process(TransactionBase tx, Bytes row, Column col) {
+ TestUtil.increment(tx, "all", col, Integer.parseInt(tx.gets(row.toString(), col)));
}
}
@@ -245,9 +243,9 @@ public class LogIT extends ITBaseMini {
}
miniFluo.waitForObservers();
- try (TypedSnapshot snap = tl.wrap(client.newSnapshot())) {
- Assert.assertTrue(snap.get().row("all").fam("stat").qual("count").toInteger(-1) >= 1);
- Assert.assertEquals(1, snap.get().row("r1").fam("a").qual("b").toInteger(-1));
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertTrue(Integer.parseInt(snap.gets("all", STAT_COUNT)) >= 1);
+ Assert.assertEquals("1", snap.gets("r1", new Column("a", "b")));
}
} finally {
logger.removeAppender(appender);
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
index f67f453..d96dc1a 100644
--- a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
+++ b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
@@ -24,8 +24,6 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.mapreduce.FluoKeyValue;
@@ -45,8 +43,6 @@ import org.junit.rules.TemporaryFolder;
public class FluoFileOutputFormatIT extends ITBaseImpl {
- static final TypeLayer typeLayer = new TypeLayer(new StringEncoder());
-
public static class TestMapper extends Mapper<LongWritable, Text, Key, Value> {
private FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
@@ -106,25 +102,25 @@ public class FluoFileOutputFormatIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
TestTransaction tx2 = new TestTransaction(env);
- Assert.assertEquals(1, tx1.get().row("a").fam("b").qual("c").toInteger(0));
- Assert.assertEquals(2, tx1.get().row("d").fam("b").qual("c").toInteger(0));
- Assert.assertEquals(90, tx1.get().row("foo").fam("moo").qual("moo").toInteger(0));
+ Assert.assertEquals("1", tx1.gets("a", new Column("b", "c")));
+ Assert.assertEquals("2", tx1.gets("d", new Column("b", "c")));
+ Assert.assertEquals("90", tx1.gets("foo", new Column("moo", "moo")));
- tx1.mutate().row("a").fam("b").qual("c").set("3");
- tx1.mutate().row("d").fam("b").qual("c").delete();
+ tx1.set("a", new Column("b", "c"), "3");
+ tx1.delete("d", new Column("b", "c"));
tx1.done();
// should not see changes from tx1
- Assert.assertEquals(1, tx2.get().row("a").fam("b").qual("c").toInteger(0));
- Assert.assertEquals(2, tx2.get().row("d").fam("b").qual("c").toInteger(0));
- Assert.assertEquals(90, tx2.get().row("foo").fam("moo").qual("moo").toInteger(0));
+ Assert.assertEquals("1", tx2.gets("a", new Column("b", "c")));
+ Assert.assertEquals("2", tx2.gets("d", new Column("b", "c")));
+ Assert.assertEquals("90", tx2.gets("foo", new Column("moo", "moo")));
TestTransaction tx3 = new TestTransaction(env);
// should see changes from tx1
- Assert.assertEquals(3, tx3.get().row("a").fam("b").qual("c").toInteger(0));
- Assert.assertNull(tx3.get().row("d").fam("b").qual("c").toInteger());
- Assert.assertEquals(90, tx3.get().row("foo").fam("moo").qual("moo").toInteger(0));
+ Assert.assertEquals("3", tx3.gets("a", new Column("b", "c")));
+ Assert.assertNull(tx3.gets("d", new Column("b", "c")));
+ Assert.assertEquals("90", tx3.gets("foo", new Column("moo", "moo")));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
----------------------------------------------------------------------
diff --git a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
index c515d7b..7f06bf7 100644
--- a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
+++ b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
@@ -18,8 +18,7 @@ package org.apache.fluo.mapreduce.it;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
+import org.apache.fluo.api.data.Column;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.mapreduce.FluoMutationGenerator;
@@ -28,8 +27,6 @@ import org.junit.Test;
public class MutationBuilderIT extends ITBaseImpl {
- static final TypeLayer tl = new TypeLayer(new StringEncoder());
-
@Test
public void testBatchWrite() throws Exception {
// test initializing a Fluo table by batch writing to it
@@ -39,15 +36,15 @@ public class MutationBuilderIT extends ITBaseImpl {
try {
FluoMutationGenerator mb1 = new FluoMutationGenerator(Bytes.of("row1"));
- mb1.put(tl.bc().fam("cf1").qual("cq1").vis(), Bytes.of("v1"));
- mb1.put(tl.bc().fam("cf1").qual("cq2").vis(), Bytes.of("v2"));
- mb1.put(tl.bc().fam("cf1").qual("cq3").vis(), Bytes.of("v3"));
+ mb1.put(new Column("cf1", "cq1"), Bytes.of("v1"));
+ mb1.put(new Column("cf1", "cq2"), Bytes.of("v2"));
+ mb1.put(new Column("cf1", "cq3"), Bytes.of("v3"));
bw.addMutation(mb1.build());
FluoMutationGenerator mb2 = new FluoMutationGenerator(Bytes.of("row2"));
- mb2.put(tl.bc().fam("cf1").qual("cq1").vis(), Bytes.of("v4"));
- mb2.put(tl.bc().fam("cf1").qual("cq2").vis(), Bytes.of("v5"));
+ mb2.put(new Column("cf1", "cq1"), Bytes.of("v4"));
+ mb2.put(new Column("cf1", "cq2"), Bytes.of("v5"));
bw.addMutation(mb2.build());
@@ -58,32 +55,32 @@ public class MutationBuilderIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
TestTransaction tx2 = new TestTransaction(env);
- Assert.assertEquals("v1", tx1.get().row("row1").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v2", tx1.get().row("row1").fam("cf1").qual("cq2").toString());
- Assert.assertEquals("v3", tx1.get().row("row1").fam("cf1").qual("cq3").toString());
- Assert.assertEquals("v4", tx1.get().row("row2").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v5", tx1.get().row("row2").fam("cf1").qual("cq2").toString());
+ Assert.assertEquals("v1", tx1.gets("row1", new Column("cf1", "cq1")));
+ Assert.assertEquals("v2", tx1.gets("row1", new Column("cf1", "cq2")));
+ Assert.assertEquals("v3", tx1.gets("row1", new Column("cf1", "cq3")));
+ Assert.assertEquals("v4", tx1.gets("row2", new Column("cf1", "cq1")));
+ Assert.assertEquals("v5", tx1.gets("row2", new Column("cf1", "cq2")));
- tx1.mutate().row("row1").fam("cf1").qual("cq2").set("v6");
- tx1.mutate().row("row1").fam("cf1").qual("cq3").delete();
- tx1.mutate().row("row2").fam("cf1").qual("cq2").set("v7");
+ tx1.set("row1", new Column("cf1", "cq2"), "v6");
+ tx1.delete("row1", new Column("cf1", "cq3"));
+ tx1.set("row2", new Column("cf1", "cq2"), "v7");
tx1.done();
// tx2 should not see changes from tx1
- Assert.assertEquals("v1", tx2.get().row("row1").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v2", tx2.get().row("row1").fam("cf1").qual("cq2").toString());
- Assert.assertEquals("v3", tx2.get().row("row1").fam("cf1").qual("cq3").toString());
- Assert.assertEquals("v4", tx2.get().row("row2").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v5", tx2.get().row("row2").fam("cf1").qual("cq2").toString());
+ Assert.assertEquals("v1", tx2.gets("row1", new Column("cf1", "cq1")));
+ Assert.assertEquals("v2", tx2.gets("row1", new Column("cf1", "cq2")));
+ Assert.assertEquals("v3", tx2.gets("row1", new Column("cf1", "cq3")));
+ Assert.assertEquals("v4", tx2.gets("row2", new Column("cf1", "cq1")));
+ Assert.assertEquals("v5", tx2.gets("row2", new Column("cf1", "cq2")));
TestTransaction tx3 = new TestTransaction(env);
// should see changes from tx1
- Assert.assertEquals("v1", tx3.get().row("row1").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v6", tx3.get().row("row1").fam("cf1").qual("cq2").toString());
- Assert.assertNull(tx3.get().row("row1").fam("cf1").qual("cq3").toString());
- Assert.assertEquals("v4", tx3.get().row("row2").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v7", tx3.get().row("row2").fam("cf1").qual("cq2").toString());
+ Assert.assertEquals("v1", tx3.gets("row1", new Column("cf1", "cq1")));
+ Assert.assertEquals("v6", tx3.gets("row1", new Column("cf1", "cq2")));
+ Assert.assertNull(tx3.gets("row1", new Column("cf1", "cq3")));
+ Assert.assertEquals("v4", tx3.gets("row2", new Column("cf1", "cq1")));
+ Assert.assertEquals("v7", tx3.gets("row2", new Column("cf1", "cq2")));
}
}