You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 15:31:42 UTC

[2/3] incubator-fluo git commit: fixes #639 vastly improved the scanner API

fixes #639 vastly improved the scanner API


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/23374784
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/23374784
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/23374784

Branch: refs/heads/master
Commit: 233747847831efc4dad6cc030a895860ead7d71e
Parents: e72a931
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jul 13 18:09:15 2016 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jul 15 11:27:13 2016 -0400

----------------------------------------------------------------------
 .../apache/fluo/api/client/SnapshotBase.java    |  43 +++-
 .../fluo/api/client/scanner/CellScanner.java    |  25 ++
 .../fluo/api/client/scanner/ColumnScanner.java  |  30 +++
 .../fluo/api/client/scanner/RowScanner.java     |  23 ++
 .../api/client/scanner/RowScannerBuilder.java   |  26 ++
 .../fluo/api/client/scanner/ScannerBuilder.java |  61 +++++
 .../fluo/api/config/ScannerConfiguration.java   |  98 --------
 .../org/apache/fluo/api/data/ColumnValue.java   |  76 ++++++
 .../fluo/api/iterator/ColumnIterator.java       |  31 ---
 .../apache/fluo/api/iterator/RowIterator.java   |  30 ---
 .../apache/fluo/cluster/runner/AppRunner.java   | 107 ++++----
 .../apache/fluo/cluster/runner/ScanTest.java    |   8 +-
 .../fluo/core/impl/ColumnIteratorImpl.java      |  90 -------
 .../apache/fluo/core/impl/RowIteratorImpl.java  |  77 ------
 .../apache/fluo/core/impl/SnapshotScanner.java  | 244 +++++++++++--------
 .../apache/fluo/core/impl/TransactionImpl.java  |  61 +++--
 .../fluo/core/impl/scanner/CellScannerImpl.java |  57 +++++
 .../core/impl/scanner/ColumnScannerImpl.java    |  71 ++++++
 .../fluo/core/impl/scanner/RowScannerImpl.java  |  50 ++++
 .../core/impl/scanner/ScannerBuilderImpl.java   |  90 +++++++
 .../fluo/core/log/TracingTransaction.java       |   9 +-
 .../core/config/ScannerConfigurationTest.java   |  92 -------
 .../org/apache/fluo/integration/ITBase.java     |  21 +-
 .../org/apache/fluo/integration/ITBaseImpl.java |   1 +
 .../fluo/integration/TestTransaction.java       |   7 +-
 .../apache/fluo/integration/impl/FluoIT.java    |  31 +--
 .../apache/fluo/integration/impl/ScannerIT.java | 217 +++++++++++++++++
 .../fluo/integration/impl/StochasticBankIT.java |  24 +-
 .../integration/impl/WeakNotificationIT.java    |  21 +-
 .../apache/fluo/integration/impl/WorkerIT.java  |  17 +-
 .../fluo/mapreduce/FluoEntryInputFormat.java    |  45 ++--
 .../fluo/mapreduce/FluoRowInputFormat.java      |  37 +--
 pom.xml                                         |   4 +-
 33 files changed, 1082 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index 778aa94..79e1cef 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -19,11 +19,11 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.data.Span;
 
 /**
  * Allows users to read from a Fluo table at a certain point in time
@@ -57,9 +57,44 @@ public interface SnapshotBase {
   Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns);
 
   /**
-   * Retrieves a {@link RowIterator} with the given {@link ScannerConfiguration}
+   * This method is the starting point for constructing a scanner. Scanners can be constructed over
+   * a {@link Span} and/or with a subset of columns. Below is a simple example of building a scanner.
+   * 
+   * <pre>
+   * {@code
+   *   Transaction tx = ...;
+   *   Span span = Span.exact("row4");
+   *   Column col1 = new Column("fam1","qual1");
+   *   Column col2 = new Column("fam1","qual2");
+   * 
+   *   //create a scanner over row4 fetching columns fam1:qual1 and fam1:qual2
+   *   CellScanner cs = tx.scanner().over(span).fetch(col1,col2).build();
+   *   for(RowColumnValue rcv : cs) {
+   *     //do stuff with rcv
+   *   }
+   * }
+   * </pre>
+   *
+   * <p>
+   * The following example shows how to build a row scanner.
+   *
+   * <pre>
+   * {@code
+   *   // tx, span, col1 and col2 are defined as in the previous example
+   *   RowScanner rs = tx.scanner().over(span).fetch(col1, col2).byRow().build();
+   *   for (ColumnScanner colScanner : rs) {
+   *     Bytes row = colScanner.getRow();
+   *     for (ColumnValue cv : colScanner) {
+   *       // do stuff with the columns and values in the row
+   *     }
+   *   }
+   * }
+   * </pre>
+   *
+   * @return A scanner builder.
    */
-  RowIterator get(ScannerConfiguration config);
+
+  ScannerBuilder scanner();
 
   /**
    * Wrapper for {@link #get(Collection)} that uses Strings. All strings are encoded and decoded

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java
new file mode 100644
index 0000000..b2bf50e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+import org.apache.fluo.api.data.RowColumnValue;
+
+/**
+ * @since 1.0.0
+ */
+public interface CellScanner extends Iterable<RowColumnValue> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java
new file mode 100644
index 0000000..9e790ea
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.ColumnValue;
+
+/**
+ * @since 1.0.0
+ */
+public interface ColumnScanner extends Iterable<ColumnValue> {
+
+  /**
+   * @return the row for all column values
+   */
+  Bytes getRow();
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java
new file mode 100644
index 0000000..e65816b
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+/**
+ * @since 1.0.0
+ */
+public interface RowScanner extends Iterable<ColumnScanner> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java
new file mode 100644
index 0000000..fa7c60e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+/**
+ * @since 1.0.0
+ */
+public interface RowScannerBuilder {
+  /**
+   * @return a new scanner created with any previously set restrictions
+   */
+  RowScanner build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java
new file mode 100644
index 0000000..1217aa0
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.client.scanner;
+
+import java.util.Collection;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * @since 1.0.0
+ */
+public interface ScannerBuilder {
+  /**
+   * @param span restrict the scanner to data within span
+   * @return self
+   */
+  ScannerBuilder over(Span span);
+
+
+  /**
+   * Passing in a Column with only the family set will fetch the entire column family.
+   *
+   * @param columns restrict the scanner to only these columns
+   * @return self
+   */
+  ScannerBuilder fetch(Column... columns);
+
+  /**
+   * Passing in a Column with only the family set will fetch the entire column family.
+   *
+   * @param columns restrict the scanner to only these columns
+   * @return self
+   */
+  ScannerBuilder fetch(Collection<Column> columns);
+
+  /**
+   * @return a new scanner created with any previously set restrictions
+   */
+  CellScanner build();
+
+  /**
+   * Call this to build a row scanner.
+   *
+   * @return a row scanner builder using any previously set restrictions
+   */
+  RowScannerBuilder byRow();
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java
deleted file mode 100644
index 3a73b50..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.config;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-
-/**
- * Contains configuration for a {@link org.apache.fluo.api.client.Snapshot} scanner. Passed to
- * {@link SnapshotBase#get(ScannerConfiguration)}.
- *
- * @since 1.0.0
- */
-public class ScannerConfiguration implements Cloneable {
-
-  private Span span = new Span();
-  private Set<Column> columns = new HashSet<>();
-
-  /**
-   * Sets {@link Span} for ScannerConfiguration
-   */
-  public ScannerConfiguration setSpan(Span span) {
-    Objects.requireNonNull(span);
-    this.span = span;
-    return this;
-  }
-
-  /**
-   * Retrieves {@link Span} for ScannerConfiguration
-   */
-  public Span getSpan() {
-    return span;
-  }
-
-  /**
-   * List of all {@link Column}s that scanner will retrieve
-   */
-  public Set<Column> getColumns() {
-    return Collections.unmodifiableSet(columns);
-  }
-
-  /**
-   * Configures scanner to retrieve column with the given family
-   */
-  public ScannerConfiguration fetchColumnFamily(Bytes fam) {
-    Objects.requireNonNull(fam);
-    columns.add(new Column(fam));
-    return this;
-  }
-
-  /**
-   * Configures scanner to retrieve column with the given family and qualifier
-   */
-  public ScannerConfiguration fetchColumn(Bytes fam, Bytes qual) {
-    Objects.requireNonNull(fam);
-    Objects.requireNonNull(qual);
-    columns.add(new Column(fam, qual));
-    return this;
-  }
-
-  /**
-   * Clears all fetched column settings
-   */
-  public void clearColumns() {
-    columns.clear();
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Object clone() throws CloneNotSupportedException {
-    ScannerConfiguration sc = (ScannerConfiguration) super.clone();
-
-    sc.columns = (Set<Column>) ((HashSet<Column>) columns).clone();
-    sc.span = span;
-
-    return sc;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
new file mode 100644
index 0000000..23a3741
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.data;
+
+import java.io.Serializable;
+
+/**
+ * @since 1.0.0
+ */
+
+public class ColumnValue implements Serializable, Comparable<ColumnValue> {
+  private static final long serialVersionUID = 1L;
+
+  private Column column;
+  private Bytes val;
+
+  public ColumnValue(Column col, Bytes val) {
+    this.column = col;
+    this.val = val;
+  }
+
+  public ColumnValue(Column col, String val) {
+    this.column = col;
+    this.val = Bytes.of(val);
+  }
+
+  public Column getColumn() {
+    return column;
+  }
+
+  public Bytes getValue() {
+    return val;
+  }
+
+  @Override
+  public int compareTo(ColumnValue o) {
+    int comp = column.compareTo(o.column);
+    if (comp == 0) {
+      comp = val.compareTo(o.val);
+    }
+    return comp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ColumnValue) {
+      ColumnValue ocv = (ColumnValue) o;
+      return column.equals(ocv.column) && val.equals(ocv.val);
+    }
+
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return column.hashCode() + 31 * val.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return column + " " + val;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java b/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java
deleted file mode 100644
index be04738..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.iterator;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-
-/**
- * Iterator for Fluo columns
- *
- * @since 1.0.0
- */
-public interface ColumnIterator extends Iterator<Entry<Column, Bytes>> {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java b/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java
deleted file mode 100644
index 21e53b1..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.iterator;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.fluo.api.data.Bytes;
-
-/**
- * Iterator for Fluo rows
- *
- * @since 1.0.0
- */
-public interface RowIterator extends Iterator<Entry<Bytes, ColumnIterator>> {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index ca9afe0..d40ff9e 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -17,7 +17,8 @@ package org.apache.fluo.cluster.runner;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
-import java.util.Map;
+import java.util.Collection;
+import java.util.HashSet;
 
 import javax.inject.Provider;
 
@@ -35,14 +36,13 @@ import org.apache.fluo.accumulo.format.FluoFormatter;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.cluster.util.FluoYarnConfig;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
@@ -68,9 +68,8 @@ public abstract class AppRunner {
     this.scriptName = scriptName;
   }
 
-  public static ScannerConfiguration buildScanConfig(ScanOptions options) {
-    ScannerConfiguration scanConfig = new ScannerConfiguration();
-
+  public static Span getSpan(ScanOptions options) {
+    Span span = new Span();
     if ((options.getExactRow() != null)
         && ((options.getStartRow() != null) || (options.getEndRow() != null) || (options
             .getRowPrefix() != null))) {
@@ -87,35 +86,43 @@ public abstract class AppRunner {
 
     // configure span of scanner
     if (options.getExactRow() != null) {
-      scanConfig.setSpan(Span.exact(options.getExactRow()));
+      span = Span.exact(options.getExactRow());
     } else if (options.getRowPrefix() != null) {
-      scanConfig.setSpan(Span.prefix(options.getRowPrefix()));
+      span = Span.prefix(options.getRowPrefix());
     } else {
       if ((options.getStartRow() != null) && (options.getEndRow() != null)) {
-        scanConfig.setSpan(new Span(options.getStartRow(), true, options.getEndRow(), true));
+        span = new Span(options.getStartRow(), true, options.getEndRow(), true);
       } else if (options.getStartRow() != null) {
-        scanConfig.setSpan(new Span(Bytes.of(options.getStartRow()), true, Bytes.EMPTY, true));
+        span = new Span(Bytes.of(options.getStartRow()), true, Bytes.EMPTY, true);
       } else if (options.getEndRow() != null) {
-        scanConfig.setSpan(new Span(Bytes.EMPTY, true, Bytes.of(options.getEndRow()), true));
+        span = new Span(Bytes.EMPTY, true, Bytes.of(options.getEndRow()), true);
       }
     }
 
+    return span;
+  }
+
+  public static Collection<Column> getColumns(ScanOptions options) {
+    Collection<Column> columns = new HashSet<>();
+
     // configure columns of scanner
     for (String column : options.getColumns()) {
       String[] colFields = column.split(":");
       if (colFields.length == 1) {
-        scanConfig.fetchColumnFamily(Bytes.of(colFields[0]));
+        columns.add(new Column(colFields[0]));
       } else if (colFields.length == 2) {
-        scanConfig.fetchColumn(Bytes.of(colFields[0]), Bytes.of(colFields[1]));
+        columns.add(new Column(colFields[0], colFields[1]));
       } else {
         throw new IllegalArgumentException("Failed to scan!  Column '" + column
             + "' has too many fields (indicated by ':')");
       }
     }
 
-    return scanConfig;
+    return columns;
   }
 
+
+
   public long scan(FluoConfiguration config, String[] args) {
     ScanOptions options = new ScanOptions();
     JCommander jcommand = new JCommander(options);
@@ -148,46 +155,46 @@ public abstract class AppRunner {
     try (FluoClient client = FluoFactory.newClient(sConfig)) {
       try (Snapshot s = client.newSnapshot()) {
 
-        ScannerConfiguration scanConfig = null;
+        Span span = null;
+        Collection<Column> columns = null;
         try {
-          scanConfig = buildScanConfig(options);
+          span = getSpan(options);
+          columns = getColumns(options);
         } catch (IllegalArgumentException e) {
           System.err.println(e.getMessage());
           System.exit(-1);
         }
 
-        RowIterator iter = s.get(scanConfig);
-
-        if (!iter.hasNext()) {
-          System.out.println("\nNo data found\n");
-        }
+        CellScanner cellScanner = s.scanner().over(span).fetch(columns).build();
 
         StringBuilder sb = new StringBuilder();
-        while (iter.hasNext() && !System.out.checkError()) {
-          Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
-          ColumnIterator citer = rowEntry.getValue();
-          while (citer.hasNext() && !System.out.checkError()) {
-            Map.Entry<Column, Bytes> colEntry = citer.next();
-            if (options.hexEncNonAscii) {
-              sb.setLength(0);
-              Hex.encNonAscii(sb, rowEntry.getKey());
-              sb.append(" ");
-              Hex.encNonAscii(sb, colEntry.getKey(), " ");
-              sb.append("\t");
-              Hex.encNonAscii(sb, colEntry.getValue());
-              System.out.println(sb.toString());
-            } else {
-              sb.setLength(0);
-              sb.append(rowEntry.getKey());
-              sb.append(" ");
-              sb.append(colEntry.getKey());
-              sb.append("\t");
-              sb.append(colEntry.getValue());
-              System.out.println(sb.toString());
-            }
-            entriesFound++;
+        for (RowColumnValue rcv : cellScanner) {
+          if (options.hexEncNonAscii) {
+            sb.setLength(0);
+            Hex.encNonAscii(sb, rcv.getRow());
+            sb.append(" ");
+            Hex.encNonAscii(sb, rcv.getColumn(), " ");
+            sb.append("\t");
+            Hex.encNonAscii(sb, rcv.getValue());
+            System.out.println(sb.toString());
+          } else {
+            sb.setLength(0);
+            sb.append(rcv.getsRow());
+            sb.append(" ");
+            sb.append(rcv.getColumn());
+            sb.append("\t");
+            sb.append(rcv.getsValue());
+            System.out.println(sb.toString());
+          }
+          entriesFound++;
+          if (System.out.checkError()) {
+            break;
           }
         }
+
+        if (entriesFound == 0) {
+          System.out.println("\nNo data found\n");
+        }
       } catch (FluoException e) {
         System.out.println("Scan failed - " + e.getMessage());
       }
@@ -201,9 +208,11 @@ public abstract class AppRunner {
 
     Connector conn = AccumuloUtil.getConnector(sConfig);
 
-    ScannerConfiguration scanConfig = null;
+    Span span = null;
+    Collection<Column> columns = null;
     try {
-      scanConfig = buildScanConfig(options);
+      span = getSpan(options);
+      columns = getColumns(options);
     } catch (IllegalArgumentException e) {
       System.err.println(e.getMessage());
       System.exit(-1);
@@ -213,8 +222,8 @@ public abstract class AppRunner {
 
     try {
       Scanner scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
-      scanner.setRange(SpanUtil.toRange(scanConfig.getSpan()));
-      for (Column col : scanConfig.getColumns()) {
+      scanner.setRange(SpanUtil.toRange(span));
+      for (Column col : columns) {
         if (col.isQualifierSet()) {
           scanner
               .fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java b/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
index 2d7c17d..a84e8ea 100644
--- a/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
+++ b/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
@@ -16,10 +16,10 @@
 package org.apache.fluo.cluster.runner;
 
 import com.beust.jcommander.JCommander;
-import org.apache.fluo.api.config.ScannerConfiguration;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.impl.SnapshotScanner;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,16 +28,16 @@ import org.junit.Test;
  */
 public class ScanTest {
 
-  private ScannerConfiguration parseArgs(String args) {
+  private SnapshotScanner.Opts parseArgs(String args) {
     ScanOptions options = new ScanOptions();
     JCommander jcommand = new JCommander(options);
     jcommand.parse(args.split(" "));
-    return AppRunner.buildScanConfig(options);
+    return new SnapshotScanner.Opts(AppRunner.getSpan(options), AppRunner.getColumns(options));
   }
 
   @Test
   public void testValidInput() {
-    ScannerConfiguration config;
+    SnapshotScanner.Opts config;
 
     config = parseArgs("");
     Assert.assertEquals(RowColumn.EMPTY, config.getSpan().getStart());

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java
deleted file mode 100644
index 266df69..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.impl;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.core.util.ByteUtil;
-
-/**
- * Implementation of Column Iterator
- */
-public class ColumnIteratorImpl implements ColumnIterator {
-
-  private Iterator<Entry<Key, Value>> scanner;
-  private Entry<Key, Value> firstEntry;
-
-  ColumnIteratorImpl(Iterator<Entry<Key, Value>> scanner) {
-    this(null, scanner);
-  }
-
-  ColumnIteratorImpl(Entry<Key, Value> firstEntry, Iterator<Entry<Key, Value>> cols) {
-    this.firstEntry = firstEntry;
-    this.scanner = cols;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return firstEntry != null || scanner.hasNext();
-  }
-
-  // TODO create custom class to return instead of entry
-  @Override
-  public Entry<Column, Bytes> next() {
-    Entry<Key, Value> entry;
-    if (firstEntry != null) {
-      entry = firstEntry;
-      firstEntry = null;
-    } else {
-      entry = scanner.next();
-    }
-    final Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData());
-    final Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData());
-    final Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData());
-
-    final Column col = new Column(cf, cq, cv);
-    final Bytes val = Bytes.of(entry.getValue().get());
-
-    return new Entry<Column, Bytes>() {
-
-      @Override
-      public Bytes setValue(Bytes value) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Bytes getValue() {
-        return val;
-      }
-
-      @Override
-      public Column getKey() {
-        return col;
-      }
-    };
-  }
-
-  @Override
-  public void remove() {
-    scanner.remove();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java
deleted file mode 100644
index efdefeb..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.impl;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-
-/**
- * Implementation of RowIterator
- */
-public class RowIteratorImpl implements RowIterator {
-
-  private final org.apache.accumulo.core.client.RowIterator rowIter;
-
-  RowIteratorImpl(Iterator<Entry<Key, Value>> scanner) {
-    rowIter = new org.apache.accumulo.core.client.RowIterator(scanner);
-  }
-
-  @Override
-  public boolean hasNext() {
-    return rowIter.hasNext();
-  }
-
-  // TODO create custom class to return instead of entry
-  @Override
-  public Entry<Bytes, ColumnIterator> next() {
-    Iterator<Entry<Key, Value>> cols = rowIter.next();
-
-    Entry<Key, Value> entry = cols.next();
-
-    final Bytes row = Bytes.of(entry.getKey().getRowData().toArray());
-    final ColumnIterator coliter = new ColumnIteratorImpl(entry, cols);
-
-    return new Entry<Bytes, ColumnIterator>() {
-
-      @Override
-      public Bytes getKey() {
-        return row;
-      }
-
-      @Override
-      public ColumnIterator getValue() {
-        return coliter;
-      }
-
-      @Override
-      public ColumnIterator setValue(ColumnIterator value) {
-        throw new UnsupportedOperationException();
-      }
-    };
-
-  }
-
-  @Override
-  public void remove() {
-    rowIter.remove();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index 2b9959d..cd2e008 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -16,12 +16,13 @@
 package org.apache.fluo.core.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase;
@@ -30,7 +31,6 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.SnapshotIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
-import org.apache.fluo.api.config.ScannerConfiguration;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
@@ -41,45 +41,41 @@ import org.apache.fluo.core.util.UtilWaitThread;
 /**
  * Allows users to iterate over entries of a {@link org.apache.fluo.api.client.Snapshot}
  */
-public class SnapshotScanner implements Iterator<Entry<Key, Value>> {
+public class SnapshotScanner implements Iterable<Entry<Key, Value>> {
+
+  /**
+   * Immutable options for a SnapshotScanner: the span to scan and the columns to fetch.
+   */
+  public static final class Opts {
+    private final Span span; // range the scan covers
+    private final Collection<Column> columns; // immutable copy made in the constructor
+
+    public Opts(Span span, Collection<Column> columns) {
+      this.span = span; // stored as given; no copy and no null check here
+      this.columns = ImmutableSet.copyOf(columns); // snapshot: later changes by the caller are not seen
+    }
+
+    public Span getSpan() {
+      return span;
+    }
+
+    public Collection<Column> getColumns() {
+      return columns; // the immutable set built in the constructor
+    }
+  }
 
   private final long startTs;
   private final Environment env;
   private final TxStats stats;
-
-  private Iterator<Entry<Key, Value>> iterator;
-  private Entry<Key, Value> next;
-  private ScannerConfiguration config;
+  private final Opts config;
 
   static final long INITIAL_WAIT_TIME = 50;
   // TODO make configurable
   static final long MAX_WAIT_TIME = 60000;
 
-  public SnapshotScanner(Environment env, ScannerConfiguration config, long startTs, TxStats stats) {
-    this.env = env;
-    this.config = config;
-    this.startTs = startTs;
-    this.stats = stats;
-    setUpIterator();
-  }
 
-  private void setUpIterator() {
-    Scanner scanner;
-    try {
-      scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
-    } catch (TableNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-    scanner.clearColumns();
-    scanner.clearScanIterators();
-    scanner.setRange(SpanUtil.toRange(config.getSpan()));
-
-    setupScanner(scanner, config.getColumns(), startTs);
 
-    this.iterator = scanner.iterator();
-  }
-
-  static void setupScanner(ScannerBase scanner, Set<Column> columns, long startTs) {
+  static void setupScanner(ScannerBase scanner, Collection<Column> columns, long startTs) {
     for (Column col : columns) {
       if (col.isQualifierSet()) {
         scanner.fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));
@@ -93,119 +89,153 @@ public class SnapshotScanner implements Iterator<Entry<Key, Value>> {
     scanner.addScanIterator(iterConf);
   }
 
-  @Override
-  public boolean hasNext() {
-    if (next == null) {
-      next = getNext();
+  private class SnapIter implements Iterator<Entry<Key, Value>> {
+
+    private Iterator<Entry<Key, Value>> iterator;
+    private Entry<Key, Value> next;
+    private Opts snapIterConfig;
+
+    SnapIter(Opts config) {
+      this.snapIterConfig = config;
+      setUpIterator();
     }
 
-    return next != null;
-  }
+    private void setUpIterator() {
+      Scanner scanner;
+      try {
+        scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+      scanner.clearColumns();
+      scanner.clearScanIterators();
+      scanner.setRange(SpanUtil.toRange(snapIterConfig.getSpan()));
 
-  @Override
-  public Entry<Key, Value> next() {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
+      setupScanner(scanner, snapIterConfig.getColumns(), startTs);
+
+      this.iterator = scanner.iterator();
     }
 
-    Entry<Key, Value> tmp = next;
-    next = null;
-    return tmp;
-  }
+    @Override
+    public boolean hasNext() {
+      if (next == null) {
+        next = getNext();
+      }
 
-  private void resetScanner(Span span) {
-    try {
-      config = (ScannerConfiguration) config.clone();
-    } catch (CloneNotSupportedException e) {
-      throw new RuntimeException(e);
+      return next != null;
     }
 
-    config.setSpan(span);
-    setUpIterator();
-  }
+    @Override
+    public Entry<Key, Value> next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      Entry<Key, Value> tmp = next;
+      next = null;
+      return tmp;
+    }
 
-  public void resolveLock(Entry<Key, Value> lockEntry) {
+    private void resetScanner(Span span) {
+      snapIterConfig = new Opts(span, snapIterConfig.columns);
+      setUpIterator();
+    }
 
-    // read ahead a little bit looking for other locks to resolve
+    public void resolveLock(Entry<Key, Value> lockEntry) {
 
-    long startTime = System.currentTimeMillis();
-    long waitTime = INITIAL_WAIT_TIME;
+      // read ahead a little bit looking for other locks to resolve
 
-    List<Entry<Key, Value>> locks = new ArrayList<>();
-    locks.add(lockEntry);
-    int amountRead = 0;
-    int numRead = 0;
+      long startTime = System.currentTimeMillis();
+      long waitTime = INITIAL_WAIT_TIME;
 
-    RowColumn origEnd = config.getSpan().getEnd();
-    boolean isEndInclusive = config.getSpan().isEndInclusive();
+      List<Entry<Key, Value>> locks = new ArrayList<>();
+      locks.add(lockEntry);
+      int amountRead = 0;
+      int numRead = 0;
 
-    while (true) {
-      while (iterator.hasNext()) {
-        Entry<Key, Value> entry = iterator.next();
+      RowColumn origEnd = snapIterConfig.getSpan().getEnd();
+      boolean isEndInclusive = snapIterConfig.getSpan().isEndInclusive();
 
-        long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+      while (true) {
+        while (iterator.hasNext()) {
+          Entry<Key, Value> entry = iterator.next();
 
-        if (colType == ColumnConstants.LOCK_PREFIX) {
-          locks.add(entry);
-        }
+          long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
 
-        amountRead += entry.getKey().getSize() + entry.getValue().getSize();
-        numRead++;
+          if (colType == ColumnConstants.LOCK_PREFIX) {
+            locks.add(entry);
+          }
 
-        if (numRead > 100 || amountRead > 1 << 12) {
-          break;
+          amountRead += entry.getKey().getSize() + entry.getValue().getSize();
+          numRead++;
+
+          if (numRead > 100 || amountRead > 1 << 12) {
+            break;
+          }
         }
-      }
 
-      boolean resolvedLocks = LockResolver.resolveLocks(env, startTs, stats, locks, startTime);
+        boolean resolvedLocks = LockResolver.resolveLocks(env, startTs, stats, locks, startTime);
 
-      if (!resolvedLocks) {
-        UtilWaitThread.sleep(waitTime);
-        stats.incrementLockWaitTime(waitTime);
-        waitTime = Math.min(MAX_WAIT_TIME, waitTime * 2);
+        if (!resolvedLocks) {
+          UtilWaitThread.sleep(waitTime);
+          stats.incrementLockWaitTime(waitTime);
+          waitTime = Math.min(MAX_WAIT_TIME, waitTime * 2);
 
-        RowColumn start = SpanUtil.toRowColumn(locks.get(0).getKey());
-        RowColumn end = SpanUtil.toRowColumn(locks.get(locks.size() - 1).getKey()).following();
+          RowColumn start = SpanUtil.toRowColumn(locks.get(0).getKey());
+          RowColumn end = SpanUtil.toRowColumn(locks.get(locks.size() - 1).getKey()).following();
 
-        resetScanner(new Span(start, true, end, false));
+          resetScanner(new Span(start, true, end, false));
 
-        locks.clear();
+          locks.clear();
 
-      } else {
-        break;
+        } else {
+          break;
+        }
       }
-    }
 
-    RowColumn start = SpanUtil.toRowColumn(lockEntry.getKey());
+      RowColumn start = SpanUtil.toRowColumn(lockEntry.getKey());
 
-    resetScanner(new Span(start, true, origEnd, isEndInclusive));
-  }
+      resetScanner(new Span(start, true, origEnd, isEndInclusive));
+    }
 
-  public Entry<Key, Value> getNext() {
-    mloop: while (true) {
-      // its possible a next could exist then be rolled back
-      if (!iterator.hasNext()) {
-        return null;
-      }
+    public Entry<Key, Value> getNext() {
+      mloop: while (true) {
+        // it's possible a next could exist then be rolled back
+        if (!iterator.hasNext()) {
+          return null;
+        }
 
-      Entry<Key, Value> entry = iterator.next();
+        Entry<Key, Value> entry = iterator.next();
 
-      long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+        long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
 
-      if (colType == ColumnConstants.LOCK_PREFIX) {
-        resolveLock(entry);
-        continue mloop;
-      } else if (colType == ColumnConstants.DATA_PREFIX) {
-        stats.incrementEntriesReturned(1);
-        return entry;
-      } else {
-        throw new IllegalArgumentException("Unexpected column type " + colType);
+        if (colType == ColumnConstants.LOCK_PREFIX) {
+          resolveLock(entry);
+          continue mloop;
+        } else if (colType == ColumnConstants.DATA_PREFIX) {
+          stats.incrementEntriesReturned(1);
+          return entry;
+        } else {
+          throw new IllegalArgumentException("Unexpected column type " + colType);
+        }
       }
     }
+
+    @Override
+    public void remove() {
+      iterator.remove();
+    }
+  }
+
+  SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats) {
+    this.env = env;
+    this.config = config;
+    this.startTs = startTs;
+    this.stats = stats;
   }
 
   @Override
-  public void remove() {
-    iterator.remove();
+  public Iterator<Entry<Key, Value>> iterator() {
+    return new SnapIter(config);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 3b0b85c..c4c429e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -29,6 +29,7 @@ import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -53,22 +54,23 @@ import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.accumulo.values.LockValue;
 import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.AlreadySetException;
 import org.apache.fluo.api.exceptions.CommitException;
 import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.core.async.AsyncCommitObserver;
 import org.apache.fluo.core.async.AsyncConditionalWriter;
 import org.apache.fluo.core.async.AsyncTransaction;
 import org.apache.fluo.core.async.SyncCommitObserver;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.exceptions.StaleScanException;
+import org.apache.fluo.core.impl.scanner.ColumnScannerImpl;
+import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
 import org.apache.fluo.core.oracle.Stamp;
 import org.apache.fluo.core.util.ColumnUtil;
 import org.apache.fluo.core.util.ConditionalFlutation;
@@ -207,36 +209,46 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
     return ret;
   }
 
-  @Override
-  public RowIterator get(ScannerConfiguration config) {
-    checkIfOpen();
-    return getImpl(config);
-  }
-
   private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
 
     // TODO push visibility filtering to server side?
 
     env.getSharedResources().getVisCache().validate(columns);
 
-    ScannerConfiguration config = new ScannerConfiguration();
-    config.setSpan(Span.exact(row));
+    boolean shouldCopy = false;
+
     for (Column column : columns) {
-      config.fetchColumn(column.getFamily(), column.getQualifier());
+      if (column.isVisibilitySet()) {
+        shouldCopy = true;
+      }
     }
 
-    RowIterator iter = getImpl(config);
+    SnapshotScanner.Opts opts;
+    if (shouldCopy) {
+      HashSet<Column> cols = new HashSet<Column>();
+      for (Column column : columns) {
+        if (column.isVisibilitySet()) {
+          cols.add(new Column(column.getFamily(), column.getQualifier()));
+        } else {
+          cols.add(column);
+        }
+      }
+      opts = new SnapshotScanner.Opts(Span.exact(row), columns);
+    } else {
+      opts = new SnapshotScanner.Opts(Span.exact(row), columns);
+    }
 
     Map<Column, Bytes> ret = new HashMap<>();
 
-    while (iter.hasNext()) {
-      Entry<Bytes, ColumnIterator> entry = iter.next();
-      ColumnIterator citer = entry.getValue();
-      while (citer.hasNext()) {
-        Entry<Column, Bytes> centry = citer.next();
-        if (columns.contains(centry.getKey())) {
-          ret.put(centry.getKey(), centry.getValue());
+    Iterable<ColumnValue> scanner =
+        Iterables.transform(new SnapshotScanner(env, opts, startTs, stats), ColumnScannerImpl.E2CV);
+    for (ColumnValue cv : scanner) {
+      if (shouldCopy) {
+        if (columns.contains(cv.getColumn())) {
+          ret.put(cv.getColumn(), cv.getValue());
         }
+      } else {
+        ret.put(cv.getColumn(), cv.getValue());
       }
     }
 
@@ -246,8 +258,10 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
     return ret;
   }
 
-  private RowIterator getImpl(ScannerConfiguration config) {
-    return new RowIteratorImpl(new SnapshotScanner(this.env, config, startTs, stats));
+  @Override
+  public ScannerBuilder scanner() {
+    checkIfOpen();
+    return new ScannerBuilderImpl(this);
   }
 
   private void updateColumnsRead(Bytes row, Set<Column> columns) {
@@ -1193,4 +1207,7 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
     cd.commitObserver.committed();
   }
 
+  public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) { // scanner factory bound to this transaction
+    return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns), startTs, stats); // reuses this tx's env, startTs and stats
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
new file mode 100644
index 0000000..06e68d6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl.scanner;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.core.util.ByteUtil;
+
+public class CellScannerImpl implements CellScanner { // exposes Accumulo key/value entries as RowColumnValue cells
+
+  private Iterable<Entry<Key, Value>> snapshot; // source of raw entries; a new iterator is requested per iterator() call
+
+  private static final Function<Entry<Key, Value>, RowColumnValue> E2RCV = // entry -> RowColumnValue converter
+      new Function<Entry<Key, Value>, RowColumnValue>() {
+        @Override
+        public RowColumnValue apply(Entry<Key, Value> entry) {
+          Bytes row = ByteUtil.toBytes(entry.getKey().getRowData());
+          Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData());
+          Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData());
+          Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData());
+          Column col = new Column(cf, cq, cv); // family, qualifier and visibility all come from the key
+          Bytes val = Bytes.of(entry.getValue().get());
+          return new RowColumnValue(row, col, val);
+        }
+      };
+
+  CellScannerImpl(Iterable<Entry<Key, Value>> snapshot) { // package-private constructor
+    this.snapshot = snapshot;
+  }
+
+  @Override
+  public Iterator<RowColumnValue> iterator() {
+    return Iterators.transform(snapshot.iterator(), E2RCV); // lazy view: each entry is converted as it is consumed
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
new file mode 100644
index 0000000..c85dfeb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl.scanner;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.core.util.ByteUtil;
+
+public class ColumnScannerImpl implements ColumnScanner { // column/value view over the entries of one row
+
+  public static final Function<Entry<Key, Value>, ColumnValue> E2CV = // entry -> ColumnValue converter (the key's row is not carried over)
+      new Function<Entry<Key, Value>, ColumnValue>() {
+        @Override
+        public ColumnValue apply(Entry<Key, Value> entry) {
+          Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData());
+          Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData());
+          Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData());
+          Column col = new Column(cf, cq, cv);
+          Bytes val = Bytes.of(entry.getValue().get());
+          return new ColumnValue(col, val);
+        }
+      };
+
+  private PeekingIterator<Entry<Key, Value>> peekingIter; // wraps the input so the first entry can be read without consuming it
+  private Bytes row; // row of the first entry, captured in the constructor
+  private Iterator<ColumnValue> iter; // single shared view over peekingIter; handed out at most once
+  private boolean gotIter = false; // set to true once iterator() has been called
+
+  ColumnScannerImpl(Iterator<Entry<Key, Value>> e) {
+    peekingIter = Iterators.peekingIterator(e);
+    row = ByteUtil.toBytes(peekingIter.peek().getKey().getRowData()); // peek() throws NoSuchElementException if e is empty
+    iter = Iterators.transform(peekingIter, E2CV); // still starts at the first entry, because peek() did not advance
+  }
+
+  @Override
+  public Iterator<ColumnValue> iterator() {
+    Preconditions.checkState(!gotIter, // a second call throws IllegalStateException
+        "Unfortunately this implementation only support getting the iterator once");
+    gotIter = true;
+    return iter;
+  }
+
+  @Override
+  public Bytes getRow() {
+    return row;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
new file mode 100644
index 0000000..4a9eb38
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl.scanner;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+
+public class RowScannerImpl implements RowScanner { // presents a stream of entries as one ColumnScanner per row
+
+  private Iterable<Entry<Key, Value>> snapshot; // source of raw entries; a new iterator is requested per iterator() call
+
+  private static final Function<Iterator<Entry<Key, Value>>, ColumnScanner> RI2CS = // wraps one row's entry iterator
+      new Function<Iterator<Entry<Key, Value>>, ColumnScanner>() {
+        @Override
+        public ColumnScanner apply(Iterator<Entry<Key, Value>> input) {
+          return new ColumnScannerImpl(input);
+        }
+      };
+
+  RowScannerImpl(Iterable<Entry<Key, Value>> snapshot) { // package-private constructor
+    this.snapshot = snapshot;
+  }
+
+  @Override
+  public Iterator<ColumnScanner> iterator() {
+    RowIterator rowiter = new RowIterator(snapshot.iterator()); // Accumulo helper that groups consecutive entries by row
+    return Iterators.transform(rowiter, RI2CS); // lazy: a ColumnScannerImpl is created only as each row is reached
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
new file mode 100644
index 0000000..8c833d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl.scanner;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.client.scanner.RowScannerBuilder;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.impl.SnapshotScanner;
+import org.apache.fluo.core.impl.TransactionImpl;
+
+public class ScannerBuilderImpl implements ScannerBuilder {
+  // Default span used until over(Span) is called.
+  private static final Span EMPTY_SPAN = new Span();
+  // Transaction that creates the SnapshotScanner; span and columns start at their defaults.
+  private TransactionImpl tx;
+  private Span span = EMPTY_SPAN;
+  private Collection<Column> columns = Collections.emptyList();
+  // Creates a builder whose scanners are obtained from the given transaction.
+  public ScannerBuilderImpl(TransactionImpl tx) {
+    this.tx = tx;
+  }
+  // Sets the span to scan; throws NullPointerException if span is null.
+  @Override
+  public ScannerBuilder over(Span span) {
+    Objects.requireNonNull(span);
+    this.span = span;
+    return this;
+  }
+  // Stores the columns after rejecting any column that has a visibility set.
+  private void setColumns(Collection<Column> columns) {
+    for (Column column : columns) {
+      Preconditions.checkArgument(!column.isVisibilitySet(),
+          "Fetching visibility is not currently supported");
+    }
+    this.columns = columns;
+  }
+  // Restricts the scan to the given columns; they are copied into an ImmutableSet first.
+  @Override
+  public ScannerBuilder fetch(Collection<Column> columns) {
+    Objects.requireNonNull(columns);
+    setColumns(ImmutableSet.copyOf(columns));
+    return this;
+  }
+  // Varargs form of fetch; the array is likewise copied into an ImmutableSet.
+  @Override
+  public ScannerBuilder fetch(Column... columns) {
+    Objects.requireNonNull(columns);
+    setColumns(ImmutableSet.copyOf(columns));
+    return this;
+  }
+  // Builds a CellScanner over a new SnapshotScanner for the current span and columns.
+  @Override
+  public CellScanner build() {
+    SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns);
+    return new CellScannerImpl(snapScanner);
+  }
+  // Returns a RowScannerBuilder; span and columns are read when its build() is called.
+  @Override
+  public RowScannerBuilder byRow() {
+    return new RowScannerBuilder() {
+      @Override
+      public RowScanner build() {
+        SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns);
+        return new RowScannerImpl(snapScanner);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index da6b8dd..fe5c21d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -23,14 +23,13 @@ import java.util.Set;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.exceptions.AlreadySetException;
 import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.core.async.AsyncCommitObserver;
 import org.apache.fluo.core.async.AsyncTransaction;
 import org.apache.fluo.core.impl.Notification;
@@ -179,10 +178,10 @@ public class TracingTransaction implements AsyncTransaction, Snapshot {
   }
 
   @Override
-  public RowIterator get(ScannerConfiguration config) {
+  public ScannerBuilder scanner() {
     // TODO log something better (see fluo-425)
-    log.trace("txid: {} get(ScannerConfiguration)", txid);
-    return tx.get(config);
+    log.trace("txid: {} newScanner()", txid);
+    return tx.scanner();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java b/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java
deleted file mode 100644
index f3cc7af..0000000
--- a/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.config;
-
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit test for ScannerConfiguration class
- */
-public class ScannerConfigurationTest {
-
-  @Test
-  public void testSetGet() {
-
-    ScannerConfiguration config = new ScannerConfiguration();
-    Assert.assertEquals(new Span(), config.getSpan());
-    Assert.assertEquals(0, config.getColumns().size());
-
-    config = new ScannerConfiguration();
-    config.setSpan(Span.exact("row1"));
-    Assert.assertEquals(Span.exact("row1"), config.getSpan());
-    Assert.assertEquals(0, config.getColumns().size());
-
-    config = new ScannerConfiguration();
-    config.fetchColumnFamily(Bytes.of("cf1"));
-    Assert.assertEquals(1, config.getColumns().size());
-    Assert.assertEquals(new Column("cf1"), config.getColumns().iterator().next());
-
-    config = new ScannerConfiguration();
-    config.fetchColumn(Bytes.of("cf2"), Bytes.of("cq2"));
-    Assert.assertEquals(1, config.getColumns().size());
-    Assert.assertEquals(new Column("cf2", "cq2"), config.getColumns().iterator().next());
-
-    config = new ScannerConfiguration();
-    config.fetchColumnFamily(Bytes.of("a"));
-    config.fetchColumnFamily(Bytes.of("b"));
-    config.fetchColumnFamily(Bytes.of("a"));
-    Assert.assertEquals(2, config.getColumns().size());
-
-    config.clearColumns();
-    Assert.assertEquals(0, config.getColumns().size());
-  }
-
-  @Test
-  public void testNullSet() {
-
-    ScannerConfiguration config = new ScannerConfiguration();
-
-    try {
-      config.setSpan(null);
-      Assert.fail();
-    } catch (NullPointerException e) {
-    }
-
-    try {
-      config.fetchColumnFamily(null);
-      Assert.fail();
-    } catch (NullPointerException e) {
-    }
-
-    try {
-      config.fetchColumn(null, Bytes.of("qual"));
-      Assert.fail();
-    } catch (NullPointerException e) {
-    }
-
-    try {
-      config.fetchColumn(Bytes.of("fam"), null);
-      Assert.fail();
-    } catch (NullPointerException e) {
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
index 4cdf146..5e2a7f9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
@@ -18,7 +18,6 @@ package org.apache.fluo.integration;
 import java.io.File;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.client.Connector;
@@ -32,11 +31,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.api.data.RowColumnValue;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -99,18 +94,12 @@ public class ITBase {
 
   protected void printSnapshot() throws Exception {
     try (Snapshot s = client.newSnapshot()) {
-      RowIterator iter = s.get(new ScannerConfiguration());
-
       System.out.println("== snapshot start ==");
-      while (iter.hasNext()) {
-        Entry<Bytes, ColumnIterator> rowEntry = iter.next();
-        ColumnIterator citer = rowEntry.getValue();
-        while (citer.hasNext()) {
-          Entry<Column, Bytes> colEntry = citer.next();
-          System.out.println(rowEntry.getKey() + " " + colEntry.getKey() + "\t"
-              + colEntry.getValue());
-        }
+
+      for (RowColumnValue rcv : s.scanner().build()) {
+        System.out.println(rcv.getRow() + " " + rcv.getColumn() + "\t" + rcv.getValue());
       }
+
       System.out.println("=== snapshot end ===");
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
index e2ed47a..3a541d9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -77,6 +77,7 @@ public class ITBaseImpl extends ITBase {
     config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
     config.addObservers(getObservers());
     config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
+    config.setMiniStartAccumulo(false);
 
     try (FluoAdmin admin = FluoFactory.newAdmin(config)) {
       InitOpts opts = new InitOpts().setClearZookeeper(true).setClearTable(true);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index b64cda5..ceda193 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -31,14 +31,13 @@ import org.apache.fluo.accumulo.iterators.NotificationIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.util.NotificationUtil;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.AlreadySetException;
 import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
@@ -205,8 +204,8 @@ public class TestTransaction implements TransactionBase {
   }
 
   @Override
-  public RowIterator get(ScannerConfiguration config) {
-    return tx.get(config);
+  public ScannerBuilder scanner() {
+    return tx.scanner();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index 321374b..086f1d9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -29,15 +29,14 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
 import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.impl.Environment;
@@ -469,14 +468,11 @@ public class FluoIT extends ITBaseImpl {
     tx3.done();
 
     HashSet<Column> columns = new HashSet<>();
-    RowIterator riter =
-        tx2.get(new ScannerConfiguration().setSpan(Span.exact(Bytes.of("d00001"),
-            new Column(Bytes.of("outlink")))));
-    while (riter.hasNext()) {
-      ColumnIterator citer = riter.next().getValue();
-      while (citer.hasNext()) {
-        columns.add(citer.next().getKey());
-      }
+
+    CellScanner cellScanner =
+        tx2.scanner().over(Span.exact(Bytes.of("d00001"))).fetch(new Column("outlink")).build();
+    for (RowColumnValue rcv : cellScanner) {
+      columns.add(rcv.getColumn());
     }
 
     tx2.done();
@@ -490,15 +486,12 @@ public class FluoIT extends ITBaseImpl {
 
     TestTransaction tx4 = new TestTransaction(env);
     columns.clear();
-    riter =
-        tx4.get(new ScannerConfiguration().setSpan(Span.exact(Bytes.of("d00001"),
-            new Column(Bytes.of("outlink")))));
-    while (riter.hasNext()) {
-      ColumnIterator citer = riter.next().getValue();
-      while (citer.hasNext()) {
-        columns.add(citer.next().getKey());
-      }
+    cellScanner =
+        tx4.scanner().over(Span.exact(Bytes.of("d00001"))).fetch(new Column("outlink")).build();
+    for (RowColumnValue rcv : cellScanner) {
+      columns.add(rcv.getColumn());
     }
+
     expected.add(new Column("outlink", "http://z.com"));
     expected.remove(new Column("outlink", "http://b.com"));
     Assert.assertEquals(expected, columns);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
new file mode 100644
index 0000000..bdf473d
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.impl;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ScannerIT extends ITBaseImpl {
+  // Matches cells whose column equals the given column.
+  static class ColumnPredicate implements Predicate<RowColumnValue> {
+    Column c;
+
+    ColumnPredicate(Column c) {
+      this.c = c;
+    }
+
+    @Override
+    public boolean apply(RowColumnValue input) {
+      return input.getColumn().equals(c);
+    }
+  }
+  // Matches cells whose column family equals the given family.
+  static class FamilyPredicate implements Predicate<RowColumnValue> {
+    Bytes fam;
+
+    FamilyPredicate(String family) {
+      this.fam = Bytes.of(family);
+    }
+
+    @Override
+    public boolean apply(RowColumnValue input) {
+      return input.getColumn().getFamily().equals(fam);
+    }
+  }
+  // Matches cells whose row equals the given row.
+  static class RowPredicate implements Predicate<RowColumnValue> {
+    Bytes row;
+
+    RowPredicate(String row) {
+      this.row = Bytes.of(row);
+    }
+
+    @Override
+    public boolean apply(RowColumnValue input) {
+      return input.getRow().equals(row);
+    }
+  }
+  // Checks span and column filtering against subsets of genData() computed with predicates.
+  @Test
+  public void testFiltering() {
+    Set<RowColumnValue> expected = genData();
+    // Expected cells in row r2.
+    HashSet<RowColumnValue> expectedR2 = new HashSet<>();
+    Iterables.addAll(expectedR2, Iterables.filter(expected, new RowPredicate("r2")));
+    Assert.assertEquals(2, expectedR2.size());
+
+    // Expected cell in row r2 with column f1:q2.
+    HashSet<RowColumnValue> expectedR2c = new HashSet<>();
+    Iterables.addAll(
+        expectedR2c,
+        Iterables.filter(expected,
+            Predicates.and(new RowPredicate("r2"), new ColumnPredicate(new Column("f1", "q2")))));
+    Assert.assertEquals(1, expectedR2c.size());
+    // Expected cells with column f1:q1, in any row.
+    HashSet<RowColumnValue> expectedC = new HashSet<>();
+    Iterables.addAll(expectedC,
+        Iterables.filter(expected, new ColumnPredicate(new Column("f1", "q1"))));
+    Assert.assertEquals(2, expectedC.size());
+    // Expected cells in family f2.
+    HashSet<RowColumnValue> expectedCF = new HashSet<>();
+    Iterables.addAll(expectedCF, Iterables.filter(expected, new FamilyPredicate("f2")));
+    Assert.assertEquals(2, expectedCF.size());
+    // Expected cells with column f2:q5 or f1:q1.
+    HashSet<RowColumnValue> expectedCols = new HashSet<>();
+    Iterables.addAll(expectedCols, Iterables.filter(expected, Predicates.or(new ColumnPredicate(
+        new Column("f2", "q5")), new ColumnPredicate(new Column("f1", "q1")))));
+    Assert.assertEquals(3, expectedCols.size());
+    // Run the equivalent scans against a snapshot and compare with the expected sets.
+    try (Snapshot snap = client.newSnapshot()) {
+      HashSet<RowColumnValue> actual = new HashSet<>();
+      Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).build());
+      Assert.assertEquals(expectedR2, actual);
+
+      actual.clear();
+      Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).fetch(new Column("f1", "q2"))
+          .build());
+      Assert.assertEquals(expectedR2c, actual);
+
+      actual.clear();
+      Iterables.addAll(actual, snap.scanner().fetch(new Column("f1", "q1")).build());
+      Assert.assertEquals(expectedC, actual);
+
+      actual.clear();
+      Iterables.addAll(actual, snap.scanner().fetch(new Column("f2")).build());
+      Assert.assertEquals(expectedCF, actual);
+
+      actual.clear();
+      Iterables.addAll(actual, snap.scanner().fetch(new Column("f2", "q5"), new Column("f1", "q1"))
+          .build());
+      Assert.assertEquals(expectedCols, actual);
+    }
+
+  }
+  // Two iterators from one RowScanner, advanced in lockstep, must each see all the data.
+  @Test
+  public void testMultipleIteratorsFromSameRowScanner() {
+    Set<RowColumnValue> expected = genData();
+
+    try (Snapshot snap = client.newSnapshot()) {
+      RowScanner rowScanner = snap.scanner().byRow().build();
+
+      Iterator<ColumnScanner> iter1 = rowScanner.iterator();
+      Iterator<ColumnScanner> iter2 = rowScanner.iterator();
+
+      HashSet<RowColumnValue> actual1 = new HashSet<>();
+      HashSet<RowColumnValue> actual2 = new HashSet<>();
+
+      while (iter1.hasNext()) {
+        ColumnScanner cs1 = iter1.next();
+
+        Assert.assertTrue(iter2.hasNext());
+        ColumnScanner cs2 = iter2.next();
+
+        for (ColumnValue cv : cs1) {
+          actual1.add(new RowColumnValue(cs1.getRow(), cv.getColumn(), cv.getValue()));
+        }
+
+        for (ColumnValue cv : cs2) {
+          actual2.add(new RowColumnValue(cs2.getRow(), cv.getColumn(), cv.getValue()));
+        }
+      }
+
+      Assert.assertFalse(iter2.hasNext());
+
+      Assert.assertEquals(expected, actual1);
+      Assert.assertEquals(expected, actual2);
+    }
+  }
+  // Two iterators from one CellScanner, advanced in lockstep, must each see all the data.
+  @Test
+  public void testMultipleIteratorsFromSameIterable() {
+
+    Set<RowColumnValue> expected = genData();
+
+    try (Snapshot snap = client.newSnapshot()) {
+      CellScanner cellScanner = snap.scanner().build();
+      // grab two iterators from same iterable and iterate over them in interleaved fashion
+      Iterator<RowColumnValue> iter1 = cellScanner.iterator();
+      Iterator<RowColumnValue> iter2 = cellScanner.iterator();
+
+      HashSet<RowColumnValue> actual1 = new HashSet<>();
+      HashSet<RowColumnValue> actual2 = new HashSet<>();
+
+      while (iter1.hasNext()) {
+        Assert.assertTrue(iter2.hasNext());
+        actual1.add(iter1.next());
+        actual2.add(iter2.next());
+      }
+
+      Assert.assertFalse(iter2.hasNext());
+
+      Assert.assertEquals(expected, actual1);
+      Assert.assertEquals(expected, actual2);
+    }
+  }
+  // Commits five cells in one transaction and returns them as the expected set.
+  private Set<RowColumnValue> genData() {
+    Set<RowColumnValue> expected = new HashSet<>();
+    expected.add(new RowColumnValue("r1", new Column("f1", "q1"), "v1"));
+    expected.add(new RowColumnValue("r1", new Column("f2", "q3"), "v2"));
+    expected.add(new RowColumnValue("r2", new Column("f1", "q1"), "v3"));
+    expected.add(new RowColumnValue("r2", new Column("f1", "q2"), "v4"));
+    expected.add(new RowColumnValue("r4", new Column("f2", "q5"), "v5"));
+
+    Assert.assertEquals(5, expected.size());
+
+    try (Transaction tx = client.newTransaction()) {
+      for (RowColumnValue rcv : expected) {
+        tx.set(rcv.getRow(), rcv.getColumn(), rcv.getValue());
+      }
+      tx.commit();
+    }
+
+    return expected;
+  }
+}