You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/18 18:00:49 UTC
[2/2] incubator-fluo-recipes git commit: Updated to use new scanner
API from apache/incubator-fluo#639
Updated to use new scanner API from apache/incubator-fluo#639
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/18933f22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/18933f22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/18933f22
Branch: refs/heads/master
Commit: 18933f226b4b4adfc3e1e5487927b148583ce176
Parents: 22354d0
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jul 15 15:59:21 2016 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jul 15 18:59:15 2016 -0400
----------------------------------------------------------------------
.../fluo/recipes/core/export/ExportBucket.java | 35 ++--
.../fluo/recipes/core/map/CollisionFreeMap.java | 33 ++-
.../transaction/RecordingTransactionBase.java | 209 +++++++++++++++----
.../recipes/core/types/TypedSnapshotBase.java | 13 +-
.../recipes/core/export/ExportTestBase.java | 41 ++--
.../fluo/recipes/core/map/BigUpdateIT.java | 27 +--
.../recipes/core/map/CollisionFreeMapIT.java | 39 ++--
.../transaction/RecordingTransactionTest.java | 113 +++++-----
.../recipes/core/types/MockSnapshotBase.java | 5 +-
.../apache/fluo/recipes/test/FluoITHelper.java | 73 +++----
pom.xml | 2 +-
11 files changed, 325 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
index cf6dfb4..c0dae48 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
@@ -15,19 +15,16 @@
package org.apache.fluo.recipes.core.export;
-import java.util.Collections;
import java.util.Iterator;
-import java.util.Map.Entry;
import com.google.common.base.Preconditions;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.recipes.core.impl.BucketUtil;
import org.apache.fluo.recipes.core.types.StringEncoder;
import org.apache.fluo.recipes.core.types.TypeLayer;
@@ -123,35 +120,29 @@ class ExportBucket {
}
public Iterator<ExportEntry> getExportIterator(Bytes continueRow) {
- ScannerConfiguration sc = new ScannerConfiguration();
-
+ Span span;
if (continueRow != null) {
Span tmpSpan = Span.prefix(bucketRow);
Span nextSpan =
new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(),
tmpSpan.isEndInclusive());
- sc.setSpan(nextSpan);
+ span = nextSpan;
} else {
- sc.setSpan(Span.prefix(bucketRow));
+ span = Span.prefix(bucketRow);
}
- sc.fetchColumn(EXPORT_COL.getFamily(), EXPORT_COL.getQualifier());
- RowIterator iter = ttx.get(sc);
+ CellScanner scanner = ttx.scanner().over(span).fetch(EXPORT_COL).build();
- if (iter.hasNext()) {
- return new ExportIterator(iter);
- } else {
- return Collections.<ExportEntry>emptySet().iterator();
- }
+ return new ExportIterator(scanner);
}
private class ExportIterator implements Iterator<ExportEntry> {
- private RowIterator rowIter;
+ private Iterator<RowColumnValue> rowIter;
private Bytes lastRow;
- public ExportIterator(RowIterator rowIter) {
- this.rowIter = rowIter;
+ public ExportIterator(CellScanner scanner) {
+ this.rowIter = scanner.iterator();
}
@Override
@@ -161,8 +152,8 @@ class ExportBucket {
@Override
public ExportEntry next() {
- Entry<Bytes, ColumnIterator> rowCol = rowIter.next();
- Bytes row = rowCol.getKey();
+ RowColumnValue rowColVal = rowIter.next();
+ Bytes row = rowColVal.getRow();
Bytes keyBytes = row.subSequence(bucketRow.length() + 1, row.length() - 8);
Bytes seqBytes = row.subSequence(row.length() - 8, row.length());
@@ -172,7 +163,7 @@ class ExportBucket {
ee.key = keyBytes.toArray();
ee.seq = decodeSeq(seqBytes);
// TODO maybe leave as Bytes?
- ee.value = rowCol.getValue().next().getValue().toArray();
+ ee.value = rowColVal.getValue().toArray();
lastRow = row;
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
index 2fe4a7c..c6c0918 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@ -37,7 +37,6 @@ import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.BytesBuilder;
@@ -45,8 +44,6 @@ import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.recipes.core.common.TableOptimizations;
import org.apache.fluo.recipes.core.common.RowRange;
import org.apache.fluo.recipes.core.common.TransientRegistry;
@@ -113,7 +110,7 @@ public class CollisionFreeMap<K, V> {
Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
- ScannerConfiguration sc = new ScannerConfiguration();
+ Span span;
if (nextKey != null) {
Bytes startRow =
@@ -123,14 +120,14 @@ public class CollisionFreeMap<K, V> {
Span nextSpan =
new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(),
tmpSpan.isEndInclusive());
- sc.setSpan(nextSpan);
+ span = nextSpan;
} else {
- sc.setSpan(Span.prefix(ntfyRow));
+ span = Span.prefix(ntfyRow);
}
- sc.setSpan(Span.prefix(ntfyRow));
- sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
- RowIterator iter = tx.get(sc);
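+ // TODO the span selected above is overwritten by the next line, so the scan always covers the full ntfyRow prefix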
+ span = Span.prefix(ntfyRow);
+ Iterator<RowColumnValue> iter = tx.scanner().over(span).fetch(UPDATE_COL).build().iterator();
Map<Bytes, List<Bytes>> updates = new HashMap<>();
@@ -141,8 +138,8 @@ public class CollisionFreeMap<K, V> {
if (iter.hasNext()) {
Bytes lastKey = null;
while (iter.hasNext() && approxMemUsed < bufferSize) {
- Entry<Bytes, ColumnIterator> rowCol = iter.next();
- Bytes curRow = rowCol.getKey();
+ RowColumnValue rcv = iter.next();
+ Bytes curRow = rcv.getRow();
tx.delete(curRow, UPDATE_COL);
@@ -155,7 +152,7 @@ public class CollisionFreeMap<K, V> {
updates.put(serializedKey, updateList);
}
- Bytes val = rowCol.getValue().next().getValue();
+ Bytes val = rcv.getValue();
updateList.add(val);
approxMemUsed += curRow.length();
@@ -163,8 +160,8 @@ public class CollisionFreeMap<K, V> {
}
if (iter.hasNext()) {
- Entry<Bytes, ColumnIterator> rowCol = iter.next();
- Bytes curRow = rowCol.getKey();
+ RowColumnValue rcv = iter.next();
+ Bytes curRow = rcv.getRow();
// check if more updates for last key
if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) {
@@ -293,15 +290,13 @@ public class CollisionFreeMap<K, V> {
BytesBuilder rowBuilder = Bytes.newBuilder();
rowBuilder.append(mapId).append(":u:").append(bucketId).append(":").append(k);
- ScannerConfiguration sc = new ScannerConfiguration();
- sc.setSpan(Span.prefix(rowBuilder.toBytes()));
-
- RowIterator iter = tx.get(sc);
+ Iterator<RowColumnValue> iter =
+ tx.scanner().over(Span.prefix(rowBuilder.toBytes())).build().iterator();
Iterator<V> ui;
if (iter.hasNext()) {
- ui = Iterators.transform(iter, e -> deserVal(e.getValue().next().getValue()));
+ ui = Iterators.transform(iter, rcv -> deserVal(rcv.getValue()));
} else {
ui = Collections.<V>emptyList().iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
index a29cf88..e3b80b7 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
@@ -15,20 +15,26 @@
package org.apache.fluo.recipes.core.transaction;
-import java.util.AbstractMap;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
+import com.google.common.collect.Iterators;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.client.scanner.RowScannerBuilder;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.AlreadySetException;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
/**
* An implementation of {@link TransactionBase} that logs all transaction operations (GET, SET, or
@@ -136,50 +142,167 @@ public class RecordingTransactionBase implements TransactionBase {
return rowColVal;
}
+ private class RtxIterator implements Iterator<RowColumnValue> {
+
+ private Iterator<RowColumnValue> iter;
+
+ public RtxIterator(Iterator<RowColumnValue> iterator) {
+ this.iter = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public RowColumnValue next() {
+ RowColumnValue rcv = iter.next();
+ txLog.filteredAdd(LogEntry.newGet(rcv.getRow(), rcv.getColumn(), rcv.getValue()), filter);
+ return rcv;
+ }
+
+ }
+
+ private class RtxCellSanner implements CellScanner {
+
+ private CellScanner scanner;
+
+ public RtxCellSanner(CellScanner scanner) {
+ this.scanner = scanner;
+ }
+
+ @Override
+ public Iterator<RowColumnValue> iterator() {
+ return new RtxIterator(scanner.iterator());
+ }
+
+ }
+
+ private class RtxCVIterator implements Iterator<ColumnValue> {
+
+ private Iterator<ColumnValue> iter;
+ private Bytes row;
+
+ public RtxCVIterator(Bytes row, Iterator<ColumnValue> iterator) {
+ this.row = row;
+ this.iter = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ColumnValue next() {
+ ColumnValue cv = iter.next();
+ txLog.filteredAdd(LogEntry.newGet(row, cv.getColumn(), cv.getValue()), filter);
+ return cv;
+ }
+
+ }
+
+ private class RtxColumnScanner implements ColumnScanner {
+
+ private ColumnScanner cs;
+
+ public RtxColumnScanner(ColumnScanner cs) {
+ this.cs = cs;
+ }
+
+ @Override
+ public Iterator<ColumnValue> iterator() {
+ return new RtxCVIterator(cs.getRow(), cs.iterator());
+ }
+
+ @Override
+ public Bytes getRow() {
+ return cs.getRow();
+ }
+
+ @Override
+ public String getsRow() {
+ return cs.getsRow();
+ }
+
+ }
+
+ private class RtxRowScanner implements RowScanner {
+
+ private RowScanner scanner;
+
+ public RtxRowScanner(RowScanner scanner) {
+ this.scanner = scanner;
+ }
+
+ @Override
+ public Iterator<ColumnScanner> iterator() {
+ return Iterators.transform(scanner.iterator(), cs -> new RtxColumnScanner(cs));
+ }
+
+ }
+
+ private class RtxRowScannerBuilder implements RowScannerBuilder {
+
+ private RowScannerBuilder rsb;
+
+ public RtxRowScannerBuilder(RowScannerBuilder rsb) {
+ this.rsb = rsb;
+ }
+
+ @Override
+ public RowScanner build() {
+ return new RtxRowScanner(rsb.build());
+ }
+
+ }
+
+ private class RtxScannerBuilder implements ScannerBuilder {
+
+ private ScannerBuilder sb;
+
+ public RtxScannerBuilder(ScannerBuilder sb) {
+ this.sb = sb;
+ }
+
+ @Override
+ public ScannerBuilder over(Span span) {
+ sb = sb.over(span);
+ return this;
+ }
+
+ @Override
+ public ScannerBuilder fetch(Column... columns) {
+ sb = sb.fetch(columns);
+ return this;
+ }
+
+ @Override
+ public ScannerBuilder fetch(Collection<Column> columns) {
+ sb = sb.fetch(columns);
+ return this;
+ }
+
+ @Override
+ public CellScanner build() {
+ return new RtxCellSanner(sb.build());
+ }
+
+ @Override
+ public RowScannerBuilder byRow() {
+ return new RtxRowScannerBuilder(sb.byRow());
+ }
+
+ }
+
/**
* Logs GETs for Row/Columns returned by iterators. Requests that return no data will not be
* logged.
*/
@Override
- public RowIterator get(ScannerConfiguration config) {
- final RowIterator rowIter = txb.get(config);
- if (rowIter != null) {
- return new RowIterator() {
-
- @Override
- public boolean hasNext() {
- return rowIter.hasNext();
- }
-
- @Override
- public Map.Entry<Bytes, ColumnIterator> next() {
- final Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
- if ((rowEntry != null) && (rowEntry.getValue() != null)) {
- final ColumnIterator colIter = rowEntry.getValue();
- return new AbstractMap.SimpleEntry<>(rowEntry.getKey(), new ColumnIterator() {
-
- @Override
- public boolean hasNext() {
- return colIter.hasNext();
- }
-
- @Override
- public Map.Entry<Column, Bytes> next() {
- Map.Entry<Column, Bytes> colEntry = colIter.next();
- if (colEntry != null) {
- txLog.filteredAdd(
- LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()),
- filter);
- }
- return colEntry;
- }
- });
- }
- return rowEntry;
- }
- };
- }
- return rowIter;
+ public ScannerBuilder scanner() {
+ return new RtxScannerBuilder(txb.scanner());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
index 7764e67..1f5eb48 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
@@ -29,11 +29,10 @@ import java.util.Set;
import com.google.common.collect.Maps;
import org.apache.commons.collections.map.DefaultedMap;
import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.recipes.core.types.TypeLayer.Data;
import org.apache.fluo.recipes.core.types.TypeLayer.FamilyMethods;
import org.apache.fluo.recipes.core.types.TypeLayer.QualifierMethods;
@@ -509,11 +508,6 @@ public class TypedSnapshotBase implements SnapshotBase {
}
@Override
- public RowIterator get(ScannerConfiguration config) {
- return snapshot.get(config);
- }
-
- @Override
public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
return snapshot.get(rows, columns);
}
@@ -522,6 +516,11 @@ public class TypedSnapshotBase implements SnapshotBase {
return new ValueRowMethods();
}
+ @Override
+ public ScannerBuilder scanner() {
+ return snapshot.scanner();
+ }
+
@SuppressWarnings({"unchecked"})
private Map<Column, Value> wrap(Map<Column, Bytes> map) {
Map<Column, Value> ret = Maps.transformValues(map, input -> new Value(input));
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
index c1cf3ce..4493fdf 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/ExportTestBase.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
@@ -33,14 +32,14 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
import org.junit.After;
@@ -240,20 +239,15 @@ public class ExportTestBase {
Map<String, Set<String>> fluoReferees = new HashMap<>();
try (Snapshot snap = fc.newSnapshot()) {
- ScannerConfiguration scannerConfig = new ScannerConfiguration();
- scannerConfig.fetchColumn(Bytes.of("content"), Bytes.of("current"));
- scannerConfig.setSpan(Span.prefix("d:"));
- RowIterator scanner = snap.get(scannerConfig);
- while (scanner.hasNext()) {
- Entry<Bytes, ColumnIterator> row = scanner.next();
- ColumnIterator colIter = row.getValue();
- String docid = row.getKey().toString().substring(2);
+ Column currCol = new Column("content", "current");
+ RowScanner rowScanner = snap.scanner().over(Span.prefix("d:")).fetch(currCol).byRow().build();
- while (colIter.hasNext()) {
- Entry<Column, Bytes> entry = colIter.next();
+ for (ColumnScanner columnScanner : rowScanner) {
+ String docid = columnScanner.getsRow().substring(2);
- String[] refs = entry.getValue().toString().split(" ");
+ for (ColumnValue columnValue : columnScanner) {
+ String[] refs = columnValue.getsValue().split(" ");
for (String ref : refs) {
if (ref.isEmpty())
@@ -269,18 +263,9 @@ public class ExportTestBase {
public static void dump(FluoClient fc) {
try (Snapshot snap = fc.newSnapshot()) {
- RowIterator scanner = snap.get(new ScannerConfiguration());
- while (scanner.hasNext()) {
- Entry<Bytes, ColumnIterator> row = scanner.next();
- ColumnIterator colIter = row.getValue();
-
- while (colIter.hasNext()) {
- Entry<Column, Bytes> entry = colIter.next();
-
- System.out.println("row:[" + row.getKey() + "] col:[" + entry.getKey() + "] val:["
- + entry.getValue() + "]");
- }
- }
+ CellScanner scanner = snap.scanner().build();
+ scanner.forEach(rcv -> System.out.println("row:[" + rcv.getRow() + "] col:["
+ + rcv.getColumn() + "] val:[" + rcv.getValue() + "]"));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
index e5f7d55..852d117 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java
@@ -19,7 +19,6 @@ import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,13 +30,12 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
import org.apache.fluo.recipes.core.types.StringEncoder;
@@ -179,21 +177,18 @@ public class BigUpdateIT {
}
private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
- RowIterator iter = snap.get(new ScannerConfiguration().setSpan(Span.prefix("side:")));
- int row = 0;
+ RowScanner rows = snap.scanner().over(Span.prefix("side:")).byRow().build();
- while (iter.hasNext()) {
- Entry<Bytes, ColumnIterator> entry = iter.next();
+ int row = 0;
- Assert.assertEquals(String.format("side:%06d", row++), entry.getKey().toString());
+ for (ColumnScanner columns : rows) {
+ Assert.assertEquals(String.format("side:%06d", row++), columns.getsRow());
- ColumnIterator colITer = entry.getValue();
- while (colITer.hasNext()) {
- Entry<Column, Bytes> entry2 = colITer.next();
- Assert.assertEquals(new Column("debug", "sum"), entry2.getKey());
- Assert.assertEquals("row : " + entry.getKey(), "" + expectedVal, entry2.getValue()
- .toString());
+ for (ColumnValue columnValue : columns) {
+ Assert.assertEquals(new Column("debug", "sum"), columnValue.getColumn());
+ Assert
+ .assertEquals("row : " + columns.getsRow(), "" + expectedVal, columnValue.getsValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
index f7dbc89..cb76891 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java
@@ -18,7 +18,6 @@ package org.apache.fluo.recipes.core.map;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Random;
import com.google.common.collect.ImmutableMap;
@@ -28,14 +27,12 @@ import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
import org.junit.After;
@@ -83,11 +80,11 @@ public class CollisionFreeMapIT {
Map<String, Long> counts = new HashMap<>();
try (Snapshot snap = fc.newSnapshot()) {
- RowIterator scanner = snap.get(new ScannerConfiguration().setSpan(Span.prefix("iwc:")));
- while (scanner.hasNext()) {
- Entry<Bytes, ColumnIterator> row = scanner.next();
- String[] tokens = row.getKey().toString().split(":");
+ CellScanner scanner = snap.scanner().over(Span.prefix("iwc:")).build();
+
+ for (RowColumnValue rcv : scanner) {
+ String[] tokens = rcv.getsRow().split(":");
String word = tokens[2];
Long count = Long.valueOf(tokens[1]);
@@ -104,25 +101,19 @@ public class CollisionFreeMapIT {
Map<String, Long> counts = new HashMap<>();
try (Snapshot snap = fc.newSnapshot()) {
- RowIterator scanner =
- snap.get(new ScannerConfiguration().setSpan(Span.prefix("d:")).fetchColumn(
- Bytes.of("content"), Bytes.of("current")));
- while (scanner.hasNext()) {
- Entry<Bytes, ColumnIterator> row = scanner.next();
-
- ColumnIterator colIter = row.getValue();
- while (colIter.hasNext()) {
- Entry<Column, Bytes> entry = colIter.next();
- String[] words = entry.getValue().toString().split("\\s+");
- for (String word : words) {
- if (word.isEmpty()) {
- continue;
- }
+ CellScanner scanner =
+ snap.scanner().over(Span.prefix("d:")).fetch(new Column("content", "current")).build();
- counts.merge(word, 1L, Long::sum);
+ for (RowColumnValue rcv : scanner) {
+ String[] words = rcv.getsValue().split("\\s+");
+ for (String word : words) {
+ if (word.isEmpty()) {
+ continue;
}
+
+ counts.merge(word, 1L, Long::sum);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
index 7c09b6e..1e9b262 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java
@@ -15,17 +15,21 @@
package org.apache.fluo.recipes.core.transaction;
-import java.util.AbstractMap;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Iterators;
import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.client.scanner.RowScannerBuilder;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.recipes.core.types.StringEncoder;
import org.apache.fluo.recipes.core.types.TypeLayer;
import org.apache.fluo.recipes.core.types.TypedTransaction;
@@ -154,67 +158,64 @@ public class RecordingTransactionTest {
}
@Test
- public void testGetScanNull() {
- ScannerConfiguration scanConfig = new ScannerConfiguration();
- expect(tx.get(scanConfig)).andReturn(null);
- replay(tx);
- Assert.assertNull(rtx.get(scanConfig));
- verify(tx);
- }
-
- @Test
public void testGetScanIter() {
- ScannerConfiguration scanConfig = new ScannerConfiguration();
- expect(tx.get(scanConfig)).andReturn(new RowIterator() {
-
- private boolean hasNextRow = true;
-
+ ScannerBuilder sb = mock(ScannerBuilder.class);
+ expect(sb.build()).andReturn(new CellScanner() {
@Override
- public boolean hasNext() {
- return hasNextRow;
- }
-
- @Override
- public Map.Entry<Bytes, ColumnIterator> next() {
- hasNextRow = false;
- return new AbstractMap.SimpleEntry<>(Bytes.of("r7"), new ColumnIterator() {
-
- private boolean hasNextCol = true;
-
- @Override
- public boolean hasNext() {
- return hasNextCol;
- }
-
- @Override
- public Map.Entry<Column, Bytes> next() {
- hasNextCol = false;
- return new AbstractMap.SimpleEntry<>(new Column("cf7", "cq7"), Bytes.of("v7"));
- }
- });
+ public Iterator<RowColumnValue> iterator() {
+ return Iterators
+ .singletonIterator(new RowColumnValue("r7", new Column("cf7", "cq7"), "v7"));
}
});
- replay(tx);
- RowIterator rowIter = rtx.get(scanConfig);
- Assert.assertNotNull(rowIter);
- Assert.assertTrue(rtx.getTxLog().isEmpty());
- Assert.assertTrue(rowIter.hasNext());
- Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
- Assert.assertFalse(rowIter.hasNext());
- Assert.assertEquals(Bytes.of("r7"), rowEntry.getKey());
- ColumnIterator colIter = rowEntry.getValue();
- Assert.assertTrue(colIter.hasNext());
+
+ expect(tx.scanner()).andReturn(sb);
+
+ replay(tx, sb);
+
+ Iterator<RowColumnValue> iter = rtx.scanner().build().iterator();
Assert.assertTrue(rtx.getTxLog().isEmpty());
- Map.Entry<Column, Bytes> colEntry = colIter.next();
+ Assert.assertTrue(iter.hasNext());
+ Assert.assertEquals(new RowColumnValue("r7", new Column("cf7", "cq7"), "v7"), iter.next());
Assert.assertFalse(rtx.getTxLog().isEmpty());
- Assert.assertFalse(colIter.hasNext());
- Assert.assertEquals(new Column("cf7", "cq7"), colEntry.getKey());
- Assert.assertEquals(Bytes.of("v7"), colEntry.getValue());
List<LogEntry> entries = rtx.getTxLog().getLogEntries();
Assert.assertEquals(1, entries.size());
Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", entries.get(0)
.toString());
- verify(tx);
+
+ verify(tx, sb);
+ }
+
+ @Test
+ public void testGetRowScanner() {
+ ColumnScanner cs = mock(ColumnScanner.class);
+ RowScanner rs = mock(RowScanner.class);
+ RowScannerBuilder rsb = mock(RowScannerBuilder.class);
+ ScannerBuilder sb = mock(ScannerBuilder.class);
+
+ expect(cs.getRow()).andReturn(Bytes.of("r7")).times(2);
+ expect(cs.iterator()).andReturn(
+ Iterators.singletonIterator(new ColumnValue(new Column("cf7", "cq7"), "v7")));
+ expect(rs.iterator()).andReturn(Iterators.singletonIterator(cs));
+ expect(rsb.build()).andReturn(rs);
+ expect(sb.byRow()).andReturn(rsb);
+ expect(tx.scanner()).andReturn(sb);
+
+ replay(tx, sb, rsb, rs, cs);
+
+ Iterator<ColumnScanner> riter = rtx.scanner().byRow().build().iterator();
+ Assert.assertTrue(riter.hasNext());
+ ColumnScanner cscanner = riter.next();
+ Assert.assertEquals(Bytes.of("r7"), cscanner.getRow());
+ Iterator<ColumnValue> citer = cscanner.iterator();
+ Assert.assertTrue(citer.hasNext());
+ Assert.assertTrue(rtx.getTxLog().isEmpty());
+ Assert.assertEquals(new ColumnValue(new Column("cf7", "cq7"), "v7"), citer.next());
+ List<LogEntry> entries = rtx.getTxLog().getLogEntries();
+ Assert.assertEquals(1, entries.size());
+ Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", entries.get(0)
+ .toString());
+
+ verify(tx, sb, rsb, rs, cs);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
index d31b36c..8d87fdf 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/MockSnapshotBase.java
@@ -22,11 +22,10 @@ import java.util.Map;
import java.util.Set;
import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.core.impl.TxStringUtil;
public class MockSnapshotBase implements SnapshotBase {
@@ -81,7 +80,7 @@ public class MockSnapshotBase implements SnapshotBase {
}
@Override
- public RowIterator get(ScannerConfiguration config) {
+ public ScannerBuilder scanner() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
----------------------------------------------------------------------
diff --git a/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java b/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
index 92d7eaf..ddc51ef 100644
--- a/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
+++ b/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
@@ -34,12 +34,9 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,24 +73,16 @@ public class FluoITHelper {
*/
public static void printFluoTable(FluoClient client) {
try (Snapshot s = client.newSnapshot()) {
- RowIterator iter = s.get(new ScannerConfiguration());
-
System.out.println("== fluo start ==");
- while (iter.hasNext()) {
- Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
- ColumnIterator citer = rowEntry.getValue();
- while (citer.hasNext()) {
- Map.Entry<Column, Bytes> colEntry = citer.next();
-
- StringBuilder sb = new StringBuilder();
- Hex.encNonAscii(sb, rowEntry.getKey());
- sb.append(" ");
- Hex.encNonAscii(sb, colEntry.getKey(), " ");
- sb.append("\t");
- Hex.encNonAscii(sb, colEntry.getValue());
-
- System.out.println(sb.toString());
- }
+ for (RowColumnValue rcv : s.scanner().build()) {
+ StringBuilder sb = new StringBuilder();
+ Hex.encNonAscii(sb, rcv.getRow());
+ sb.append(" ");
+ Hex.encNonAscii(sb, rcv.getColumn(), " ");
+ sb.append("\t");
+ Hex.encNonAscii(sb, rcv.getValue());
+
+ System.out.println(sb.toString());
}
System.out.println("=== fluo end ===");
}
@@ -120,37 +109,29 @@ public class FluoITHelper {
expected = sort(expected);
try (Snapshot s = client.newSnapshot()) {
- RowIterator rowIter = s.get(new ScannerConfiguration());
+ Iterator<RowColumnValue> fluoIter = s.scanner().build().iterator();
Iterator<RowColumnValue> rcvIter = expected.iterator();
- while (rowIter.hasNext()) {
- Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
- ColumnIterator citer = rowEntry.getValue();
- while (citer.hasNext() && rcvIter.hasNext()) {
- Map.Entry<Column, Bytes> colEntry = citer.next();
- RowColumnValue rcv = rcvIter.next();
- Column col = colEntry.getKey();
-
- boolean retval = diff("fluo row", rcv.getRow(), rowEntry.getKey());
- retval |= diff("fluo fam", rcv.getColumn().getFamily(), col.getFamily());
- retval |= diff("fluo qual", rcv.getColumn().getQualifier(), col.getQualifier());
- retval |= diff("fluo val", rcv.getValue(), colEntry.getValue());
-
- if (retval) {
- log.error("Difference found - row {} cf {} cq {} val {}", rcv.getRow().toString(), rcv
- .getColumn().getFamily().toString(), rcv.getColumn().getQualifier().toString(), rcv
- .getValue().toString());
- return false;
- }
-
- log.debug("Verified {}", Hex.encNonAscii(rcv, " "));
- }
- if (citer.hasNext()) {
- log.error("An column iterator still has more data");
+ while (fluoIter.hasNext() && rcvIter.hasNext()) {
+ RowColumnValue actualRcv = fluoIter.next();
+ RowColumnValue rcv = rcvIter.next();
+
+ boolean retval = diff("fluo row", rcv.getRow(), actualRcv.getRow());
+ retval |= diff("fluo fam", rcv.getColumn().getFamily(), actualRcv.getColumn().getFamily());
+ retval |=
+ diff("fluo qual", rcv.getColumn().getQualifier(), actualRcv.getColumn().getQualifier());
+ retval |= diff("fluo val", rcv.getValue(), actualRcv.getValue());
+
+ if (retval) {
+ log.error("Difference found - row {} cf {} cq {} val {}", rcv.getsRow(), rcv.getColumn()
+ .getsFamily(), rcv.getColumn().getsQualifier(), rcv.getsValue());
return false;
}
+
+ log.debug("Verified {}", Hex.encNonAscii(rcv, " "));
}
- if (rowIter.hasNext() || rcvIter.hasNext()) {
+
+ if (fluoIter.hasNext() || rcvIter.hasNext()) {
log.error("An iterator still has more data");
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/18933f22/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6c3561d..abdc1fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
<properties>
<accumulo.version>1.7.1</accumulo.version>
<findbugs.maxRank>13</findbugs.maxRank>
- <fluo.version>1.0.0-beta-3-SNAPSHOT</fluo.version>
+ <fluo.version>1.0.0-incubating-SNAPSHOT</fluo.version>
<hadoop.version>2.6.3</hadoop.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>