You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:50 UTC
[02/10] incubator-fluo-recipes git commit: Updated package names in
core module
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshot.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshot.java
new file mode 100644
index 0000000..64fa7c2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshot.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.Snapshot;
+
+public class MockSnapshot extends MockSnapshotBase implements Snapshot {
+
+ MockSnapshot(String... entries) {
+ super(entries);
+ }
+
+ @Override
+ public void close() {
+ // no resources need to be closed
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
new file mode 100644
index 0000000..d31b36c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.core.impl.TxStringUtil;
+
+public class MockSnapshotBase implements SnapshotBase {
+
+ final Map<Bytes, Map<Column, Bytes>> getData;
+
+ /**
+ * Initializes {@link #getData} using {@link #toRCVM(String...)}
+ */
+ MockSnapshotBase(String... entries) {
+ getData = toRCVM(entries);
+ }
+
+ @Override
+ public Bytes get(Bytes row, Column column) {
+ Map<Column, Bytes> cols = getData.get(row);
+ if (cols != null) {
+ return cols.get(column);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+ Map<Column, Bytes> ret = new HashMap<>();
+ Map<Column, Bytes> cols = getData.get(row);
+ if (cols != null) {
+ for (Column column : columns) {
+ Bytes val = cols.get(column);
+ if (val != null) {
+ ret.put(column, val);
+ }
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+
+ Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
+
+ for (Bytes row : rows) {
+ Map<Column, Bytes> colMap = get(row, columns);
+ if (colMap != null && colMap.size() > 0) {
+ ret.put(row, colMap);
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
+ public RowIterator get(ScannerConfiguration config) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * toRCVM stands for "To Row Column Value Map". This is a convenience function that takes strings
+ * of the format {@code <row>,<col fam>:<col qual>[:col vis],
+ * <value>} and generates a row, column, value map.
+ */
+ public static Map<Bytes, Map<Column, Bytes>> toRCVM(String... entries) {
+ Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
+
+ for (String entry : entries) {
+ String[] rcv = entry.split(",");
+ if (rcv.length != 3 && !(rcv.length == 2 && entry.trim().endsWith(","))) {
+ throw new IllegalArgumentException(
+ "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+ }
+
+ Bytes row = Bytes.of(rcv[0]);
+ String[] colFields = rcv[1].split(":");
+
+ Column col;
+ if (colFields.length == 3) {
+ col = new Column(colFields[0], colFields[1], colFields[2]);
+ } else if (colFields.length == 2) {
+ col = new Column(colFields[0], colFields[1]);
+ } else {
+ throw new IllegalArgumentException(
+ "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+ }
+
+ Bytes val;
+ if (rcv.length == 2) {
+ val = Bytes.EMPTY;
+ } else {
+ val = Bytes.of(rcv[2]);
+ }
+
+ Map<Column, Bytes> cols = ret.get(row);
+ if (cols == null) {
+ cols = new HashMap<>();
+ ret.put(row, cols);
+ }
+
+ cols.put(col, val);
+ }
+ return ret;
+ }
+
+ /**
+ * toRCM stands for "To Row Column Map". This is a convenience function that takes strings of the
+ * format {@code <row>,<col fam>:<col qual>[:col vis]} and generates a row, column map.
+ */
+ public static Map<Bytes, Set<Column>> toRCM(String... entries) {
+ Map<Bytes, Set<Column>> ret = new HashMap<>();
+
+ for (String entry : entries) {
+ String[] rcv = entry.split(",");
+ if (rcv.length != 2) {
+ throw new IllegalArgumentException(
+ "expected <row>,<col fam>:<col qual>[:col vis] but saw : " + entry);
+ }
+
+ Bytes row = Bytes.of(rcv[0]);
+ String[] colFields = rcv[1].split(":");
+
+ Column col;
+ if (colFields.length == 3) {
+ col = new Column(colFields[0], colFields[1], colFields[2]);
+ } else if (colFields.length == 2) {
+ col = new Column(colFields[0], colFields[1]);
+ } else {
+ throw new IllegalArgumentException(
+ "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+ }
+
+ Set<Column> cols = ret.get(row);
+ if (cols == null) {
+ cols = new HashSet<>();
+ ret.put(row, cols);
+ }
+
+ cols.add(col);
+ }
+ return ret;
+ }
+
+ @Override
+ public long getStartTimestamp() {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public String gets(String row, Column column) {
+ return TxStringUtil.gets(this, row, column);
+ }
+
+ @Override
+ public Map<Column, String> gets(String row, Set<Column> columns) {
+ return TxStringUtil.gets(this, row, columns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+ return TxStringUtil.gets(this, rows, columns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+ return TxStringUtil.gets(this, rowColumns);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransaction.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransaction.java
new file mode 100644
index 0000000..750e0ee
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransaction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+public class MockTransaction extends MockTransactionBase implements Transaction {
+
+ MockTransaction(String... entries) {
+ super(entries);
+ }
+
+ @Override
+ public void commit() throws CommitException {
+ // does nothing
+ }
+
+ @Override
+ public void close() {
+ // no resources to close
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransactionBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransactionBase.java
new file mode 100644
index 0000000..05ab87e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockTransactionBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+
+/**
+ * A very simple implementation of {@link TransactionBase} used for testing. All reads are serviced
+ * from {@link #getData}. Updates are stored in {@link #setData}, {@link #deletes}, or
+ * {@link #weakNotifications} depending on the update type.
+ */
+public class MockTransactionBase extends MockSnapshotBase implements TransactionBase {
+
+ final Map<Bytes, Map<Column, Bytes>> setData = new HashMap<>();
+ final Map<Bytes, Set<Column>> deletes = new HashMap<>();
+ final Map<Bytes, Set<Column>> weakNotifications = new HashMap<>();
+
+ MockTransactionBase(String... entries) {
+ super(entries);
+ }
+
+ @Override
+ public void setWeakNotification(Bytes row, Column col) {
+ Set<Column> cols = weakNotifications.get(row);
+ if (cols == null) {
+ cols = new HashSet<>();
+ weakNotifications.put(row, cols);
+ }
+
+ cols.add(col);
+ }
+
+ @Override
+ public void set(Bytes row, Column col, Bytes value) {
+ Map<Column, Bytes> cols = setData.get(row);
+ if (cols == null) {
+ cols = new HashMap<>();
+ setData.put(row, cols);
+ }
+
+ cols.put(col, value);
+ }
+
+ @Override
+ public void delete(Bytes row, Column col) {
+ Set<Column> cols = deletes.get(row);
+ if (cols == null) {
+ cols = new HashSet<>();
+ deletes.put(row, cols);
+ }
+
+ cols.add(col);
+ }
+
+ @Override
+ public void setWeakNotification(String row, Column col) {
+ setWeakNotification(Bytes.of(row), col);
+ }
+
+ @Override
+ public void set(String row, Column col, String value) throws AlreadySetException {
+ set(Bytes.of(row), col, Bytes.of(value));
+ }
+
+ @Override
+ public void delete(String row, Column col) {
+ delete(Bytes.of(row), col);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java
new file mode 100644
index 0000000..6b38e12
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.types.TypedSnapshotBase.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TypeLayerTest {
+
+ @Test
+ public void testColumns() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt =
+ new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+ "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ Map<Column, Value> results =
+ ttx.get().row("r2")
+ .columns(ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7")));
+
+ Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
+ Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
+ Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
+ Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
+
+ Assert.assertEquals(1, results.size());
+
+ results =
+ ttx.get()
+ .row("r2")
+ .columns(
+ ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2",
+ "8")));
+
+ Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
+ Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
+ Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
+ Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
+ Assert.assertEquals(13, (int) results.get(new Column("cf2", "8")).toInteger());
+ Assert.assertEquals(13, results.get(new Column("cf2", "8")).toInteger(0));
+
+ Assert.assertEquals(2, results.size());
+
+ // test var args
+ Map<Column, Value> results2 =
+ ttx.get().row("r2")
+ .columns(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2", "8"));
+ Assert.assertEquals(results, results2);
+ }
+
+ @Test
+ public void testVis() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt = new MockTransactionBase("r1,cf1:cq1:A,v1", "r1,cf1:cq2:A&B,v2");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ Assert.assertNull(ttx.get().row("r1").fam("cf1").qual("cq1").toString());
+ Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A").toString());
+ Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A".getBytes())
+ .toString());
+ Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A"))
+ .toString());
+ Assert.assertEquals("v1",
+ ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A".getBytes())).toString());
+
+ Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString());
+ Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
+ .toString());
+ Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
+ .toString());
+ Assert.assertNull("v1",
+ ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
+ .toString());
+
+ Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString("v3"));
+ Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
+ .toString("v3"));
+ Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
+ .toString("v3"));
+ Assert.assertEquals(
+ "v3",
+ ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
+ .toString("v3"));
+
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").set(3);
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).set(4);
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).set(5);
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).set(7);
+
+ Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1:A&B,3", "r1,cf1:cq1:A&C,4",
+ "r1,cf1:cq1:A&D,5", "r1,cf1:cq1:A&F,7"), tt.setData);
+ tt.setData.clear();
+
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").delete();
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).delete();
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).delete();
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).delete();
+
+ Assert.assertEquals(MockTransactionBase.toRCM("r1,cf1:cq1:A&B", "r1,cf1:cq1:A&C",
+ "r1,cf1:cq1:A&D", "r1,cf1:cq1:A&F"), tt.deletes);
+ tt.deletes.clear();
+ Assert.assertEquals(0, tt.setData.size());
+ Assert.assertEquals(0, tt.weakNotifications.size());
+
+ }
+
+ @Test
+ public void testBuildColumn() {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0".getBytes()).qual("q0".getBytes())
+ .vis());
+ Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0").qual("q0").vis());
+ Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5).qual(7).vis());
+ Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5l).qual(7l).vis());
+ Assert.assertEquals(new Column("5", "7"), tl.bc().fam(Bytes.of("5")).qual(Bytes.of("7")).vis());
+ Assert.assertEquals(new Column("5", "7"),
+ tl.bc().fam(ByteBuffer.wrap("5".getBytes())).qual(ByteBuffer.wrap("7".getBytes())).vis());
+
+ Assert.assertEquals(new Column("f0", "q0", "A&B"),
+ tl.bc().fam("f0".getBytes()).qual("q0".getBytes()).vis("A&B"));
+ Assert.assertEquals(new Column("f0", "q0", "A&C"),
+ tl.bc().fam("f0").qual("q0").vis("A&C".getBytes()));
+ Assert.assertEquals(new Column("5", "7", "A&D"), tl.bc().fam(5).qual(7).vis(Bytes.of("A&D")));
+ Assert.assertEquals(new Column("5", "7", "A&D"),
+ tl.bc().fam(5).qual(7).vis(ByteBuffer.wrap("A&D".getBytes())));
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockSnapshot ms =
+ new MockSnapshot("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+ "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20",
+ "r3,cf3:cq3,28.195", "r4,cf4:cq4,true");
+
+ TypedSnapshot tts = tl.wrap(ms);
+
+ Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString());
+ Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString("b"));
+ Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString());
+ Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString("b"));
+ Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString());
+ Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString("b"));
+ Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString());
+ Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString("b"));
+
+ // try converting to different types
+ Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString());
+ Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString("b"));
+ Assert.assertEquals((Integer) 13, tts.get().row("r2").fam("cf2").qual(8).toInteger());
+ Assert.assertEquals(13, tts.get().row("r2").fam("cf2").qual(8).toInteger(14));
+ Assert.assertEquals((Long) 13l, tts.get().row("r2").fam("cf2").qual(8).toLong());
+ Assert.assertEquals(13l, tts.get().row("r2").fam("cf2").qual(8).toLong(14l));
+ Assert.assertEquals("13", new String(tts.get().row("r2").fam("cf2").qual(8).toBytes()));
+ Assert.assertEquals("13",
+ new String(tts.get().row("r2").fam("cf2").qual(8).toBytes("14".getBytes())));
+ Assert
+ .assertEquals("13", new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes()));
+ Assert.assertEquals("13",
+ new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes("14".getBytes())));
+ Assert.assertEquals("13",
+ Bytes.of(tts.get().row("r2").col(new Column("cf2", "8")).toByteBuffer()).toString());
+ Assert.assertEquals(
+ "13",
+ Bytes.of(
+ tts.get().row("r2").col(new Column("cf2", "8"))
+ .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
+
+ // test non-existent
+ Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toInteger());
+ Assert.assertEquals(14, tts.get().row("r2").fam("cf3").qual(8).toInteger(14));
+ Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toLong());
+ Assert.assertEquals(14l, tts.get().row("r2").fam("cf3").qual(8).toLong(14l));
+ Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toString());
+ Assert.assertEquals("14", tts.get().row("r2").fam("cf3").qual(8).toString("14"));
+ Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toBytes());
+ Assert.assertEquals("14",
+ new String(tts.get().row("r2").fam("cf3").qual(8).toBytes("14".getBytes())));
+ Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toBytes());
+ Assert.assertEquals("14",
+ new String(tts.get().row("r2").col(new Column("cf3", "8")).toBytes("14".getBytes())));
+ Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toByteBuffer());
+ Assert.assertEquals(
+ "14",
+ Bytes.of(
+ tts.get().row("r2").col(new Column("cf3", "8"))
+ .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
+
+ // test float & double
+ Assert.assertEquals((Float) 28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat());
+ Assert.assertEquals(28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat(39.383f), 0.0);
+ Assert.assertEquals((Double) 28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble());
+ Assert.assertEquals(28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble(39.383d), 0.0);
+
+ // test boolean
+ Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
+ Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
+ Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
+ Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
+
+ // try different types for row
+ Assert.assertEquals("20", tts.get().row(13).fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row(13l).fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13".getBytes()).fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row(ByteBuffer.wrap("13".getBytes())).fam("9").qual("17")
+ .toString());
+
+ // try different types for cf
+ Assert.assertEquals("20", tts.get().row("13").fam(9).qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam(9l).qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9".getBytes()).qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam(ByteBuffer.wrap("9".getBytes())).qual("17")
+ .toString());
+
+ // try different types for cq
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17l).toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17).toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17".getBytes()).toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual(ByteBuffer.wrap("17".getBytes()))
+ .toString());
+
+ ms.close();
+ tts.close();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt =
+ new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+ "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ // test increments data
+ ttx.mutate().row("13").fam("9").qual("17").increment(1);
+ ttx.mutate().row("13").fam("9").qual(18).increment(2);
+ ttx.mutate().row("13").fam("9").qual(19l).increment(3);
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4);
+ ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5); // increment non existent
+ ttx.mutate().row("13").col(new Column("9", "22")).increment(6); // increment non existent
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7); // increment
+ // non
+ // existent
+
+ Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
+ "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
+ tt.setData.clear();
+
+ // test increments long
+ ttx.mutate().row("13").fam("9").qual("17").increment(1l);
+ ttx.mutate().row("13").fam("9").qual(18).increment(2l);
+ ttx.mutate().row("13").fam("9").qual(19l).increment(3l);
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4l);
+ ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5l); // increment non existent
+ ttx.mutate().row("13").col(new Column("9", "22")).increment(6l); // increment non existent
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7l); // increment
+ // non
+ // existent
+
+ Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
+ "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
+ tt.setData.clear();
+
+ // test setting data
+ ttx.mutate().row("13").fam("9").qual("16").set();
+ ttx.mutate().row("13").fam("9").qual("17").set(3);
+ ttx.mutate().row("13").fam("9").qual(18).set(4l);
+ ttx.mutate().row("13").fam("9").qual(19l).set("5");
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).set("6".getBytes());
+ ttx.mutate().row("13").col(new Column("9", "21")).set("7".getBytes());
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes()))
+ .set(ByteBuffer.wrap("8".getBytes()));
+ ttx.mutate().row("13").fam("9").qual("23").set(2.54f);
+ ttx.mutate().row("13").fam("9").qual("24").set(-6.135d);
+ ttx.mutate().row("13").fam("9").qual("25").set(false);
+
+ Assert.assertEquals(MockTransactionBase.toRCVM("13,9:16,", "13,9:17,3", "13,9:18,4",
+ "13,9:19,5", "13,9:20,6", "13,9:21,7", "13,9:22,8", "13,9:23,2.54", "13,9:24,-6.135",
+ "13,9:25,false"), tt.setData);
+ tt.setData.clear();
+
+ // test deleting data
+ ttx.mutate().row("13").fam("9").qual("17").delete();
+ ttx.mutate().row("13").fam("9").qual(18).delete();
+ ttx.mutate().row("13").fam("9").qual(19l).delete();
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).delete();
+ ttx.mutate().row("13").col(new Column("9", "21")).delete();
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).delete();
+
+ Assert
+ .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
+ "13,9:21", "13,9:22"), tt.deletes);
+ tt.deletes.clear();
+ Assert.assertEquals(0, tt.setData.size());
+ Assert.assertEquals(0, tt.weakNotifications.size());
+
+ // test weak notifications
+ ttx.mutate().row("13").fam("9").qual("17").weaklyNotify();
+ ttx.mutate().row("13").fam("9").qual(18).weaklyNotify();
+ ttx.mutate().row("13").fam("9").qual(19l).weaklyNotify();
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).weaklyNotify();
+ ttx.mutate().row("13").col(new Column("9", "21")).weaklyNotify();
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).weaklyNotify();
+
+ Assert
+ .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
+ "13,9:21", "13,9:22"), tt.weakNotifications);
+ tt.weakNotifications.clear();
+ Assert.assertEquals(0, tt.setData.size());
+ Assert.assertEquals(0, tt.deletes.size());
+ }
+
+ @Test
+ public void testMultiRow() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt =
+ new MockTransactionBase("11,cf1:cq1,1", "11,cf1:cq2,2", "12,cf1:cq1,3", "12,cf1:cq2,4",
+ "13,cf1:cq1,5", "13,cf1:cq2,6");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ Bytes br1 = Bytes.of("11");
+ Bytes br2 = Bytes.of("12");
+ Bytes br3 = Bytes.of("13");
+
+ Column c1 = new Column("cf1", "cq1");
+ Column c2 = new Column("cf1", "cq2");
+
+ Map<Bytes, Map<Column, Value>> map1 =
+ ttx.get().rows(Arrays.asList(br1, br2)).columns(c1).toBytesMap();
+
+ Assert.assertEquals(map1, ttx.get().rows(br1, br2).columns(c1).toBytesMap());
+
+ Assert.assertEquals("1", map1.get(br1).get(c1).toString());
+ Assert.assertEquals("1", map1.get(br1).get(c1).toString("5"));
+ Assert.assertEquals((Long) (1l), map1.get(br1).get(c1).toLong());
+ Assert.assertEquals(1l, map1.get(br1).get(c1).toLong(5));
+ Assert.assertEquals((Integer) (1), map1.get(br1).get(c1).toInteger());
+ Assert.assertEquals(1, map1.get(br1).get(c1).toInteger(5));
+
+ Assert.assertEquals("5", map1.get(br3).get(c1).toString("5"));
+ Assert.assertNull(map1.get(br3).get(c1).toString());
+ Assert.assertEquals(5l, map1.get(br3).get(c1).toLong(5l));
+ Assert.assertNull(map1.get(br3).get(c1).toLong());
+ Assert.assertEquals(5, map1.get(br1).get(c2).toInteger(5));
+ Assert.assertNull(map1.get(br1).get(c2).toInteger());
+
+ Assert.assertEquals(2, map1.size());
+ Assert.assertEquals(1, map1.get(br1).size());
+ Assert.assertEquals(1, map1.get(br2).size());
+ Assert.assertEquals("3", map1.get(br2).get(c1).toString());
+
+ Map<String, Map<Column, Value>> map2 =
+ ttx.get().rowsString(Arrays.asList("11", "13")).columns(c1).toStringMap();
+
+ Assert.assertEquals(map2, ttx.get().rowsString("11", "13").columns(c1).toStringMap());
+
+ Assert.assertEquals(2, map2.size());
+ Assert.assertEquals(1, map2.get("11").size());
+ Assert.assertEquals(1, map2.get("13").size());
+ Assert.assertEquals((Long) (1l), map2.get("11").get(c1).toLong());
+ Assert.assertEquals(5l, map2.get("13").get(c1).toLong(6));
+
+ Map<Long, Map<Column, Value>> map3 =
+ ttx.get().rowsLong(Arrays.asList(11l, 13l)).columns(c1).toLongMap();
+
+ Assert.assertEquals(map3, ttx.get().rowsLong(11l, 13l).columns(c1).toLongMap());
+
+ Assert.assertEquals(2, map3.size());
+ Assert.assertEquals(1, map3.get(11l).size());
+ Assert.assertEquals(1, map3.get(13l).size());
+ Assert.assertEquals((Long) (1l), map3.get(11l).get(c1).toLong());
+ Assert.assertEquals(5l, map3.get(13l).get(c1).toLong(6));
+
+ Map<Integer, Map<Column, Value>> map4 =
+ ttx.get().rowsInteger(Arrays.asList(11, 13)).columns(c1).toIntegerMap();
+
+ Assert.assertEquals(map4, ttx.get().rowsInteger(11, 13).columns(c1).toIntegerMap());
+
+ Assert.assertEquals(2, map4.size());
+ Assert.assertEquals(1, map4.get(11).size());
+ Assert.assertEquals(1, map4.get(13).size());
+ Assert.assertEquals((Long) (1l), map4.get(11).get(c1).toLong());
+ Assert.assertEquals(5l, map4.get(13).get(c1).toLong(6));
+
+ Map<Integer, Map<Column, Value>> map5 =
+ ttx.get().rowsBytes(Arrays.asList("11".getBytes(), "13".getBytes())).columns(c1)
+ .toIntegerMap();
+
+ Assert.assertEquals(map5, ttx.get().rowsBytes("11".getBytes(), "13".getBytes()).columns(c1)
+ .toIntegerMap());
+
+ Assert.assertEquals(2, map5.size());
+ Assert.assertEquals(1, map5.get(11).size());
+ Assert.assertEquals(1, map5.get(13).size());
+ Assert.assertEquals((Long) (1l), map5.get(11).get(c1).toLong());
+ Assert.assertEquals(5l, map5.get(13).get(c1).toLong(6));
+
+ Map<Integer, Map<Column, Value>> map6 =
+ ttx.get()
+ .rowsByteBuffers(
+ Arrays.asList(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes())))
+ .columns(c1).toIntegerMap();
+
+ Assert.assertEquals(
+ map6,
+ ttx.get()
+ .rowsByteBuffers(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes()))
+ .columns(c1).toIntegerMap());
+
+ Assert.assertEquals(2, map6.size());
+ Assert.assertEquals(1, map6.get(11).size());
+ Assert.assertEquals(1, map6.get(13).size());
+ Assert.assertEquals((Long) (1l), map6.get(11).get(c1).toLong());
+ Assert.assertEquals(5l, map6.get(13).get(c1).toLong(6));
+
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt =
+ new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+ "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ Assert.assertEquals(Bytes.of("12"), ttx.get(Bytes.of("r2"), new Column("cf2", "7")));
+ Assert.assertNull(ttx.get(Bytes.of("r2"), new Column("cf2", "9")));
+
+ Map<Column, Bytes> map =
+ ttx.get(Bytes.of("r2"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
+ Assert.assertEquals(2, map.size());
+ Assert.assertEquals("12", map.get(new Column("cf2", "7")).toString());
+ Assert.assertEquals("13", map.get(new Column("cf2", "8")).toString());
+
+ map = ttx.get(Bytes.of("r6"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
+ Assert.assertEquals(0, map.size());
+
+ ttx.set(Bytes.of("r6"), new Column("cf2", "7"), Bytes.of("3"));
+ Assert.assertEquals(MockTransactionBase.toRCVM("r6,cf2:7,3"), tt.setData);
+ tt.setData.clear();
+
+ Map<Bytes, Map<Column, Bytes>> map2 =
+ ttx.get(ImmutableSet.of(Bytes.of("r1"), Bytes.of("r2")),
+ ImmutableSet.of(new Column("cf1", "cq1"), new Column("cf2", "8")));
+ Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1,v1", "r2,cf2:8,13"), map2);
+
+ ttx.delete(Bytes.of("r6"), new Column("cf2", "7"));
+ Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:7"), tt.deletes);
+ tt.deletes.clear();
+
+ ttx.setWeakNotification(Bytes.of("r6"), new Column("cf2", "8"));
+ Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:8"), tt.weakNotifications);
+ tt.weakNotifications.clear();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/data/RowHasherTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/data/RowHasherTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/data/RowHasherTest.java
deleted file mode 100644
index 3d52c51..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/data/RowHasherTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.data;
-
-import org.apache.fluo.api.data.Bytes;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class RowHasherTest {
-
- @Test
- public void testBadPrefixes() {
- String[] badPrefixes =
- {"q:she6:test1", "q:she6:test1", "p:Mhe6:test1", "p;she6:test1", "p:she6;test1",
- "p;she6;test1", "p:+he6:test1", "p:s?e6:test1", "p:sh{6:test1", "p:sh6:"};
-
- RowHasher rh = new RowHasher("p");
- for (String badPrefix : badPrefixes) {
- try {
- rh.removeHash(Bytes.of(badPrefix));
- Assert.fail();
- } catch (IllegalArgumentException e) {
- }
- }
- }
-
- @Test
- public void testBasic() {
- RowHasher rh = new RowHasher("p");
- Assert.assertTrue(rh.removeHash(rh.addHash("abc")).toString().equals("abc"));
- rh = new RowHasher("p2");
- Assert.assertTrue(rh.removeHash(rh.addHash("abc")).toString().equals("abc"));
-
- Assert.assertTrue(rh.addHash("abc").toString().startsWith("p2:"));
-
- // test to ensure hash is stable over time
- Assert.assertEquals("p2:she6:test1", rh.addHash("test1").toString());
- Assert.assertEquals("p2:hgt0:0123456789abcdefghijklmnopqrstuvwxyz",
- rh.addHash("0123456789abcdefghijklmnopqrstuvwxyz").toString());
- Assert.assertEquals("p2:fluo:86ce3b094982c6a", rh.addHash("86ce3b094982c6a").toString());
- }
-
- @Test
- public void testBalancerRegex() {
- RowHasher rh = new RowHasher("p");
- String regex = rh.getTableOptimizations(3).getTabletGroupingRegex();
- Assert.assertEquals("(\\Qp:\\E).*", regex);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentLoader.java
deleted file mode 100644
index 8fe2b19..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentLoader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.fluo.recipes.types.TypedLoader;
-import org.apache.fluo.recipes.types.TypedTransactionBase;
-
-public class DocumentLoader extends TypedLoader {
-
- String docid;
- String refs[];
-
- DocumentLoader(String docid, String... refs) {
- this.docid = docid;
- this.refs = refs;
- }
-
- @Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
- tx.mutate().row("d:" + docid).fam("content").qual("new").set(StringUtils.join(refs, " "));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentObserver.java
deleted file mode 100644
index a31461c..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/DocumentObserver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.export.ExportTestBase.RefExporter;
-import org.apache.fluo.recipes.types.TypedObserver;
-import org.apache.fluo.recipes.types.TypedTransactionBase;
-
-public class DocumentObserver extends TypedObserver {
-
- ExportQueue<String, RefUpdates> refExportQueue;
-
- @Override
- public void init(Context context) throws Exception {
- refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, context.getAppConfiguration());
- }
-
- @Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG);
- }
-
- @Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
- String newContent = tx.get().row(row).col(col).toString();
- Set<String> newRefs = new HashSet<>(Arrays.asList(newContent.split(" ")));
- Set<String> currentRefs =
- new HashSet<>(Arrays.asList(tx.get().row(row).fam("content").qual("current").toString("")
- .split(" ")));
-
- Set<String> addedRefs = new HashSet<>(newRefs);
- addedRefs.removeAll(currentRefs);
-
- Set<String> deletedRefs = new HashSet<>(currentRefs);
- deletedRefs.removeAll(newRefs);
-
- String key = row.toString().substring(2);
- RefUpdates val = new RefUpdates(addedRefs, deletedRefs);
-
- refExportQueue.add(tx, key, val);
-
- tx.mutate().row(row).fam("content").qual("current").set(newContent);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportBufferIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportBufferIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportBufferIT.java
deleted file mode 100644
index 53d838d..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportBufferIT.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ExportBufferIT extends ExportTestBase {
-
- @Override
- protected int getNumBuckets() {
- return 2;
- }
-
- @Override
- protected Integer getBufferSize() {
- return 1024;
- }
-
- @Test
- public void testSmallExportBuffer() {
- // try setting the export buffer size small. Make sure everything is exported.
-
- try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
- ExportQueue<String, RefUpdates> refExportQueue =
- ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
- try (Transaction tx = fc.newTransaction()) {
- for (int i = 0; i < 1000; i++) {
- refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 10, i + 20), ns(new int[0])));
- }
-
- tx.commit();
- }
- }
-
- miniFluo.waitForObservers();
-
- Map<String, Set<String>> erefs = getExportedReferees();
- Map<String, Set<String>> expected = new HashMap<>();
-
- for (int i = 0; i < 1000; i++) {
- expected.computeIfAbsent(nk(i + 10), s -> new HashSet<>()).add(nk(i));
- expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
- }
-
- assertEquals(expected, erefs);
- int prevNumExportCalls = getNumExportCalls();
- Assert.assertTrue(prevNumExportCalls > 10); // with small buffer there should be lots of exports
- // calls
-
- try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
- ExportQueue<String, RefUpdates> refExportQueue =
- ExportQueue.getInstance(RefExporter.QUEUE_ID, fc.getAppConfiguration());
- try (Transaction tx = fc.newTransaction()) {
- for (int i = 0; i < 1000; i++) {
- refExportQueue.add(tx, nk(i), new RefUpdates(ns(i + 12), ns(i + 10)));
- }
-
- tx.commit();
- }
- }
-
- miniFluo.waitForObservers();
-
- erefs = getExportedReferees();
- expected = new HashMap<>();
-
- for (int i = 0; i < 1000; i++) {
- expected.computeIfAbsent(nk(i + 12), s -> new HashSet<>()).add(nk(i));
- expected.computeIfAbsent(nk(i + 20), s -> new HashSet<>()).add(nk(i));
- }
-
- assertEquals(expected, erefs);
- prevNumExportCalls = getNumExportCalls() - prevNumExportCalls;
- Assert.assertTrue(prevNumExportCalls > 10);
- }
-
- public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual) {
- if (!expected.equals(actual)) {
- System.out.println("*** diff ***");
- diff(expected, actual);
- Assert.fail();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportQueueIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportQueueIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportQueueIT.java
deleted file mode 100644
index baa979a..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportQueueIT.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.LoaderExecutor;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ExportQueueIT extends ExportTestBase {
-
- @Test
- public void testExport() {
- try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
- try (LoaderExecutor loader = fc.newLoaderExecutor()) {
- loader.execute(new DocumentLoader("0999", "0005", "0002"));
- loader.execute(new DocumentLoader("0002", "0999", "0042"));
- loader.execute(new DocumentLoader("0005", "0999", "0042"));
- loader.execute(new DocumentLoader("0042", "0999"));
- }
-
- miniFluo.waitForObservers();
-
- Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
- Assert.assertEquals(ns("0999"), getExportedReferees("0002"));
- Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
- Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
-
- try (LoaderExecutor loader = fc.newLoaderExecutor()) {
- loader.execute(new DocumentLoader("0999", "0005", "0042"));
- }
-
- try (LoaderExecutor loader = fc.newLoaderExecutor()) {
- loader.execute(new DocumentLoader("0999", "0005"));
- }
-
- miniFluo.waitForObservers();
-
- Assert.assertEquals(ns("0002", "0005", "0042"), getExportedReferees("0999"));
- Assert.assertEquals(ns(new String[0]), getExportedReferees("0002"));
- Assert.assertEquals(ns("0999"), getExportedReferees("0005"));
- Assert.assertEquals(ns("0002", "0005"), getExportedReferees("0042"));
-
- try (LoaderExecutor loader = fc.newLoaderExecutor()) {
- loader.execute(new DocumentLoader("0042", "0999", "0002", "0005"));
- loader.execute(new DocumentLoader("0005", "0002"));
- }
-
- try (LoaderExecutor loader = fc.newLoaderExecutor()) {
- loader.execute(new DocumentLoader("0005", "0003"));
- }
-
- miniFluo.waitForObservers();
-
- Assert.assertEquals(ns("0002", "0042"), getExportedReferees("0999"));
- Assert.assertEquals(ns("0042"), getExportedReferees("0002"));
- Assert.assertEquals(ns("0005"), getExportedReferees("0003"));
- Assert.assertEquals(ns("0999", "0042"), getExportedReferees("0005"));
- Assert.assertEquals(ns("0002"), getExportedReferees("0042"));
-
- }
- }
-
- @Test
- public void exportStressTest() {
- FluoConfiguration config = new FluoConfiguration(miniFluo.getClientConfiguration());
- config.setLoaderQueueSize(100);
- config.setLoaderThreads(20);
-
- try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-
- loadRandom(fc, 1000, 500);
-
- miniFluo.waitForObservers();
-
- diff(getFluoReferees(fc), getExportedReferees());
-
- assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
- loadRandom(fc, 1000, 500);
-
- miniFluo.waitForObservers();
-
- assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
- loadRandom(fc, 1000, 10000);
-
- miniFluo.waitForObservers();
-
- assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
-
- loadRandom(fc, 1000, 10000);
-
- miniFluo.waitForObservers();
-
- assertEquals(getFluoReferees(fc), getExportedReferees(), fc);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportTestBase.java
deleted file mode 100644
index 2584df4..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/ExportTestBase.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-
-import com.google.common.collect.Iterators;
-import org.apache.commons.io.FileUtils;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.LoaderExecutor;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-public class ExportTestBase {
-
- private static Map<String, Map<String, RefInfo>> globalExports = new HashMap<>();
- private static int exportCalls = 0;
-
- protected static Set<String> getExportedReferees(String node) {
- synchronized (globalExports) {
- Set<String> ret = new HashSet<>();
-
- Map<String, RefInfo> referees = globalExports.get(node);
-
- if (referees == null) {
- return ret;
- }
-
- referees.forEach((k, v) -> {
- if (!v.deleted)
- ret.add(k);
- });
-
- return ret;
- }
- }
-
- protected static Map<String, Set<String>> getExportedReferees() {
- synchronized (globalExports) {
-
- Map<String, Set<String>> ret = new HashMap<>();
-
- for (String k : globalExports.keySet()) {
- Set<String> referees = getExportedReferees(k);
- if (referees.size() > 0) {
- ret.put(k, referees);
- }
- }
-
- return ret;
- }
- }
-
- protected static int getNumExportCalls() {
- synchronized (globalExports) {
- return exportCalls;
- }
- }
-
- public static class RefExporter extends Exporter<String, RefUpdates> {
-
- public static final String QUEUE_ID = "req";
-
- private void updateExports(String key, long seq, String addedRef, boolean deleted) {
- Map<String, RefInfo> referees = globalExports.computeIfAbsent(addedRef, k -> new HashMap<>());
- referees.compute(key, (k, v) -> (v == null || v.seq < seq) ? new RefInfo(seq, deleted) : v);
- }
-
- @Override
- protected void processExports(Iterator<SequencedExport<String, RefUpdates>> exportIterator) {
- ArrayList<SequencedExport<String, RefUpdates>> exportList = new ArrayList<>();
- Iterators.addAll(exportList, exportIterator);
-
- synchronized (globalExports) {
- exportCalls++;
-
- for (SequencedExport<String, RefUpdates> se : exportList) {
- for (String addedRef : se.getValue().getAddedRefs()) {
- updateExports(se.getKey(), se.getSequence(), addedRef, false);
- }
-
- for (String deletedRef : se.getValue().getDeletedRefs()) {
- updateExports(se.getKey(), se.getSequence(), deletedRef, true);
- }
- }
- }
- }
- }
-
- protected MiniFluo miniFluo;
-
- protected int getNumBuckets() {
- return 13;
- }
-
- protected Integer getBufferSize() {
- return null;
- }
-
- @Before
- public void setUpFluo() throws Exception {
- FileUtils.deleteQuietly(new File("target/mini"));
-
- FluoConfiguration props = new FluoConfiguration();
- props.setApplicationName("eqt");
- props.setWorkerThreads(20);
- props.setMiniDataDir("target/mini");
-
- ObserverConfiguration doc = new ObserverConfiguration(DocumentObserver.class.getName());
- props.addObserver(doc);
-
- SimpleSerializer.setSetserlializer(props, GsonSerializer.class);
-
- ExportQueue.Options exportQueueOpts =
- new ExportQueue.Options(RefExporter.QUEUE_ID, RefExporter.class, String.class,
- RefUpdates.class, getNumBuckets());
-
- if (getBufferSize() != null) {
- exportQueueOpts.setBufferSize(getBufferSize());
- }
-
- ExportQueue.configure(props, exportQueueOpts);
-
- miniFluo = FluoFactory.newMiniFluo(props);
-
- globalExports.clear();
- exportCalls = 0;
- }
-
- @After
- public void tearDownFluo() throws Exception {
- if (miniFluo != null) {
- miniFluo.close();
- }
- }
-
- protected static Set<String> ns(String... sa) {
- return new HashSet<>(Arrays.asList(sa));
- }
-
- protected static String nk(int i) {
- return String.format("%06d", i);
- }
-
- protected static Set<String> ns(int... ia) {
- HashSet<String> ret = new HashSet<>();
- for (int i : ia) {
- ret.add(nk(i));
- }
- return ret;
- }
-
- public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual,
- FluoClient fc) {
- if (!expected.equals(actual)) {
- System.out.println("*** diff ***");
- diff(expected, actual);
- System.out.println("*** fluo dump ***");
- dump(fc);
- System.out.println("*** map dump ***");
-
- Assert.fail();
- }
- }
-
- protected void loadRandom(FluoClient fc, int num, int maxDocId) {
- try (LoaderExecutor loader = fc.newLoaderExecutor()) {
- Random rand = new Random();
-
- for (int i = 0; i < num; i++) {
- String docid = String.format("%05d", rand.nextInt(maxDocId));
- String[] refs = new String[rand.nextInt(20) + 1];
- for (int j = 0; j < refs.length; j++) {
- refs[j] = String.format("%05d", rand.nextInt(maxDocId));
- }
-
- loader.execute(new DocumentLoader(docid, refs));
- }
- }
- }
-
- protected void diff(Map<String, Set<String>> fr, Map<String, Set<String>> er) {
- HashSet<String> allKeys = new HashSet<>(fr.keySet());
- allKeys.addAll(er.keySet());
-
- for (String k : allKeys) {
- Set<String> s1 = fr.getOrDefault(k, Collections.emptySet());
- Set<String> s2 = er.getOrDefault(k, Collections.emptySet());
-
- HashSet<String> sub1 = new HashSet<>(s1);
- sub1.removeAll(s2);
-
- HashSet<String> sub2 = new HashSet<>(s2);
- sub2.removeAll(s1);
-
- if (sub1.size() > 0 || sub2.size() > 0) {
- System.out.println(k + " " + sub1 + " " + sub2);
- }
-
- }
- }
-
- protected Map<String, Set<String>> getFluoReferees(FluoClient fc) {
- Map<String, Set<String>> fluoReferees = new HashMap<>();
-
- try (Snapshot snap = fc.newSnapshot()) {
- ScannerConfiguration scannerConfig = new ScannerConfiguration();
- scannerConfig.fetchColumn(Bytes.of("content"), Bytes.of("current"));
- scannerConfig.setSpan(Span.prefix("d:"));
- RowIterator scanner = snap.get(scannerConfig);
- while (scanner.hasNext()) {
- Entry<Bytes, ColumnIterator> row = scanner.next();
- ColumnIterator colIter = row.getValue();
-
- String docid = row.getKey().toString().substring(2);
-
- while (colIter.hasNext()) {
- Entry<Column, Bytes> entry = colIter.next();
-
- String[] refs = entry.getValue().toString().split(" ");
-
- for (String ref : refs) {
- if (ref.isEmpty())
- continue;
-
- fluoReferees.computeIfAbsent(ref, k -> new HashSet<>()).add(docid);
- }
- }
- }
- }
- return fluoReferees;
- }
-
- public static void dump(FluoClient fc) {
- try (Snapshot snap = fc.newSnapshot()) {
- RowIterator scanner = snap.get(new ScannerConfiguration());
- while (scanner.hasNext()) {
- Entry<Bytes, ColumnIterator> row = scanner.next();
- ColumnIterator colIter = row.getValue();
-
- while (colIter.hasNext()) {
- Entry<Column, Bytes> entry = colIter.next();
-
- System.out.println("row:[" + row.getKey() + "] col:[" + entry.getKey() + "] val:["
- + entry.getValue() + "]");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/GsonSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/GsonSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/GsonSerializer.java
deleted file mode 100644
index 61bb833..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/GsonSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.nio.charset.StandardCharsets;
-
-import com.google.gson.Gson;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-
-public class GsonSerializer implements SimpleSerializer {
-
- private Gson gson = new Gson();
-
- @Override
- public void init(SimpleConfiguration appConfig) {
-
- }
-
- @Override
- public <T> byte[] serialize(T obj) {
- return gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
- }
-
- @Override
- public <T> T deserialize(byte[] serObj, Class<T> clazz) {
- return gson.fromJson(new String(serObj, StandardCharsets.UTF_8), clazz);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/OptionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/OptionsTest.java
deleted file mode 100644
index 16e853b..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/OptionsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.recipes.export.ExportQueue.Options;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class OptionsTest {
- @Test
- public void testExportQueueOptions() {
- FluoConfiguration conf = new FluoConfiguration();
-
- ExportQueue.configure(conf, new Options("Q1", "ET", "KT", "VT", 100));
- ExportQueue.configure(conf, new Options("Q2", "ET2", "KT2", "VT2", 200).setBucketsPerTablet(20)
- .setBufferSize(1000000));
-
- Options opts1 = new Options("Q1", conf.getAppConfiguration());
-
- Assert.assertEquals(opts1.exporterType, "ET");
- Assert.assertEquals(opts1.keyType, "KT");
- Assert.assertEquals(opts1.valueType, "VT");
- Assert.assertEquals(opts1.numBuckets, 100);
- Assert.assertEquals(opts1.bucketsPerTablet.intValue(), Options.DEFAULT_BUCKETS_PER_TABLET);
- Assert.assertEquals(opts1.bufferSize.intValue(), Options.DEFAULT_BUFFER_SIZE);
-
- Options opts2 = new Options("Q2", conf.getAppConfiguration());
-
- Assert.assertEquals(opts2.exporterType, "ET2");
- Assert.assertEquals(opts2.keyType, "KT2");
- Assert.assertEquals(opts2.valueType, "VT2");
- Assert.assertEquals(opts2.numBuckets, 200);
- Assert.assertEquals(opts2.bucketsPerTablet.intValue(), 20);
- Assert.assertEquals(opts2.bufferSize.intValue(), 1000000);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/RefInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/RefInfo.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/RefInfo.java
deleted file mode 100644
index abc518c..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/RefInfo.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-class RefInfo {
- long seq;
- boolean deleted;
-
- public RefInfo(long seq, boolean deleted) {
- this.seq = seq;
- this.deleted = deleted;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/export/RefUpdates.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/export/RefUpdates.java b/modules/core/src/test/java/org/apache/fluo/recipes/export/RefUpdates.java
deleted file mode 100644
index 31c61b7..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/export/RefUpdates.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Set;
-
-public class RefUpdates {
- private Set<String> addedRefs;
- private Set<String> deletedRefs;
-
- public RefUpdates() {}
-
- public RefUpdates(Set<String> addedRefs, Set<String> deletedRefs) {
- this.addedRefs = addedRefs;
- this.deletedRefs = deletedRefs;
- }
-
- public Set<String> getAddedRefs() {
- return addedRefs;
- }
-
- public Set<String> getDeletedRefs() {
- return deletedRefs;
- }
-
- @Override
- public String toString() {
- return "added:" + addedRefs + " deleted:" + deletedRefs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/test/java/org/apache/fluo/recipes/map/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/map/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/map/BigUpdateIT.java
deleted file mode 100644
index d6409a6..0000000
--- a/modules/core/src/test/java/org/apache/fluo/recipes/map/BigUpdateIT.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.map;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-import org.apache.fluo.recipes.types.StringEncoder;
-import org.apache.fluo.recipes.types.TypeLayer;
-import org.apache.fluo.recipes.types.TypedSnapshot;
-import org.apache.fluo.recipes.types.TypedTransactionBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * This test configures a small buffer size and verifies that multiple passes are made to process
- * updates.
- */
-public class BigUpdateIT {
- private static final TypeLayer tl = new TypeLayer(new StringEncoder());
-
- private MiniFluo miniFluo;
-
- private CollisionFreeMap<String, Long> wcMap;
-
- static final String MAP_ID = "bu";
-
- public static class LongCombiner implements Combiner<String, Long> {
-
- @Override
- public Optional<Long> combine(String key, Iterator<Long> updates) {
- long[] count = new long[] {0};
- updates.forEachRemaining(l -> count[0] += l);
- return Optional.of(count[0]);
- }
- }
-
- static final Column DSCOL = new Column("debug", "sum");
-
- private static AtomicInteger globalUpdates = new AtomicInteger(0);
-
- public static class MyObserver extends UpdateObserver<String, Long> {
-
- @Override
- public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) {
- TypedTransactionBase ttx = tl.wrap(tx);
-
- Map<String, Long> expectedOld = new HashMap<>();
-
-
- while (updates.hasNext()) {
- Update<String, Long> update = updates.next();
-
- if (update.getOldValue().isPresent()) {
- expectedOld.put("side:" + update.getKey(), update.getOldValue().get());
- }
-
- ttx.mutate().row("side:" + update.getKey()).col(DSCOL).set(update.getNewValue().get());
- }
-
- // get last values set to verify same as passed in old value
- Map<String, Long> actualOld =
- Maps.transformValues(
- ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL))
- .toStringMap(), m -> m.get(DSCOL).toLong());
-
- MapDifference<String, Long> diff = Maps.difference(expectedOld, actualOld);
-
- Assert.assertTrue(diff.toString(), diff.areEqual());
-
- globalUpdates.incrementAndGet();
- }
- }
-
- @Before
- public void setUpFluo() throws Exception {
- FileUtils.deleteQuietly(new File("target/mini"));
-
- FluoConfiguration props = new FluoConfiguration();
- props.setApplicationName("eqt");
- props.setWorkerThreads(20);
- props.setMiniDataDir("target/mini");
-
- SimpleSerializer.setSetserlializer(props, TestSerializer.class);
-
- CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, LongCombiner.class,
- MyObserver.class, String.class, Long.class, 2).setBufferSize(1 << 10));
-
- miniFluo = FluoFactory.newMiniFluo(props);
-
- wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration());
-
- globalUpdates.set(0);
- }
-
- @After
- public void tearDownFluo() throws Exception {
- if (miniFluo != null) {
- miniFluo.close();
- }
- }
-
- @Test
- public void testBigUpdates() {
- try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
- updateMany(fc);
-
- miniFluo.waitForObservers();
-
- int numUpdates = 0;
-
- try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
- checkUpdates(snap, 1, 1000);
- numUpdates = globalUpdates.get();
- // there are two buckets, expect update processing at least twice per bucket
- Assert.assertTrue(numUpdates >= 4);
- }
-
- updateMany(fc);
- updateMany(fc);
-
- miniFluo.waitForObservers();
-
- try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
- checkUpdates(snap, 3, 1000);
- numUpdates = globalUpdates.get() - numUpdates;
- Assert.assertTrue(numUpdates >= 4);
- }
-
- for (int i = 0; i < 10; i++) {
- updateMany(fc);
- }
-
- miniFluo.waitForObservers();
-
- try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) {
- checkUpdates(snap, 13, 1000);
- numUpdates = globalUpdates.get() - numUpdates;
- Assert.assertTrue(numUpdates >= 4);
- }
- }
- }
-
- private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
- RowIterator iter = snap.get(new ScannerConfiguration().setSpan(Span.prefix("side:")));
-
- int row = 0;
-
- while (iter.hasNext()) {
- Entry<Bytes, ColumnIterator> entry = iter.next();
-
- Assert.assertEquals(String.format("side:%06d", row++), entry.getKey().toString());
-
- ColumnIterator colITer = entry.getValue();
- while (colITer.hasNext()) {
- Entry<Column, Bytes> entry2 = colITer.next();
- Assert.assertEquals(new Column("debug", "sum"), entry2.getKey());
- Assert.assertEquals("row : " + entry.getKey(), "" + expectedVal, entry2.getValue()
- .toString());
- }
- }
-
- Assert.assertEquals(expectedRows, row);
- }
-
- private void updateMany(FluoClient fc) {
- try (Transaction tx = fc.newTransaction()) {
- Map<String, Long> updates = new HashMap<>();
- for (int i = 0; i < 1000; i++) {
- updates.put(String.format("%06d", i), 1L);
- }
-
- wcMap.update(tx, updates);
- tx.commit();
- }
- }
-}